From acceb762411c1af665c4c63fd2d90ab332316f26 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 6 Jul 2021 15:53:37 +0200 Subject: [PATCH 1/4] HTTPRequestStateMachine buffers `channelRead`s until `channelReadComplete` --- .../HTTPRequestStateMachine.swift | 291 ++++++++++++------ .../HTTPRequestStateMachineTests+XCTest.swift | 5 +- .../HTTPRequestStateMachineTests.swift | 177 ++++++----- 3 files changed, 297 insertions(+), 176 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index 0e30e0fd1..eef8b77ca 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -36,6 +36,8 @@ struct HTTPRequestStateMachine { case finished /// The request has failed case failed(Error) + + case modifying } /// A sub state for a running request. More specifically for sending a request body. @@ -58,17 +60,19 @@ struct HTTPRequestStateMachine { } fileprivate enum ResponseState { - /// A sub state for receiving a response. Stores whether the consumer has either signaled demand for more data or - /// is busy consuming the so far forwarded bytes + /// A sub state for receiving a response. Stores whether the consumer has either signaled demand and whether the + /// channel has issued `read` events. enum ConsumerControlState { - /// the state machine is in this state once it has passed down a request head or body part. If a read event - /// occurs while in this state, the readPending flag will be set true. If the consumer signals more demand - /// by invoking `forwardMoreBodyParts`, the state machine will forward the read event. - case downstreamIsConsuming(readPending: Bool) - /// the state machine is in this state once the consumer has signaled more demand by invoking - /// `forwardMoreBodyParts`. If a read event occurs in this state the read event will be forwarded - /// immediately. - case downstreamHasDemand + /// The state machines expects further writes to `channelRead`. The writes are appended to the buffer. + case waitingForBytes(CircularBuffer) + /// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is + /// empty. It is preserved for performance reasons. + case waitingForReadOrDemand(CircularBuffer) + /// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons. + case waitingForRead(CircularBuffer) + /// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is + /// preserved for performance reasons. + case waitingForDemand(CircularBuffer) } /// A response head has not been received yet. @@ -84,8 +88,9 @@ struct HTTPRequestStateMachine { enum FinalStreamAction { /// Close the connection case close - /// Trigger a read event - case read + /// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded, + /// as soon as we wrote the request end onto the wire. + case sendRequestEnd /// Do nothing. This is action is used, if the request failed, before we the request head was written onto the wire. /// This might happen if the request is cancelled, or the request failed the soundness check. case none @@ -93,18 +98,16 @@ struct HTTPRequestStateMachine { case sendRequestHead(HTTPRequestHead, startBody: Bool) case sendBodyPart(IOData) - /// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded, - /// as soon as we wrote the request end onto the wire. In this case the succeedRequest property is set. - case sendRequestEnd(succeedRequest: FinalStreamAction?) + case sendRequestEnd case pauseRequestBodyStream case resumeRequestBodyStream case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool) - case forwardResponseBodyPart(ByteBuffer) + case forwardResponseBodyParts(CircularBuffer) case failRequest(Error, FinalStreamAction) - case succeedRequest(FinalStreamAction) + case succeedRequest(FinalStreamAction, CircularBuffer) case read case wait @@ -168,6 +171,9 @@ struct HTTPRequestStateMachine { self.state = .running(requestState, responseState) return .resumeRequestBodyStream + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -191,32 +197,9 @@ struct HTTPRequestStateMachine { ) self.state = .running(requestState, responseState) return .pauseRequestBodyStream - } - } - mutating func readEventCaught() -> Action { - switch self.state { - case .initialized, - .waitForChannelToBecomeWritable, - .running(_, .waitingForHead), - .running(_, .endReceived), - .finished, - .failed: - // If we are not in the middle of streaming the response body, we always want to get - // more data... - return .read - case .running(_, .receivingBody(_, .downstreamIsConsuming(readPending: true))): - // We have caught another `read` event already. We don't need to change the state and - // we should continue to wait for the consumer to call `forwardMoreBodyParts` - return .wait - case .running(let requestState, .receivingBody(let responseHead, .downstreamIsConsuming(readPending: false))): - self.state = .running(requestState, .receivingBody(responseHead, .downstreamIsConsuming(readPending: true))) - return .wait - case .running(_, .receivingBody(_, .downstreamHasDemand)): - // The consumer has signaled a demand for more response body bytes. If a `read` is - // caught, we pass it on right away. The state machines does not transition into another - // state. - return .read + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -233,6 +216,8 @@ struct HTTPRequestStateMachine { return .failRequest(error, .close) case .finished, .failed: preconditionFailure("If the request is finished or failed, we expect the connection state machine to remove the request immediately from its state. Thus this state is unreachable.") + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -294,6 +279,9 @@ struct HTTPRequestStateMachine { // We may still receive something, here because of potential race conditions with the // producing thread. return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -312,7 +300,7 @@ struct HTTPRequestStateMachine { } self.state = .running(.endSent, .waitingForHead) - return .sendRequestEnd(succeedRequest: nil) + return .sendRequestEnd case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .receivingBody(let head, let streamState)): assert(head.status.code < 300) @@ -324,7 +312,7 @@ struct HTTPRequestStateMachine { } self.state = .running(.endSent, .receivingBody(head, streamState)) - return .sendRequestEnd(succeedRequest: nil) + return .sendRequestEnd case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .endReceived): if let expected = expectedBodyLength, expected != sentBodyBytes { @@ -334,7 +322,7 @@ struct HTTPRequestStateMachine { } self.state = .finished - return .sendRequestEnd(succeedRequest: .some(.none)) + return .succeedRequest(.sendRequestEnd, .init()) case .failed: return .wait @@ -350,6 +338,9 @@ struct HTTPRequestStateMachine { // We may still receive something, here because of potential race conditions with the // producing thread. return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -359,14 +350,20 @@ struct HTTPRequestStateMachine { let error = HTTPClientError.cancelled self.state = .failed(error) return .failRequest(error, .none) + case .running: let error = HTTPClientError.cancelled self.state = .failed(error) return .failRequest(error, .close) + case .finished: return .wait + case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -376,22 +373,109 @@ struct HTTPRequestStateMachine { let error = HTTPClientError.remoteConnectionClosed self.state = .failed(error) return .failRequest(error, .none) + case .finished: return .wait + case .failed: // don't overwrite error return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } // MARK: - Response - mutating func receivedHTTPResponseHead(_ head: HTTPResponseHead) -> Action { + mutating func read() -> Action { + switch self.state { + case .initialized, + .waitForChannelToBecomeWritable, + .running(_, .waitingForHead), + .running(_, .endReceived), + .finished, + .failed: + // If we are not in the middle of streaming the response body, we always want to get + // more data... + return .read + + case .running(_, .receivingBody(_, .waitingForBytes)): + // This should never happen. But we don't want to precondition this behavior. Let's just + // pass the read event on + return .read + + case .running(let requestState, .receivingBody(let head, .waitingForReadOrDemand(let buffer))): + self.state = .running(requestState, .receivingBody(head, .waitingForDemand(buffer))) + return .wait + + case .running(let requestState, .receivingBody(let head, .waitingForRead(let buffer))): + self.state = .running(requestState, .receivingBody(head, .waitingForBytes(buffer))) + return .read + + case .running(_, .receivingBody(_, .waitingForDemand)): + // we have already received a read event. We will issue it as soon as we received demand + // from the consumer + return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func channelRead(_ part: HTTPClientResponsePart) -> Action { + switch part { + case .head(let head): + return self.receivedHTTPResponseHead(head) + case .body(let body): + return self.receivedHTTPResponseBodyPart(body) + case .end: + return self.receivedHTTPResponseEnd() + } + } + + mutating func channelReadComplete() -> Action { + switch self.state { + case .initialized, + .waitForChannelToBecomeWritable, + .running(_, .waitingForHead), + .running(_, .endReceived), + .finished, + .failed: + return .wait + + case .running(let requestState, .receivingBody(let head, .waitingForBytes(let buffer))): + var newBuffer = buffer + newBuffer.removeAll(keepingCapacity: true) + self.state = .running(requestState, .receivingBody(head, .waitingForReadOrDemand(newBuffer))) + return .forwardResponseBodyParts(buffer) + + case .running(_, .receivingBody(_, .waitingForDemand)), + .running(_, .receivingBody(_, .waitingForRead)), + .running(_, .receivingBody(_, .waitingForReadOrDemand)): + preconditionFailure( + "Expect to receive `channelReadComplete` events only if we are in state: `.waitingForBytes`. Invalid state: \(self.state)") + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + private mutating func receivedHTTPResponseHead(_ head: HTTPResponseHead) -> Action { guard head.status.code >= 200 else { // we ignore any leading 1xx headers... No state change needed. return .wait } + var expectBody: Bool = false + if let length = head.headers.first(name: "content-length").flatMap({ Int($0) }) { + if length > 0 { + expectBody = true + } + } else if head.headers.contains(name: "transfer-encoding") { + expectBody = true + } + switch self.state { case .initialized, .waitForChannelToBecomeWritable: preconditionFailure("How can we receive a response head before sending a request head ourselves") @@ -399,7 +483,7 @@ struct HTTPRequestStateMachine { case .running(.streaming(let expectedBodyLength, let sentBodyBytes, producer: .paused), .waitingForHead): self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .downstreamIsConsuming(readPending: false)) + .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0))) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) @@ -407,29 +491,32 @@ struct HTTPRequestStateMachine { if head.status.code >= 300 { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .downstreamIsConsuming(readPending: false)) + .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0))) ) return .forwardResponseHead(head, pauseRequestBodyStream: true) } else { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .producing), - .receivingBody(head, .downstreamIsConsuming(readPending: false)) + .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0))) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) } case .running(.endSent, .waitingForHead): - self.state = .running(.endSent, .receivingBody(head, .downstreamIsConsuming(readPending: false))) + self.state = .running(.endSent, .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0)))) return .forwardResponseHead(head, pauseRequestBodyStream: false) case .running(_, .receivingBody), .running(_, .endReceived), .finished: preconditionFailure("How can we successfully finish the request, before having received a head. Invalid state: \(self.state)") case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } - mutating func receivedHTTPResponseBodyPart(_ body: ByteBuffer) -> Action { + private mutating func receivedHTTPResponseBodyPart(_ body: ByteBuffer) -> Action { switch self.state { case .initialized, .waitForChannelToBecomeWritable: preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)") @@ -437,23 +524,29 @@ struct HTTPRequestStateMachine { case .running(_, .waitingForHead): preconditionFailure("How can we receive a response body, if we haven't received a head. Invalid state: \(self.state)") - case .running(let requestState, .receivingBody(let head, .downstreamHasDemand)): - self.state = .running(requestState, .receivingBody(head, .downstreamIsConsuming(readPending: false))) - return .forwardResponseBodyPart(body) + case .running(let requestState, .receivingBody(let head, .waitingForBytes(var buffer))): + self.state = .modifying + buffer.append(body) + self.state = .running(requestState, .receivingBody(head, .waitingForBytes(buffer))) + return .wait - case .running(_, .receivingBody(_, .downstreamIsConsuming)): - // the state doesn't need to be changed. we are already in the correct state. - // just forward the data. - return .forwardResponseBodyPart(body) + case .running(_, .receivingBody(_, .waitingForRead)), + .running(_, .receivingBody(_, .waitingForDemand)), + .running(_, .receivingBody(_, .waitingForReadOrDemand)): + preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") case .running(_, .endReceived), .finished: preconditionFailure("How can we successfully finish the request, before having received a head. Invalid state: \(self.state)") + case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } - mutating func receivedHTTPResponseEnd() -> Action { + private mutating func receivedHTTPResponseEnd() -> Action { switch self.state { case .initialized, .waitForChannelToBecomeWritable: preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)") @@ -461,74 +554,71 @@ struct HTTPRequestStateMachine { case .running(_, .waitingForHead): preconditionFailure("How can we receive a response end, if we haven't a received a head. Invalid state: \(self.state)") - case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, let consumerState)) where head.status.code < 300: + case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, .waitingForBytes(let buffer))) where head.status.code < 300: self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState), .endReceived ) + return .forwardResponseBodyParts(buffer) - switch consumerState { - case .downstreamHasDemand, .downstreamIsConsuming(readPending: false): - return .wait - case .downstreamIsConsuming(readPending: true): - // If we have a received a read event before, we must ensure that the read event - // eventually gets onto the channel pipeline again. The end of the request gives - // us an opportunity for this clean up task. - // It is very unlikely that we can see this in the real world. If we have swallowed - // a read event we don't expect to receive further data from the channel incl. - // response ends. - - return .read - } - - case .running(.streaming(_, _, let producerState), .receivingBody(let head, _)): + case .running(.streaming(_, _, let producerState), .receivingBody(let head, .waitingForBytes(let buffer))): assert(head.status.code >= 300) assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)") self.state = .finished - return .succeedRequest(.close) - - case .running(.endSent, .receivingBody(_, .downstreamIsConsuming(readPending: true))): - // If we have a received a read event before, we must ensure that the read event - // eventually gets onto the channel pipeline again. The end of the request gives - // us an opportunity for this clean up task. - // It is very unlikely that we can see this in the real world. If we have swallowed - // a read event we don't expect to receive further data from the channel incl. - // response ends. - self.state = .finished - return .succeedRequest(.read) + return .succeedRequest(.close, buffer) - case .running(.endSent, .receivingBody(_, .downstreamIsConsuming(readPending: false))), - .running(.endSent, .receivingBody(_, .downstreamHasDemand)): + case .running(.endSent, .receivingBody(_, .waitingForBytes(let buffer))): self.state = .finished - return .succeedRequest(.none) + return .succeedRequest(.none, buffer) + + case .running(_, .receivingBody(_, .waitingForRead)), + .running(_, .receivingBody(_, .waitingForDemand)), + .running(_, .receivingBody(_, .waitingForReadOrDemand)): + preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") case .running(_, .endReceived), .finished: preconditionFailure("How can we receive a response end, if another one was already received. Invalid state: \(self.state)") + case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } - mutating func forwardMoreBodyParts() -> Action { + mutating func demandMoreResponseBodyParts() -> Action { switch self.state { case .initialized, .running(_, .waitingForHead), .waitForChannelToBecomeWritable: preconditionFailure("The response is expected to only ask for more data after the response head was forwarded") - case .running(let requestState, .receivingBody(let head, .downstreamIsConsuming(readPending: false))): - self.state = .running(requestState, .receivingBody(head, .downstreamHasDemand)) - return .wait - case .running(let requestState, .receivingBody(let head, .downstreamIsConsuming(readPending: true))): - self.state = .running(requestState, .receivingBody(head, .downstreamIsConsuming(readPending: false))) + + case .running(let requestState, .receivingBody(let head, .waitingForDemand(let buffer))): + self.state = .running(requestState, .receivingBody(head, .waitingForBytes(buffer))) return .read - case .running(_, .receivingBody(_, .downstreamHasDemand)): - // We have received a request for more data before. Normally we only expect one request - // for more data, but a race can come into play here. + + case .running(let requestState, .receivingBody(let head, .waitingForReadOrDemand(let buffer))): + self.state = .running(requestState, .receivingBody(head, .waitingForRead(buffer))) + return .wait + + case .running(_, .receivingBody(_, .waitingForRead)): + // if we are `waitingForRead`, no action needs to be taken. Demand was already signalled + // once we receive the next `read`, we will forward it, right away return .wait + + case .running(_, .receivingBody(_, .waitingForBytes)): + // if we are `.waitingForBytes`, no action needs to be taken. As soon as we receive + // the next channelReadComplete we will forward all buffered data + return .wait + case .running(_, .endReceived), .finished, .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -549,6 +639,9 @@ struct HTTPRequestStateMachine { case .finished, .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -581,6 +674,8 @@ extension HTTPRequestStateMachine: CustomStringConvertible { return "HTTPRequestStateMachine(.finished, isWritable: \(self.isChannelWritable))" case .failed(let error): return "HTTPRequestStateMachine(.failed(\(error)), isWritable: \(self.isChannelWritable))" + case .modifying: + return "HTTPRequestStateMachine(.modifying, isWritable: \(self.isChannelWritable))" } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift index 95b6acdb8..87c4de590 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift @@ -36,13 +36,14 @@ extension HTTPRequestStateMachineTests { ("testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200", testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200), ("testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerSendHeadWithStatus200", testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerSendHeadWithStatus200), ("testRequestIsNotSendUntilChannelIsWritable", testRequestIsNotSendUntilChannelIsWritable), + ("testConnectionBecomesInactiveWhileWaitingForWritable", testConnectionBecomesInactiveWhileWaitingForWritable), ("testResponseReadingWithBackpressure", testResponseReadingWithBackpressure), - ("testResponseReadingWithBackpressureEndOfResponseSetsCaughtReadEventFree", testResponseReadingWithBackpressureEndOfResponseSetsCaughtReadEventFree), + ("testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly", testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly), ("testCancellingARequestInStateInitializedKeepsTheConnectionAlive", testCancellingARequestInStateInitializedKeepsTheConnectionAlive), ("testCancellingARequestBeforeBeingSendKeepsTheConnectionAlive", testCancellingARequestBeforeBeingSendKeepsTheConnectionAlive), ("testCancellingARequestThatIsSent", testCancellingARequestThatIsSent), ("testRemoteSuddenlyClosesTheConnection", testRemoteSuddenlyClosesTheConnection), - ("testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnore", testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnore), + ("testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnored", testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnored), ("testResponseWithStatus1XXAreIgnored", testResponseWithStatus1XXAreIgnored), ("testReadTimeoutThatFiresToLateIsIgnored", testReadTimeoutThatFiresToLateIsIgnored), ("testCancellationThatIsInvokedToLateIsIgnored", testCancellationThatIsInvokedToLateIsIgnored), diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift index 1c0bf48cf..5aac21079 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift @@ -26,10 +26,11 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(responseBody), .forwardResponseBodyPart(responseBody)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelReadComplete(), .wait) } func testPOSTRequestWithWriterBackpressure() { @@ -56,13 +57,14 @@ class HTTPRequestStateMachineTests: XCTestCase { // once we receive a writable event again, we can allow the producer to produce more data XCTAssertEqual(state.writabilityChanged(writable: true), .resumeRequestBodyStream) XCTAssertEqual(state.requestStreamPartReceived(part3), .sendBodyPart(part3)) - XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd(succeedRequest: nil)) + XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(responseBody), .forwardResponseBodyPart(responseBody)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelReadComplete(), .wait) } func testPOSTContentLengthIsTooLong() { @@ -106,15 +108,15 @@ class HTTPRequestStateMachineTests: XCTestCase { let part = IOData.byteBuffer(ByteBuffer(bytes: [0, 1, 2, 3])) XCTAssertEqual(state.requestStreamPartReceived(part), .sendBodyPart(part)) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .movedPermanently) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: true)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: true)) XCTAssertEqual(state.writabilityChanged(writable: false), .wait) XCTAssertEqual(state.writabilityChanged(writable: true), .wait) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.close)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init())) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") @@ -132,14 +134,14 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.requestStreamPartReceived(part), .sendBodyPart(part)) XCTAssertEqual(state.writabilityChanged(writable: false), .pauseRequestBodyStream) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .movedPermanently) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) XCTAssertEqual(state.writabilityChanged(writable: true), .wait) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.close)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init())) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") @@ -158,14 +160,14 @@ class HTTPRequestStateMachineTests: XCTestCase { // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseBodyParts(.init())) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11)) XCTAssertEqual(state.requestStreamPartReceived(part2), .sendBodyPart(part2)) - XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd(succeedRequest: .some(.none))) + XCTAssertEqual(state.requestStreamFinished(), .succeedRequest(.sendRequestEnd, .init())) } func testRequestBodyStreamIsContinuedIfServerSendHeadWithStatus200() { @@ -178,15 +180,15 @@ class HTTPRequestStateMachineTests: XCTestCase { // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11)) XCTAssertEqual(state.requestStreamPartReceived(part2), .sendBodyPart(part2)) - XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd(succeedRequest: nil)) + XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) } func testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200() { @@ -197,10 +199,10 @@ class HTTPRequestStateMachineTests: XCTestCase { let part0 = IOData.byteBuffer(ByteBuffer(bytes: 0...3)) XCTAssertEqual(state.requestStreamPartReceived(part0), .sendBodyPart(part0)) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseBodyParts(.init())) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) @@ -216,14 +218,14 @@ class HTTPRequestStateMachineTests: XCTestCase { let part0 = IOData.byteBuffer(ByteBuffer(bytes: 0...3)) XCTAssertEqual(state.requestStreamPartReceived(part0), .sendBodyPart(part0)) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) XCTAssertEqual(state.requestStreamFinished(), .failRequest(HTTPClientError.bodyLengthMismatch, .close)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .wait) } func testRequestIsNotSendUntilChannelIsWritable() { @@ -231,17 +233,25 @@ class HTTPRequestStateMachineTests: XCTestCase { let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .wait) - XCTAssertEqual(state.readEventCaught(), .read) + XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.writabilityChanged(writable: true), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(responseBody), .forwardResponseBodyPart(responseBody)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) XCTAssertEqual(state.channelInactive(), .wait) } + func testConnectionBecomesInactiveWhileWaitingForWritable() { + var state = HTTPRequestStateMachine(isChannelWritable: false) + let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") + let metadata = RequestFramingMetadata(connectionClose: false, body: .none) + XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .wait) + XCTAssertEqual(state.channelInactive(), .failRequest(HTTPClientError.remoteConnectionClosed, .none)) + } + func testResponseReadingWithBackpressure() { var state = HTTPRequestStateMachine(isChannelWritable: true) let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") @@ -249,42 +259,55 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part0 = ByteBuffer(bytes: 0...3) let part1 = ByteBuffer(bytes: 4...7) let part2 = ByteBuffer(bytes: 8...11) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part0), .forwardResponseBodyPart(part0)) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part1), .forwardResponseBodyPart(part1)) - XCTAssertEqual(state.readEventCaught(), .wait) - XCTAssertEqual(state.readEventCaught(), .wait, "Expected to be able to consume a second read event") - XCTAssertEqual(state.forwardMoreBodyParts(), .read) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part2), .forwardResponseBodyPart(part2)) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait) - XCTAssertEqual(state.readEventCaught(), .read) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) + XCTAssertEqual(state.channelRead(.body(part1)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part0, part1]))) + XCTAssertEqual(state.read(), .wait) + XCTAssertEqual(state.read(), .wait, "Expected to be able to consume a second read event") + XCTAssertEqual(state.demandMoreResponseBodyParts(), .read) + XCTAssertEqual(state.channelRead(.body(part2)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) } - func testResponseReadingWithBackpressureEndOfResponseSetsCaughtReadEventFree() { + func testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly() { var state = HTTPRequestStateMachine(isChannelWritable: true) let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part0 = ByteBuffer(bytes: 0...3) let part1 = ByteBuffer(bytes: 4...7) let part2 = ByteBuffer(bytes: 8...11) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part0), .forwardResponseBodyPart(part0)) - XCTAssertEqual(state.readEventCaught(), .wait) - XCTAssertEqual(state.forwardMoreBodyParts(), .read) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part1), .forwardResponseBodyPart(part1)) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait, "Calling forward more bytes twice is okay") - XCTAssertEqual(state.readEventCaught(), .read) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part2), .forwardResponseBodyPart(part2)) - XCTAssertEqual(state.readEventCaught(), .wait) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.read)) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part0]))) + XCTAssertEqual(state.read(), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .read) + XCTAssertEqual(state.channelRead(.body(part1)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part1]))) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait, "Calling forward more bytes twice is okay") + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.body(part2)), .wait) + XCTAssertEqual(state.read(), .read, "Calling `read` while we wait for a channelReadComplete doesn't crash") + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait, "Calling `demandMoreResponseBodyParts` while we wait for a channelReadComplete doesn't crash") + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.read(), .read) } func testCancellingARequestInStateInitializedKeepsTheConnectionAlive() { @@ -317,21 +340,21 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.requestStreamPartReceived(.byteBuffer(.init(bytes: 1...3))), .wait) } - func testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnore() { + func testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnored() { var state = HTTPRequestStateMachine(isChannelWritable: true) let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part0 = ByteBuffer(bytes: 0...3) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part0), .forwardResponseBodyPart(part0)) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) XCTAssertEqual(state.idleReadTimeoutTriggered(), .failRequest(HTTPClientError.readTimeout, .close)) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(ByteBuffer(bytes: 4...7)), .wait) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(ByteBuffer(bytes: 8...11)), .wait) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.body(ByteBuffer(bytes: 4...7))), .wait) + XCTAssertEqual(state.channelRead(.body(ByteBuffer(bytes: 8...11))), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .wait) } func testResponseWithStatus1XXAreIgnored() { @@ -341,11 +364,13 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let continueHead = HTTPResponseHead(version: .http1_1, status: .continue) - XCTAssertEqual(state.receivedHTTPResponseHead(continueHead), .wait) + XCTAssertEqual(state.channelRead(.head(continueHead)), .wait) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) } func testReadTimeoutThatFiresToLateIsIgnored() { @@ -354,12 +379,9 @@ class HTTPRequestStateMachineTests: XCTestCase { let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) - let continueHead = HTTPResponseHead(version: .http1_1, status: .continue) - XCTAssertEqual(state.receivedHTTPResponseHead(continueHead), .wait) - let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) XCTAssertEqual(state.idleReadTimeoutTriggered(), .wait, "A read timeout that fires to late must be ignored") } @@ -369,12 +391,9 @@ class HTTPRequestStateMachineTests: XCTestCase { let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) - let continueHead = HTTPResponseHead(version: .http1_1, status: .continue) - XCTAssertEqual(state.receivedHTTPResponseHead(continueHead), .wait) - let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) XCTAssertEqual(state.requestCancelled(), .wait, "A cancellation that happens to late is ignored") } @@ -394,10 +413,12 @@ extension HTTPRequestStateMachine.Action: Equatable { switch (lhs, rhs) { case (.sendRequestHead(let lhsHead, let lhsStartBody), .sendRequestHead(let rhsHead, let rhsStartBody)): return lhsHead == rhsHead && lhsStartBody == rhsStartBody + case (.sendBodyPart(let lhsData), .sendBodyPart(let rhsData)): return lhsData == rhsData - case (.sendRequestEnd(let lhsFinalAction), .sendRequestEnd(let rhsFinalAction)): - return lhsFinalAction == rhsFinalAction + + case (.sendRequestEnd, .sendRequestEnd): + return true case (.pauseRequestBodyStream, .pauseRequestBodyStream): return true @@ -406,18 +427,22 @@ extension HTTPRequestStateMachine.Action: Equatable { case (.forwardResponseHead(let lhsHead, let lhsPauseRequestBodyStream), .forwardResponseHead(let rhsHead, let rhsPauseRequestBodyStream)): return lhsHead == rhsHead && lhsPauseRequestBodyStream == rhsPauseRequestBodyStream - case (.forwardResponseBodyPart(let lhsData), .forwardResponseBodyPart(let rhsData)): + + case (.forwardResponseBodyParts(let lhsData), .forwardResponseBodyParts(let rhsData)): return lhsData == rhsData - case (.succeedRequest(let lhsFinalAction), .succeedRequest(let rhsFinalAction)): - return lhsFinalAction == rhsFinalAction + case (.succeedRequest(let lhsFinalAction, let lhsFinalBuffer), .succeedRequest(let rhsFinalAction, let rhsFinalBuffer)): + return lhsFinalAction == rhsFinalAction && lhsFinalBuffer == rhsFinalBuffer + case (.failRequest(_, let lhsFinalAction), .failRequest(_, let rhsFinalAction)): return lhsFinalAction == rhsFinalAction case (.read, .read): return true + case (.wait, .wait): return true + default: return false } From 96fbbbdf3d70a4a6c30517fe180ed595341b94e4 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 6 Jul 2021 17:54:25 +0200 Subject: [PATCH 2/4] Extracting into extra state machine --- .../HTTPRequestStateMachine+Demand.swift | 149 +++++++++++++ .../HTTPRequestStateMachine.swift | 197 +++++++++--------- 2 files changed, 253 insertions(+), 93 deletions(-) create mode 100644 Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift new file mode 100644 index 000000000..afad6f51d --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift @@ -0,0 +1,149 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +extension HTTPRequestStateMachine { + /// A sub state for receiving a response events. Stores whether the consumer has either signaled demand and whether the + /// channel has issued `read` events. + struct ResponseStreamState { + private enum State { + /// The state machines expects further writes to `channelRead`. The writes are appended to the buffer. + case waitingForBytes(CircularBuffer) + /// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is + /// empty. It is preserved for performance reasons. + case waitingForReadOrDemand(CircularBuffer) + /// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons. + case waitingForRead(CircularBuffer) + /// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is + /// preserved for performance reasons. + case waitingForDemand(CircularBuffer) + + case modifying + } + + enum Action { + case read + case wait + } + + private var state: State + + init(expectingBody: Bool) { + self.state = .waitingForBytes(CircularBuffer(initialCapacity: expectingBody ? 16 : 0)) + } + + mutating func receivedBodyPart(_ body: ByteBuffer) { + switch self.state { + case .waitingForBytes(var buffer): + self.state = .modifying + buffer.append(body) + self.state = .waitingForBytes(buffer) + + case .waitingForRead, + .waitingForDemand, + .waitingForReadOrDemand: + preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func channelReadComplete() -> CircularBuffer { + switch self.state { + case .waitingForBytes(let buffer): + var newBuffer = buffer + newBuffer.removeAll(keepingCapacity: true) + self.state = .waitingForReadOrDemand(newBuffer) + return buffer + + case .waitingForRead, + .waitingForDemand, + .waitingForReadOrDemand: + preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func demandMoreResponseBodyParts() -> Action { + switch self.state { + case .waitingForDemand(let buffer): + self.state = .waitingForBytes(buffer) + return .read + + case .waitingForReadOrDemand(let buffer): + self.state = .waitingForRead(buffer) + return .wait + + case .waitingForRead: + // if we are `waitingForRead`, no action needs to be taken. Demand was already signalled + // once we receive the next `read`, we will forward it, right away + return .wait + + case .waitingForBytes: + // if we are `.waitingForBytes`, no action needs to be taken. As soon as we receive + // the next channelReadComplete we will forward all buffered data + return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func read() -> Action { + switch self.state { + case .waitingForBytes: + // This should never happen. But we don't want to precondition this behavior. Let's just + // pass the read event on + return .read + + case .waitingForReadOrDemand(let buffer): + self.state = .waitingForDemand(buffer) + return .wait + + case .waitingForRead(let buffer): + self.state = .waitingForBytes(buffer) + return .read + + case .waitingForDemand: + // we have already received a read event. We will issue it as soon as we received demand + // from the consumer + return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func end() -> CircularBuffer { + switch self.state { + case .waitingForBytes(let buffer): + // This should never happen. But we don't want to precondition this behavior. Let's just + // pass the read event on + return buffer + + case .waitingForReadOrDemand, + .waitingForRead, + .waitingForDemand: + preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + } +} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index eef8b77ca..1531a612f 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -60,25 +60,10 @@ struct HTTPRequestStateMachine { } fileprivate enum ResponseState { - /// A sub state for receiving a response. Stores whether the consumer has either signaled demand and whether the - /// channel has issued `read` events. - enum ConsumerControlState { - /// The state machines expects further writes to `channelRead`. The writes are appended to the buffer. - case waitingForBytes(CircularBuffer) - /// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is - /// empty. It is preserved for performance reasons. - case waitingForReadOrDemand(CircularBuffer) - /// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons. - case waitingForRead(CircularBuffer) - /// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is - /// preserved for performance reasons. - case waitingForDemand(CircularBuffer) - } - /// A response head has not been received yet. case waitingForHead /// A response head has been received and we are ready to consume more data off the wire - case receivingBody(HTTPResponseHead, ConsumerControlState) + case receivingBody(HTTPResponseHead, ResponseStreamState) /// A response end has been received. We don't expect more bytes from the wire. case endReceived } @@ -400,23 +385,14 @@ struct HTTPRequestStateMachine { // more data... return .read - case .running(_, .receivingBody(_, .waitingForBytes)): + case .running(let requestState, .receivingBody(let head, var streamState)): // This should never happen. But we don't want to precondition this behavior. Let's just // pass the read event on - return .read - - case .running(let requestState, .receivingBody(let head, .waitingForReadOrDemand(let buffer))): - self.state = .running(requestState, .receivingBody(head, .waitingForDemand(buffer))) - return .wait - - case .running(let requestState, .receivingBody(let head, .waitingForRead(let buffer))): - self.state = .running(requestState, .receivingBody(head, .waitingForBytes(buffer))) - return .read - - case .running(_, .receivingBody(_, .waitingForDemand)): - // we have already received a read event. We will issue it as soon as we received demand - // from the consumer - return .wait + return self.avoidingStateMachineCoW { (state) -> Action in + let action = streamState.read() + state = .running(requestState, .receivingBody(head, streamState)) + return action.toRequestAction() + } case .modifying: preconditionFailure("Invalid state: \(self.state)") @@ -444,17 +420,14 @@ struct HTTPRequestStateMachine { .failed: return .wait - case .running(let requestState, .receivingBody(let head, .waitingForBytes(let buffer))): - var newBuffer = buffer - newBuffer.removeAll(keepingCapacity: true) - self.state = .running(requestState, .receivingBody(head, .waitingForReadOrDemand(newBuffer))) - return .forwardResponseBodyParts(buffer) - - case .running(_, .receivingBody(_, .waitingForDemand)), - .running(_, .receivingBody(_, .waitingForRead)), - .running(_, .receivingBody(_, .waitingForReadOrDemand)): - preconditionFailure( - "Expect to receive `channelReadComplete` events only if we are in state: `.waitingForBytes`. Invalid state: \(self.state)") + case .running(let requestState, .receivingBody(let head, var streamState)): + // This should never happen. But we don't want to precondition this behavior. Let's just + // pass the read event on + return self.avoidingStateMachineCoW { (state) -> Action in + let buffer = streamState.channelReadComplete() + state = .running(requestState, .receivingBody(head, streamState)) + return .forwardResponseBodyParts(buffer) + } case .modifying: preconditionFailure("Invalid state: \(self.state)") @@ -467,13 +440,13 @@ struct HTTPRequestStateMachine { return .wait } - var expectBody: Bool = false + var expectingBody: Bool = false if let length = head.headers.first(name: "content-length").flatMap({ Int($0) }) { if length > 0 { - expectBody = true + expectingBody = true } } else if head.headers.contains(name: "transfer-encoding") { - expectBody = true + expectingBody = true } switch self.state { @@ -483,7 +456,7 @@ struct HTTPRequestStateMachine { case .running(.streaming(let expectedBodyLength, let sentBodyBytes, producer: .paused), .waitingForHead): self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0))) + .receivingBody(head, .init(expectingBody: expectingBody)) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) @@ -491,19 +464,19 @@ struct HTTPRequestStateMachine { if head.status.code >= 300 { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0))) + .receivingBody(head, .init(expectingBody: expectingBody)) ) return .forwardResponseHead(head, pauseRequestBodyStream: true) } else { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .producing), - .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0))) + .receivingBody(head, .init(expectingBody: expectingBody)) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) } case .running(.endSent, .waitingForHead): - self.state = .running(.endSent, .receivingBody(head, .waitingForBytes(CircularBuffer(initialCapacity: expectBody ? 8 : 0)))) + self.state = .running(.endSent, .receivingBody(head, .init(expectingBody: expectingBody))) return .forwardResponseHead(head, pauseRequestBodyStream: false) case .running(_, .receivingBody), .running(_, .endReceived), .finished: @@ -516,7 +489,7 @@ struct HTTPRequestStateMachine { } } - private mutating func receivedHTTPResponseBodyPart(_ body: ByteBuffer) -> Action { + mutating func receivedHTTPResponseBodyPart(_ body: ByteBuffer) -> Action { switch self.state { case .initialized, .waitForChannelToBecomeWritable: preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)") @@ -524,16 +497,12 @@ struct HTTPRequestStateMachine { case .running(_, .waitingForHead): preconditionFailure("How can we receive a response body, if we haven't received a head. Invalid state: \(self.state)") - case .running(let requestState, .receivingBody(let head, .waitingForBytes(var buffer))): - self.state = .modifying - buffer.append(body) - self.state = .running(requestState, .receivingBody(head, .waitingForBytes(buffer))) - return .wait - - case .running(_, .receivingBody(_, .waitingForRead)), - .running(_, .receivingBody(_, .waitingForDemand)), - .running(_, .receivingBody(_, .waitingForReadOrDemand)): - preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + case .running(let requestState, .receivingBody(let head, var responseStreamState)): + return self.avoidingStateMachineCoW { (state) -> Action in + responseStreamState.receivedBodyPart(body) + state = .running(requestState, .receivingBody(head, responseStreamState)) + return .wait + } case .running(_, .endReceived), .finished: preconditionFailure("How can we successfully finish the request, before having received a head. Invalid state: \(self.state)") @@ -554,27 +523,34 @@ struct HTTPRequestStateMachine { case .running(_, .waitingForHead): preconditionFailure("How can we receive a response end, if we haven't a received a head. Invalid state: \(self.state)") - case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, .waitingForBytes(let buffer))) where head.status.code < 300: - self.state = .running( - .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState), - .endReceived - ) - return .forwardResponseBodyParts(buffer) + case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, var responseStreamState)) + where head.status.code < 300: - case .running(.streaming(_, _, let producerState), .receivingBody(let head, .waitingForBytes(let buffer))): + return self.avoidingStateMachineCoW { state -> Action in + let remainingBuffer = responseStreamState.end() + state = .running( + .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState), + .endReceived + ) + return .forwardResponseBodyParts(remainingBuffer) + } + + case .running(.streaming(_, _, let producerState), .receivingBody(let head, var responseStreamState)): assert(head.status.code >= 300) assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)") - self.state = .finished - return .succeedRequest(.close, buffer) - case .running(.endSent, .receivingBody(_, .waitingForBytes(let buffer))): - self.state = .finished - return .succeedRequest(.none, buffer) + return self.avoidingStateMachineCoW { state -> Action in + let remainingBuffer = responseStreamState.end() + state = .finished + return .succeedRequest(.close, remainingBuffer) + } - case .running(_, .receivingBody(_, .waitingForRead)), - .running(_, .receivingBody(_, .waitingForDemand)), - .running(_, .receivingBody(_, .waitingForReadOrDemand)): - preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + case .running(.endSent, .receivingBody(_, var responseStreamState)): + return self.avoidingStateMachineCoW { state -> Action in + let remainingBuffer = responseStreamState.end() + state = .finished + return .succeedRequest(.none, remainingBuffer) + } case .running(_, .endReceived), .finished: preconditionFailure("How can we receive a response end, if another one was already received. Invalid state: \(self.state)") @@ -594,23 +570,12 @@ struct HTTPRequestStateMachine { .waitForChannelToBecomeWritable: preconditionFailure("The response is expected to only ask for more data after the response head was forwarded") - case .running(let requestState, .receivingBody(let head, .waitingForDemand(let buffer))): - self.state = .running(requestState, .receivingBody(head, .waitingForBytes(buffer))) - return .read - - case .running(let requestState, .receivingBody(let head, .waitingForReadOrDemand(let buffer))): - self.state = .running(requestState, .receivingBody(head, .waitingForRead(buffer))) - return .wait - - case .running(_, .receivingBody(_, .waitingForRead)): - // if we are `waitingForRead`, no action needs to be taken. Demand was already signalled - // once we receive the next `read`, we will forward it, right away - return .wait - - case .running(_, .receivingBody(_, .waitingForBytes)): - // if we are `.waitingForBytes`, no action needs to be taken. As soon as we receive - // the next channelReadComplete we will forward all buffered data - return .wait + case .running(let requestState, .receivingBody(let head, var responseStreamState)): + return self.avoidingStateMachineCoW { state -> Action in + let action = responseStreamState.demandMoreResponseBodyParts() + state = .running(requestState, .receivingBody(head, responseStreamState)) + return action.toRequestAction() + } case .running(_, .endReceived), .finished, @@ -661,6 +626,52 @@ struct HTTPRequestStateMachine { } } +extension HTTPRequestStateMachine { + /// So, uh...this function needs some explaining. + /// + /// While the state machine logic above is great, there is a downside to having all of the state machine data in + /// associated data on enumerations: any modification of that data will trigger copy on write for heap-allocated + /// data. That means that for _every operation on the state machine_ we will CoW our underlying state, which is + /// not good. + /// + /// The way we can avoid this is by using this helper function. It will temporarily set state to a value with no + /// associated data, before attempting the body of the function. It will also verify that the state machine never + /// remains in this bad state. + /// + /// A key note here is that all callers must ensure that they return to a good state before they exit. + /// + /// Sadly, because it's generic and has a closure, we need to force it to be inlined at all call sites, which is + /// not ideal. + @inline(__always) + private mutating func avoidingStateMachineCoW(_ body: (inout State) -> ReturnType) -> ReturnType { + self.state = .modifying + defer { + assert(!self.isModifying) + } + + return body(&self.state) + } + + private var isModifying: Bool { + if case .modifying = self.state { + return true + } else { + return false + } + } +} + +extension HTTPRequestStateMachine.ResponseStreamState.Action { + func toRequestAction() -> HTTPRequestStateMachine.Action { + switch self { + case .read: + return .read + case .wait: + return .wait + } + } +} + extension HTTPRequestStateMachine: CustomStringConvertible { var description: String { switch self.state { From f64ae5f3436071ed98e92a97499d22d2e13ffc5f Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 7 Jul 2021 12:24:43 +0200 Subject: [PATCH 3/4] Fixes a bug --- .../HTTPRequestStateMachine+Demand.swift | 15 +++++++---- .../HTTPRequestStateMachine.swift | 6 ++++- .../HTTPRequestStateMachineTests+XCTest.swift | 1 + .../HTTPRequestStateMachineTests.swift | 27 +++++++++++++++++++ 4 files changed, 43 insertions(+), 6 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift index afad6f51d..7908fb8c4 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift @@ -61,13 +61,18 @@ extension HTTPRequestStateMachine { } } - mutating func channelReadComplete() -> CircularBuffer { + mutating func channelReadComplete() -> CircularBuffer? { switch self.state { case .waitingForBytes(let buffer): - var newBuffer = buffer - newBuffer.removeAll(keepingCapacity: true) - self.state = .waitingForReadOrDemand(newBuffer) - return buffer + if buffer.isEmpty { + self.state = .waitingForRead(buffer) + return nil + } else { + var newBuffer = buffer + newBuffer.removeAll(keepingCapacity: true) + self.state = .waitingForReadOrDemand(newBuffer) + return buffer + } case .waitingForRead, .waitingForDemand, diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index 1531a612f..96f55c858 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -426,7 +426,11 @@ struct HTTPRequestStateMachine { return self.avoidingStateMachineCoW { (state) -> Action in let buffer = streamState.channelReadComplete() state = .running(requestState, .receivingBody(head, streamState)) - return .forwardResponseBodyParts(buffer) + if let buffer = buffer { + return .forwardResponseBodyParts(buffer) + } else { + return .wait + } } case .modifying: diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift index 87c4de590..cc8b22a0b 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift @@ -38,6 +38,7 @@ extension HTTPRequestStateMachineTests { ("testRequestIsNotSendUntilChannelIsWritable", testRequestIsNotSendUntilChannelIsWritable), ("testConnectionBecomesInactiveWhileWaitingForWritable", testConnectionBecomesInactiveWhileWaitingForWritable), ("testResponseReadingWithBackpressure", testResponseReadingWithBackpressure), + ("testChannelReadCompleteTriggersButNoBodyDataWasReceivedSoFar", testChannelReadCompleteTriggersButNoBodyDataWasReceivedSoFar), ("testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly", testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly), ("testCancellingARequestInStateInitializedKeepsTheConnectionAlive", testCancellingARequestInStateInitializedKeepsTheConnectionAlive), ("testCancellingARequestBeforeBeingSendKeepsTheConnectionAlive", testCancellingARequestBeforeBeingSendKeepsTheConnectionAlive), diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift index 5aac21079..8614d9767 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift @@ -279,6 +279,33 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) } + func testChannelReadCompleteTriggersButNoBodyDataWasReceivedSoFar() { + var state = HTTPRequestStateMachine(isChannelWritable: true) + let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") + let metadata = RequestFramingMetadata(connectionClose: false, body: .none) + XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) + + let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + let part0 = ByteBuffer(bytes: 0...3) + let part1 = ByteBuffer(bytes: 4...7) + let part2 = ByteBuffer(bytes: 8...11) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) + XCTAssertEqual(state.channelRead(.body(part1)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part0, part1]))) + XCTAssertEqual(state.read(), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .read) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.body(part2)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([part2]))) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + } + func testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly() { var state = HTTPRequestStateMachine(isChannelWritable: true) let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") From a4fba69e475c63028d219edfff8b48dbc1a89737 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 7 Jul 2021 12:51:47 +0200 Subject: [PATCH 4/4] Code review --- .../HTTPRequestStateMachine+Demand.swift | 6 ++---- .../HTTPRequestStateMachine.swift | 17 ++++------------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift index 7908fb8c4..0014a6167 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift @@ -40,8 +40,8 @@ extension HTTPRequestStateMachine { private var state: State - init(expectingBody: Bool) { - self.state = .waitingForBytes(CircularBuffer(initialCapacity: expectingBody ? 16 : 0)) + init() { + self.state = .waitingForBytes(CircularBuffer(initialCapacity: 16)) } mutating func receivedBodyPart(_ body: ByteBuffer) { @@ -137,8 +137,6 @@ extension HTTPRequestStateMachine { mutating func end() -> CircularBuffer { switch self.state { case .waitingForBytes(let buffer): - // This should never happen. But we don't want to precondition this behavior. Let's just - // pass the read event on return buffer case .waitingForReadOrDemand, diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index 96f55c858..424f42d1a 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -444,15 +444,6 @@ struct HTTPRequestStateMachine { return .wait } - var expectingBody: Bool = false - if let length = head.headers.first(name: "content-length").flatMap({ Int($0) }) { - if length > 0 { - expectingBody = true - } - } else if head.headers.contains(name: "transfer-encoding") { - expectingBody = true - } - switch self.state { case .initialized, .waitForChannelToBecomeWritable: preconditionFailure("How can we receive a response head before sending a request head ourselves") @@ -460,7 +451,7 @@ struct HTTPRequestStateMachine { case .running(.streaming(let expectedBodyLength, let sentBodyBytes, producer: .paused), .waitingForHead): self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .init(expectingBody: expectingBody)) + .receivingBody(head, .init()) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) @@ -468,19 +459,19 @@ struct HTTPRequestStateMachine { if head.status.code >= 300 { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .init(expectingBody: expectingBody)) + .receivingBody(head, .init()) ) return .forwardResponseHead(head, pauseRequestBodyStream: true) } else { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .producing), - .receivingBody(head, .init(expectingBody: expectingBody)) + .receivingBody(head, .init()) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) } case .running(.endSent, .waitingForHead): - self.state = .running(.endSent, .receivingBody(head, .init(expectingBody: expectingBody))) + self.state = .running(.endSent, .receivingBody(head, .init())) return .forwardResponseHead(head, pauseRequestBodyStream: false) case .running(_, .receivingBody), .running(_, .endReceived), .finished: