diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1ClientChannelHandler.swift index 9c300b5e6..fcd59bc3d 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1ClientChannelHandler.swift @@ -52,6 +52,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { private var idleReadTimeoutStateMachine: IdleReadStateMachine? private var idleReadTimeoutTimer: Scheduled? + /// Cancelling a task in NIO does *not* guarantee that the task will not execute under certain race conditions. + /// We therefore give each timer an ID and increase the ID every time we reset or cancel it. + /// We check in the task if the timer ID has changed in the meantime and do not execute any action if has changed. + private var currentIdleReadTimeoutTimerID: Int = 0 + private let backgroundLogger: Logger private var logger: Logger @@ -253,6 +258,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { let oldRequest = self.request! self.request = nil + self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context) switch finalAction { case .close: @@ -271,6 +277,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { // see comment in the `succeedRequest` case. let oldRequest = self.request! self.request = nil + self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context) switch finalAction { case .close: @@ -292,7 +299,9 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { case .startIdleReadTimeoutTimer(let timeAmount): assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.") + let timerID = self.currentIdleReadTimeoutTimerID self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + guard self.currentIdleReadTimeoutTimerID == timerID else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) } @@ -302,17 +311,19 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { oldTimer.cancel() } + self.currentIdleReadTimeoutTimerID &+= 1 + let timerID = self.currentIdleReadTimeoutTimerID self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + guard self.currentIdleReadTimeoutTimerID == timerID else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) } - case .clearIdleReadTimeoutTimer: if let oldTimer = self.idleReadTimeoutTimer { self.idleReadTimeoutTimer = nil + self.currentIdleReadTimeoutTimerID &+= 1 oldTimer.cancel() } - case .none: break } @@ -465,7 +476,7 @@ struct IdleReadStateMachine { return .resetIdleReadTimeoutTimer(self.timeAmount) case .end: self.state = .responseEndReceived - return .clearIdleReadTimeoutTimer + return .none } case .responseEndReceived: diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift index 281287750..48419d01a 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift @@ -196,11 +196,13 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { case .failRequest(let error, let finalAction): self.request!.fail(error) self.request = nil + self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context) self.runFinalAction(finalAction, context: context) case .succeedRequest(let finalAction, let finalParts): self.request!.succeedRequest(finalParts) self.request = nil + self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context) self.runFinalAction(finalAction, context: context) } } @@ -224,6 +226,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.") self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + guard self.idleReadTimeoutTimer != nil else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) } @@ -234,10 +237,10 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { } self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + guard self.idleReadTimeoutTimer != nil else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) } - case .clearIdleReadTimeoutTimer: if let oldTimer = self.idleReadTimeoutTimer { self.idleReadTimeoutTimer = nil diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift index 7781d0820..8d28c15c4 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift @@ -29,6 +29,7 @@ extension HTTP1ClientChannelHandlerTests { ("testWriteBackpressure", testWriteBackpressure), ("testClientHandlerCancelsRequestIfWeWantToShutdown", testClientHandlerCancelsRequestIfWeWantToShutdown), ("testIdleReadTimeout", testIdleReadTimeout), + ("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled), ("testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand", testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand), ] } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift index 30a0a287c..62e42f94e 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift @@ -287,6 +287,56 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { } } + func testIdleReadTimeoutIsCanceledIfRequestIsCanceled() { + let embedded = EmbeddedChannel() + var maybeTestUtils: HTTP1TestTools? + XCTAssertNoThrow(maybeTestUtils = try embedded.setupHTTP1Connection()) + guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/")) + guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } + + let delegate = ResponseBackpressureDelegate(eventLoop: embedded.eventLoop) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: embedded.eventLoop), + task: .init(eventLoop: embedded.eventLoop, logger: testUtils.logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + requestOptions: .forTests(idleReadTimeout: .milliseconds(200)), + delegate: delegate + )) + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } + + testUtils.connection.executeRequest(requestBag) + + XCTAssertNoThrow(try embedded.receiveHeadAndVerify { + XCTAssertEqual($0.method, .GET) + XCTAssertEqual($0.uri, "/") + XCTAssertEqual($0.headers.first(name: "host"), "localhost") + }) + XCTAssertNoThrow(try embedded.receiveEnd()) + + let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) + + XCTAssertEqual(testUtils.readEventHandler.readHitCounter, 0) + embedded.read() + XCTAssertEqual(testUtils.readEventHandler.readHitCounter, 1) + XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead))) + + // canceling the request + requestBag.cancel() + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + + // the idle read timeout should be cleared because we canceled the request + // therefore advancing the time should not trigger a crash + embedded.embeddedEventLoop.advanceTime(by: .milliseconds(250)) + } + func testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand() { let embedded = EmbeddedChannel() var maybeTestUtils: HTTP1TestTools? diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests+XCTest.swift index 53be47f01..a0facdb65 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests+XCTest.swift @@ -28,6 +28,7 @@ extension HTTP2ClientRequestHandlerTests { ("testResponseBackpressure", testResponseBackpressure), ("testWriteBackpressure", testWriteBackpressure), ("testIdleReadTimeout", testIdleReadTimeout), + ("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift index c1fbced67..7bbb30105 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift @@ -233,4 +233,56 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { XCTAssertEqual($0 as? HTTPClientError, .readTimeout) } } + + func testIdleReadTimeoutIsCanceledIfRequestIsCanceled() { + let embedded = EmbeddedChannel() + let readEventHandler = ReadEventHitHandler() + let requestHandler = HTTP2ClientRequestHandler(eventLoop: embedded.eventLoop) + XCTAssertNoThrow(try embedded.pipeline.syncOperations.addHandlers([readEventHandler, requestHandler])) + XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait()) + let logger = Logger(label: "test") + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/")) + guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } + + let delegate = ResponseBackpressureDelegate(eventLoop: embedded.eventLoop) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: embedded.eventLoop), + task: .init(eventLoop: embedded.eventLoop, logger: logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + requestOptions: .forTests(idleReadTimeout: .milliseconds(200)), + delegate: delegate + )) + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } + + embedded.write(requestBag, promise: nil) + + XCTAssertNoThrow(try embedded.receiveHeadAndVerify { + XCTAssertEqual($0.method, .GET) + XCTAssertEqual($0.uri, "/") + XCTAssertEqual($0.headers.first(name: "host"), "localhost") + }) + XCTAssertNoThrow(try embedded.receiveEnd()) + + let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) + + XCTAssertEqual(readEventHandler.readHitCounter, 0) + embedded.read() + XCTAssertEqual(readEventHandler.readHitCounter, 1) + XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead))) + + // canceling the request + requestBag.cancel() + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + + // the idle read timeout should be cleared because we canceled the request + // therefore advancing the time should not trigger a crash + embedded.embeddedEventLoop.advanceTime(by: .milliseconds(250)) + } }