diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift new file mode 100644 index 000000000..0014a6167 --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift @@ -0,0 +1,152 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +extension HTTPRequestStateMachine { + /// A sub state for receiving a response events. Stores whether the consumer has either signaled demand and whether the + /// channel has issued `read` events. + struct ResponseStreamState { + private enum State { + /// The state machines expects further writes to `channelRead`. The writes are appended to the buffer. + case waitingForBytes(CircularBuffer) + /// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is + /// empty. It is preserved for performance reasons. + case waitingForReadOrDemand(CircularBuffer) + /// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons. + case waitingForRead(CircularBuffer) + /// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is + /// preserved for performance reasons. + case waitingForDemand(CircularBuffer) + + case modifying + } + + enum Action { + case read + case wait + } + + private var state: State + + init() { + self.state = .waitingForBytes(CircularBuffer(initialCapacity: 16)) + } + + mutating func receivedBodyPart(_ body: ByteBuffer) { + switch self.state { + case .waitingForBytes(var buffer): + self.state = .modifying + buffer.append(body) + self.state = .waitingForBytes(buffer) + + case .waitingForRead, + .waitingForDemand, + .waitingForReadOrDemand: + preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func channelReadComplete() -> CircularBuffer? { + switch self.state { + case .waitingForBytes(let buffer): + if buffer.isEmpty { + self.state = .waitingForRead(buffer) + return nil + } else { + var newBuffer = buffer + newBuffer.removeAll(keepingCapacity: true) + self.state = .waitingForReadOrDemand(newBuffer) + return buffer + } + + case .waitingForRead, + .waitingForDemand, + .waitingForReadOrDemand: + preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func demandMoreResponseBodyParts() -> Action { + switch self.state { + case .waitingForDemand(let buffer): + self.state = .waitingForBytes(buffer) + return .read + + case .waitingForReadOrDemand(let buffer): + self.state = .waitingForRead(buffer) + return .wait + + case .waitingForRead: + // if we are `waitingForRead`, no action needs to be taken. Demand was already signalled + // once we receive the next `read`, we will forward it, right away + return .wait + + case .waitingForBytes: + // if we are `.waitingForBytes`, no action needs to be taken. As soon as we receive + // the next channelReadComplete we will forward all buffered data + return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func read() -> Action { + switch self.state { + case .waitingForBytes: + // This should never happen. But we don't want to precondition this behavior. Let's just + // pass the read event on + return .read + + case .waitingForReadOrDemand(let buffer): + self.state = .waitingForDemand(buffer) + return .wait + + case .waitingForRead(let buffer): + self.state = .waitingForBytes(buffer) + return .read + + case .waitingForDemand: + // we have already received a read event. We will issue it as soon as we received demand + // from the consumer + return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func end() -> CircularBuffer { + switch self.state { + case .waitingForBytes(let buffer): + return buffer + + case .waitingForReadOrDemand, + .waitingForRead, + .waitingForDemand: + preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + } +} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index 0e30e0fd1..424f42d1a 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -36,6 +36,8 @@ struct HTTPRequestStateMachine { case finished /// The request has failed case failed(Error) + + case modifying } /// A sub state for a running request. More specifically for sending a request body. @@ -58,23 +60,10 @@ struct HTTPRequestStateMachine { } fileprivate enum ResponseState { - /// A sub state for receiving a response. Stores whether the consumer has either signaled demand for more data or - /// is busy consuming the so far forwarded bytes - enum ConsumerControlState { - /// the state machine is in this state once it has passed down a request head or body part. If a read event - /// occurs while in this state, the readPending flag will be set true. If the consumer signals more demand - /// by invoking `forwardMoreBodyParts`, the state machine will forward the read event. - case downstreamIsConsuming(readPending: Bool) - /// the state machine is in this state once the consumer has signaled more demand by invoking - /// `forwardMoreBodyParts`. If a read event occurs in this state the read event will be forwarded - /// immediately. - case downstreamHasDemand - } - /// A response head has not been received yet. case waitingForHead /// A response head has been received and we are ready to consume more data off the wire - case receivingBody(HTTPResponseHead, ConsumerControlState) + case receivingBody(HTTPResponseHead, ResponseStreamState) /// A response end has been received. We don't expect more bytes from the wire. case endReceived } @@ -84,8 +73,9 @@ struct HTTPRequestStateMachine { enum FinalStreamAction { /// Close the connection case close - /// Trigger a read event - case read + /// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded, + /// as soon as we wrote the request end onto the wire. + case sendRequestEnd /// Do nothing. This is action is used, if the request failed, before we the request head was written onto the wire. /// This might happen if the request is cancelled, or the request failed the soundness check. case none @@ -93,18 +83,16 @@ struct HTTPRequestStateMachine { case sendRequestHead(HTTPRequestHead, startBody: Bool) case sendBodyPart(IOData) - /// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded, - /// as soon as we wrote the request end onto the wire. In this case the succeedRequest property is set. - case sendRequestEnd(succeedRequest: FinalStreamAction?) + case sendRequestEnd case pauseRequestBodyStream case resumeRequestBodyStream case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool) - case forwardResponseBodyPart(ByteBuffer) + case forwardResponseBodyParts(CircularBuffer) case failRequest(Error, FinalStreamAction) - case succeedRequest(FinalStreamAction) + case succeedRequest(FinalStreamAction, CircularBuffer) case read case wait @@ -168,6 +156,9 @@ struct HTTPRequestStateMachine { self.state = .running(requestState, responseState) return .resumeRequestBodyStream + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -191,32 +182,9 @@ struct HTTPRequestStateMachine { ) self.state = .running(requestState, responseState) return .pauseRequestBodyStream - } - } - mutating func readEventCaught() -> Action { - switch self.state { - case .initialized, - .waitForChannelToBecomeWritable, - .running(_, .waitingForHead), - .running(_, .endReceived), - .finished, - .failed: - // If we are not in the middle of streaming the response body, we always want to get - // more data... - return .read - case .running(_, .receivingBody(_, .downstreamIsConsuming(readPending: true))): - // We have caught another `read` event already. We don't need to change the state and - // we should continue to wait for the consumer to call `forwardMoreBodyParts` - return .wait - case .running(let requestState, .receivingBody(let responseHead, .downstreamIsConsuming(readPending: false))): - self.state = .running(requestState, .receivingBody(responseHead, .downstreamIsConsuming(readPending: true))) - return .wait - case .running(_, .receivingBody(_, .downstreamHasDemand)): - // The consumer has signaled a demand for more response body bytes. If a `read` is - // caught, we pass it on right away. The state machines does not transition into another - // state. - return .read + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -233,6 +201,8 @@ struct HTTPRequestStateMachine { return .failRequest(error, .close) case .finished, .failed: preconditionFailure("If the request is finished or failed, we expect the connection state machine to remove the request immediately from its state. Thus this state is unreachable.") + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -294,6 +264,9 @@ struct HTTPRequestStateMachine { // We may still receive something, here because of potential race conditions with the // producing thread. return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -312,7 +285,7 @@ struct HTTPRequestStateMachine { } self.state = .running(.endSent, .waitingForHead) - return .sendRequestEnd(succeedRequest: nil) + return .sendRequestEnd case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .receivingBody(let head, let streamState)): assert(head.status.code < 300) @@ -324,7 +297,7 @@ struct HTTPRequestStateMachine { } self.state = .running(.endSent, .receivingBody(head, streamState)) - return .sendRequestEnd(succeedRequest: nil) + return .sendRequestEnd case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .endReceived): if let expected = expectedBodyLength, expected != sentBodyBytes { @@ -334,7 +307,7 @@ struct HTTPRequestStateMachine { } self.state = .finished - return .sendRequestEnd(succeedRequest: .some(.none)) + return .succeedRequest(.sendRequestEnd, .init()) case .failed: return .wait @@ -350,6 +323,9 @@ struct HTTPRequestStateMachine { // We may still receive something, here because of potential race conditions with the // producing thread. return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -359,14 +335,20 @@ struct HTTPRequestStateMachine { let error = HTTPClientError.cancelled self.state = .failed(error) return .failRequest(error, .none) + case .running: let error = HTTPClientError.cancelled self.state = .failed(error) return .failRequest(error, .close) + case .finished: return .wait + case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -376,17 +358,87 @@ struct HTTPRequestStateMachine { let error = HTTPClientError.remoteConnectionClosed self.state = .failed(error) return .failRequest(error, .none) + case .finished: return .wait + case .failed: // don't overwrite error return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } // MARK: - Response - mutating func receivedHTTPResponseHead(_ head: HTTPResponseHead) -> Action { + mutating func read() -> Action { + switch self.state { + case .initialized, + .waitForChannelToBecomeWritable, + .running(_, .waitingForHead), + .running(_, .endReceived), + .finished, + .failed: + // If we are not in the middle of streaming the response body, we always want to get + // more data... + return .read + + case .running(let requestState, .receivingBody(let head, var streamState)): + // This should never happen. But we don't want to precondition this behavior. Let's just + // pass the read event on + return self.avoidingStateMachineCoW { (state) -> Action in + let action = streamState.read() + state = .running(requestState, .receivingBody(head, streamState)) + return action.toRequestAction() + } + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func channelRead(_ part: HTTPClientResponsePart) -> Action { + switch part { + case .head(let head): + return self.receivedHTTPResponseHead(head) + case .body(let body): + return self.receivedHTTPResponseBodyPart(body) + case .end: + return self.receivedHTTPResponseEnd() + } + } + + mutating func channelReadComplete() -> Action { + switch self.state { + case .initialized, + .waitForChannelToBecomeWritable, + .running(_, .waitingForHead), + .running(_, .endReceived), + .finished, + .failed: + return .wait + + case .running(let requestState, .receivingBody(let head, var streamState)): + // This should never happen. But we don't want to precondition this behavior. Let's just + // pass the read event on + return self.avoidingStateMachineCoW { (state) -> Action in + let buffer = streamState.channelReadComplete() + state = .running(requestState, .receivingBody(head, streamState)) + if let buffer = buffer { + return .forwardResponseBodyParts(buffer) + } else { + return .wait + } + } + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + + private mutating func receivedHTTPResponseHead(_ head: HTTPResponseHead) -> Action { guard head.status.code >= 200 else { // we ignore any leading 1xx headers... No state change needed. return .wait @@ -399,7 +451,7 @@ struct HTTPRequestStateMachine { case .running(.streaming(let expectedBodyLength, let sentBodyBytes, producer: .paused), .waitingForHead): self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .downstreamIsConsuming(readPending: false)) + .receivingBody(head, .init()) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) @@ -407,25 +459,28 @@ struct HTTPRequestStateMachine { if head.status.code >= 300 { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .paused), - .receivingBody(head, .downstreamIsConsuming(readPending: false)) + .receivingBody(head, .init()) ) return .forwardResponseHead(head, pauseRequestBodyStream: true) } else { self.state = .running( .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: .producing), - .receivingBody(head, .downstreamIsConsuming(readPending: false)) + .receivingBody(head, .init()) ) return .forwardResponseHead(head, pauseRequestBodyStream: false) } case .running(.endSent, .waitingForHead): - self.state = .running(.endSent, .receivingBody(head, .downstreamIsConsuming(readPending: false))) + self.state = .running(.endSent, .receivingBody(head, .init())) return .forwardResponseHead(head, pauseRequestBodyStream: false) case .running(_, .receivingBody), .running(_, .endReceived), .finished: preconditionFailure("How can we successfully finish the request, before having received a head. Invalid state: \(self.state)") case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -437,23 +492,25 @@ struct HTTPRequestStateMachine { case .running(_, .waitingForHead): preconditionFailure("How can we receive a response body, if we haven't received a head. Invalid state: \(self.state)") - case .running(let requestState, .receivingBody(let head, .downstreamHasDemand)): - self.state = .running(requestState, .receivingBody(head, .downstreamIsConsuming(readPending: false))) - return .forwardResponseBodyPart(body) - - case .running(_, .receivingBody(_, .downstreamIsConsuming)): - // the state doesn't need to be changed. we are already in the correct state. - // just forward the data. - return .forwardResponseBodyPart(body) + case .running(let requestState, .receivingBody(let head, var responseStreamState)): + return self.avoidingStateMachineCoW { (state) -> Action in + responseStreamState.receivedBodyPart(body) + state = .running(requestState, .receivingBody(head, responseStreamState)) + return .wait + } case .running(_, .endReceived), .finished: preconditionFailure("How can we successfully finish the request, before having received a head. Invalid state: \(self.state)") + case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } - mutating func receivedHTTPResponseEnd() -> Action { + private mutating func receivedHTTPResponseEnd() -> Action { switch self.state { case .initialized, .waitForChannelToBecomeWritable: preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)") @@ -461,74 +518,67 @@ struct HTTPRequestStateMachine { case .running(_, .waitingForHead): preconditionFailure("How can we receive a response end, if we haven't a received a head. Invalid state: \(self.state)") - case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, let consumerState)) where head.status.code < 300: - self.state = .running( - .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState), - .endReceived - ) + case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, var responseStreamState)) + where head.status.code < 300: - switch consumerState { - case .downstreamHasDemand, .downstreamIsConsuming(readPending: false): - return .wait - case .downstreamIsConsuming(readPending: true): - // If we have a received a read event before, we must ensure that the read event - // eventually gets onto the channel pipeline again. The end of the request gives - // us an opportunity for this clean up task. - // It is very unlikely that we can see this in the real world. If we have swallowed - // a read event we don't expect to receive further data from the channel incl. - // response ends. - - return .read + return self.avoidingStateMachineCoW { state -> Action in + let remainingBuffer = responseStreamState.end() + state = .running( + .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState), + .endReceived + ) + return .forwardResponseBodyParts(remainingBuffer) } - case .running(.streaming(_, _, let producerState), .receivingBody(let head, _)): + case .running(.streaming(_, _, let producerState), .receivingBody(let head, var responseStreamState)): assert(head.status.code >= 300) assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)") - self.state = .finished - return .succeedRequest(.close) - - case .running(.endSent, .receivingBody(_, .downstreamIsConsuming(readPending: true))): - // If we have a received a read event before, we must ensure that the read event - // eventually gets onto the channel pipeline again. The end of the request gives - // us an opportunity for this clean up task. - // It is very unlikely that we can see this in the real world. If we have swallowed - // a read event we don't expect to receive further data from the channel incl. - // response ends. - self.state = .finished - return .succeedRequest(.read) - case .running(.endSent, .receivingBody(_, .downstreamIsConsuming(readPending: false))), - .running(.endSent, .receivingBody(_, .downstreamHasDemand)): - self.state = .finished - return .succeedRequest(.none) + return self.avoidingStateMachineCoW { state -> Action in + let remainingBuffer = responseStreamState.end() + state = .finished + return .succeedRequest(.close, remainingBuffer) + } + + case .running(.endSent, .receivingBody(_, var responseStreamState)): + return self.avoidingStateMachineCoW { state -> Action in + let remainingBuffer = responseStreamState.end() + state = .finished + return .succeedRequest(.none, remainingBuffer) + } case .running(_, .endReceived), .finished: preconditionFailure("How can we receive a response end, if another one was already received. Invalid state: \(self.state)") + case .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } - mutating func forwardMoreBodyParts() -> Action { + mutating func demandMoreResponseBodyParts() -> Action { switch self.state { case .initialized, .running(_, .waitingForHead), .waitForChannelToBecomeWritable: preconditionFailure("The response is expected to only ask for more data after the response head was forwarded") - case .running(let requestState, .receivingBody(let head, .downstreamIsConsuming(readPending: false))): - self.state = .running(requestState, .receivingBody(head, .downstreamHasDemand)) - return .wait - case .running(let requestState, .receivingBody(let head, .downstreamIsConsuming(readPending: true))): - self.state = .running(requestState, .receivingBody(head, .downstreamIsConsuming(readPending: false))) - return .read - case .running(_, .receivingBody(_, .downstreamHasDemand)): - // We have received a request for more data before. Normally we only expect one request - // for more data, but a race can come into play here. - return .wait + + case .running(let requestState, .receivingBody(let head, var responseStreamState)): + return self.avoidingStateMachineCoW { state -> Action in + let action = responseStreamState.demandMoreResponseBodyParts() + state = .running(requestState, .receivingBody(head, responseStreamState)) + return action.toRequestAction() + } + case .running(_, .endReceived), .finished, .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -549,6 +599,9 @@ struct HTTPRequestStateMachine { case .finished, .failed: return .wait + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") } } @@ -568,6 +621,52 @@ struct HTTPRequestStateMachine { } } +extension HTTPRequestStateMachine { + /// So, uh...this function needs some explaining. + /// + /// While the state machine logic above is great, there is a downside to having all of the state machine data in + /// associated data on enumerations: any modification of that data will trigger copy on write for heap-allocated + /// data. That means that for _every operation on the state machine_ we will CoW our underlying state, which is + /// not good. + /// + /// The way we can avoid this is by using this helper function. It will temporarily set state to a value with no + /// associated data, before attempting the body of the function. It will also verify that the state machine never + /// remains in this bad state. + /// + /// A key note here is that all callers must ensure that they return to a good state before they exit. + /// + /// Sadly, because it's generic and has a closure, we need to force it to be inlined at all call sites, which is + /// not ideal. + @inline(__always) + private mutating func avoidingStateMachineCoW(_ body: (inout State) -> ReturnType) -> ReturnType { + self.state = .modifying + defer { + assert(!self.isModifying) + } + + return body(&self.state) + } + + private var isModifying: Bool { + if case .modifying = self.state { + return true + } else { + return false + } + } +} + +extension HTTPRequestStateMachine.ResponseStreamState.Action { + func toRequestAction() -> HTTPRequestStateMachine.Action { + switch self { + case .read: + return .read + case .wait: + return .wait + } + } +} + extension HTTPRequestStateMachine: CustomStringConvertible { var description: String { switch self.state { @@ -581,6 +680,8 @@ extension HTTPRequestStateMachine: CustomStringConvertible { return "HTTPRequestStateMachine(.finished, isWritable: \(self.isChannelWritable))" case .failed(let error): return "HTTPRequestStateMachine(.failed(\(error)), isWritable: \(self.isChannelWritable))" + case .modifying: + return "HTTPRequestStateMachine(.modifying, isWritable: \(self.isChannelWritable))" } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift index 95b6acdb8..cc8b22a0b 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift @@ -36,13 +36,15 @@ extension HTTPRequestStateMachineTests { ("testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200", testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200), ("testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerSendHeadWithStatus200", testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerSendHeadWithStatus200), ("testRequestIsNotSendUntilChannelIsWritable", testRequestIsNotSendUntilChannelIsWritable), + ("testConnectionBecomesInactiveWhileWaitingForWritable", testConnectionBecomesInactiveWhileWaitingForWritable), ("testResponseReadingWithBackpressure", testResponseReadingWithBackpressure), - ("testResponseReadingWithBackpressureEndOfResponseSetsCaughtReadEventFree", testResponseReadingWithBackpressureEndOfResponseSetsCaughtReadEventFree), + ("testChannelReadCompleteTriggersButNoBodyDataWasReceivedSoFar", testChannelReadCompleteTriggersButNoBodyDataWasReceivedSoFar), + ("testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly", testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly), ("testCancellingARequestInStateInitializedKeepsTheConnectionAlive", testCancellingARequestInStateInitializedKeepsTheConnectionAlive), ("testCancellingARequestBeforeBeingSendKeepsTheConnectionAlive", testCancellingARequestBeforeBeingSendKeepsTheConnectionAlive), ("testCancellingARequestThatIsSent", testCancellingARequestThatIsSent), ("testRemoteSuddenlyClosesTheConnection", testRemoteSuddenlyClosesTheConnection), - ("testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnore", testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnore), + ("testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnored", testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnored), ("testResponseWithStatus1XXAreIgnored", testResponseWithStatus1XXAreIgnored), ("testReadTimeoutThatFiresToLateIsIgnored", testReadTimeoutThatFiresToLateIsIgnored), ("testCancellationThatIsInvokedToLateIsIgnored", testCancellationThatIsInvokedToLateIsIgnored), diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift index 1c0bf48cf..8614d9767 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift @@ -26,10 +26,11 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(responseBody), .forwardResponseBodyPart(responseBody)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelReadComplete(), .wait) } func testPOSTRequestWithWriterBackpressure() { @@ -56,13 +57,14 @@ class HTTPRequestStateMachineTests: XCTestCase { // once we receive a writable event again, we can allow the producer to produce more data XCTAssertEqual(state.writabilityChanged(writable: true), .resumeRequestBodyStream) XCTAssertEqual(state.requestStreamPartReceived(part3), .sendBodyPart(part3)) - XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd(succeedRequest: nil)) + XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(responseBody), .forwardResponseBodyPart(responseBody)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) + XCTAssertEqual(state.channelReadComplete(), .wait) } func testPOSTContentLengthIsTooLong() { @@ -106,15 +108,15 @@ class HTTPRequestStateMachineTests: XCTestCase { let part = IOData.byteBuffer(ByteBuffer(bytes: [0, 1, 2, 3])) XCTAssertEqual(state.requestStreamPartReceived(part), .sendBodyPart(part)) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .movedPermanently) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: true)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: true)) XCTAssertEqual(state.writabilityChanged(writable: false), .wait) XCTAssertEqual(state.writabilityChanged(writable: true), .wait) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.close)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init())) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") @@ -132,14 +134,14 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.requestStreamPartReceived(part), .sendBodyPart(part)) XCTAssertEqual(state.writabilityChanged(writable: false), .pauseRequestBodyStream) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .movedPermanently) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) XCTAssertEqual(state.writabilityChanged(writable: true), .wait) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.close)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init())) XCTAssertEqual(state.requestStreamPartReceived(part), .wait, "Expected to drop all stream data after having received a response head, with status >= 300") @@ -158,14 +160,14 @@ class HTTPRequestStateMachineTests: XCTestCase { // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseBodyParts(.init())) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11)) XCTAssertEqual(state.requestStreamPartReceived(part2), .sendBodyPart(part2)) - XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd(succeedRequest: .some(.none))) + XCTAssertEqual(state.requestStreamFinished(), .succeedRequest(.sendRequestEnd, .init())) } func testRequestBodyStreamIsContinuedIfServerSendHeadWithStatus200() { @@ -178,15 +180,15 @@ class HTTPRequestStateMachineTests: XCTestCase { // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11)) XCTAssertEqual(state.requestStreamPartReceived(part2), .sendBodyPart(part2)) - XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd(succeedRequest: nil)) + XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) } func testRequestIsFailedIfRequestBodySizeIsWrongEvenAfterServerRespondedWith200() { @@ -197,10 +199,10 @@ class HTTPRequestStateMachineTests: XCTestCase { let part0 = IOData.byteBuffer(ByteBuffer(bytes: 0...3)) XCTAssertEqual(state.requestStreamPartReceived(part0), .sendBodyPart(part0)) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .forwardResponseBodyParts(.init())) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) @@ -216,14 +218,14 @@ class HTTPRequestStateMachineTests: XCTestCase { let part0 = IOData.byteBuffer(ByteBuffer(bytes: 0...3)) XCTAssertEqual(state.requestStreamPartReceived(part0), .sendBodyPart(part0)) - // response is comming before having send all data + // response is coming before having send all data let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part1 = IOData.byteBuffer(ByteBuffer(bytes: 4...7)) XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1)) XCTAssertEqual(state.requestStreamFinished(), .failRequest(HTTPClientError.bodyLengthMismatch, .close)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .wait) } func testRequestIsNotSendUntilChannelIsWritable() { @@ -231,17 +233,25 @@ class HTTPRequestStateMachineTests: XCTestCase { let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .wait) - XCTAssertEqual(state.readEventCaught(), .read) + XCTAssertEqual(state.read(), .read) XCTAssertEqual(state.writabilityChanged(writable: true), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let responseBody = ByteBuffer(bytes: [1, 2, 3, 4]) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(responseBody), .forwardResponseBodyPart(responseBody)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.body(responseBody)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([responseBody]))) XCTAssertEqual(state.channelInactive(), .wait) } + func testConnectionBecomesInactiveWhileWaitingForWritable() { + var state = HTTPRequestStateMachine(isChannelWritable: false) + let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") + let metadata = RequestFramingMetadata(connectionClose: false, body: .none) + XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .wait) + XCTAssertEqual(state.channelInactive(), .failRequest(HTTPClientError.remoteConnectionClosed, .none)) + } + func testResponseReadingWithBackpressure() { var state = HTTPRequestStateMachine(isChannelWritable: true) let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") @@ -249,42 +259,82 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + let part0 = ByteBuffer(bytes: 0...3) + let part1 = ByteBuffer(bytes: 4...7) + let part2 = ByteBuffer(bytes: 8...11) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) + XCTAssertEqual(state.channelRead(.body(part1)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part0, part1]))) + XCTAssertEqual(state.read(), .wait) + XCTAssertEqual(state.read(), .wait, "Expected to be able to consume a second read event") + XCTAssertEqual(state.demandMoreResponseBodyParts(), .read) + XCTAssertEqual(state.channelRead(.body(part2)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + } + + func testChannelReadCompleteTriggersButNoBodyDataWasReceivedSoFar() { + var state = HTTPRequestStateMachine(isChannelWritable: true) + let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") + let metadata = RequestFramingMetadata(connectionClose: false, body: .none) + XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) + + let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part0 = ByteBuffer(bytes: 0...3) let part1 = ByteBuffer(bytes: 4...7) let part2 = ByteBuffer(bytes: 8...11) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part0), .forwardResponseBodyPart(part0)) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part1), .forwardResponseBodyPart(part1)) - XCTAssertEqual(state.readEventCaught(), .wait) - XCTAssertEqual(state.readEventCaught(), .wait, "Expected to be able to consume a second read event") - XCTAssertEqual(state.forwardMoreBodyParts(), .read) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part2), .forwardResponseBodyPart(part2)) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait) - XCTAssertEqual(state.readEventCaught(), .read) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) + XCTAssertEqual(state.channelRead(.body(part1)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part0, part1]))) + XCTAssertEqual(state.read(), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .read) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.body(part2)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init([part2]))) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) } - func testResponseReadingWithBackpressureEndOfResponseSetsCaughtReadEventFree() { + func testResponseReadingWithBackpressureEndOfResponseAllowsReadEventsToTriggerDirectly() { var state = HTTPRequestStateMachine(isChannelWritable: true) let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part0 = ByteBuffer(bytes: 0...3) let part1 = ByteBuffer(bytes: 4...7) let part2 = ByteBuffer(bytes: 8...11) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part0), .forwardResponseBodyPart(part0)) - XCTAssertEqual(state.readEventCaught(), .wait) - XCTAssertEqual(state.forwardMoreBodyParts(), .read) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part1), .forwardResponseBodyPart(part1)) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait, "Calling forward more bytes twice is okay") - XCTAssertEqual(state.readEventCaught(), .read) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part2), .forwardResponseBodyPart(part2)) - XCTAssertEqual(state.readEventCaught(), .wait) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.read)) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part0]))) + XCTAssertEqual(state.read(), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .read) + XCTAssertEqual(state.channelRead(.body(part1)), .wait) + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part1]))) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait, "Calling forward more bytes twice is okay") + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.body(part2)), .wait) + XCTAssertEqual(state.read(), .read, "Calling `read` while we wait for a channelReadComplete doesn't crash") + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait, "Calling `demandMoreResponseBodyParts` while we wait for a channelReadComplete doesn't crash") + XCTAssertEqual(state.channelReadComplete(), .forwardResponseBodyParts(.init([part2]))) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.read(), .read) } func testCancellingARequestInStateInitializedKeepsTheConnectionAlive() { @@ -317,21 +367,21 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.requestStreamPartReceived(.byteBuffer(.init(bytes: 1...3))), .wait) } - func testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnore() { + func testReadTimeoutLeadsToFailureWithEverythingAfterBeingIgnored() { var state = HTTPRequestStateMachine(isChannelWritable: true) let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")])) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) let part0 = ByteBuffer(bytes: 0...3) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(part0), .forwardResponseBodyPart(part0)) + XCTAssertEqual(state.channelRead(.body(part0)), .wait) XCTAssertEqual(state.idleReadTimeoutTriggered(), .failRequest(HTTPClientError.readTimeout, .close)) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(ByteBuffer(bytes: 4...7)), .wait) - XCTAssertEqual(state.receivedHTTPResponseBodyPart(ByteBuffer(bytes: 8...11)), .wait) - XCTAssertEqual(state.forwardMoreBodyParts(), .wait) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .wait) + XCTAssertEqual(state.channelRead(.body(ByteBuffer(bytes: 4...7))), .wait) + XCTAssertEqual(state.channelRead(.body(ByteBuffer(bytes: 8...11))), .wait) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .wait) } func testResponseWithStatus1XXAreIgnored() { @@ -341,11 +391,13 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) let continueHead = HTTPResponseHead(version: .http1_1, status: .continue) - XCTAssertEqual(state.receivedHTTPResponseHead(continueHead), .wait) + XCTAssertEqual(state.channelRead(.head(continueHead)), .wait) let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) } func testReadTimeoutThatFiresToLateIsIgnored() { @@ -354,12 +406,9 @@ class HTTPRequestStateMachineTests: XCTestCase { let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) - let continueHead = HTTPResponseHead(version: .http1_1, status: .continue) - XCTAssertEqual(state.receivedHTTPResponseHead(continueHead), .wait) - let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) XCTAssertEqual(state.idleReadTimeoutTriggered(), .wait, "A read timeout that fires to late must be ignored") } @@ -369,12 +418,9 @@ class HTTPRequestStateMachineTests: XCTestCase { let metadata = RequestFramingMetadata(connectionClose: false, body: .none) XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) - let continueHead = HTTPResponseHead(version: .http1_1, status: .continue) - XCTAssertEqual(state.receivedHTTPResponseHead(continueHead), .wait) - let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) - XCTAssertEqual(state.receivedHTTPResponseHead(responseHead), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) - XCTAssertEqual(state.receivedHTTPResponseEnd(), .succeedRequest(.none)) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.none, .init())) XCTAssertEqual(state.requestCancelled(), .wait, "A cancellation that happens to late is ignored") } @@ -394,10 +440,12 @@ extension HTTPRequestStateMachine.Action: Equatable { switch (lhs, rhs) { case (.sendRequestHead(let lhsHead, let lhsStartBody), .sendRequestHead(let rhsHead, let rhsStartBody)): return lhsHead == rhsHead && lhsStartBody == rhsStartBody + case (.sendBodyPart(let lhsData), .sendBodyPart(let rhsData)): return lhsData == rhsData - case (.sendRequestEnd(let lhsFinalAction), .sendRequestEnd(let rhsFinalAction)): - return lhsFinalAction == rhsFinalAction + + case (.sendRequestEnd, .sendRequestEnd): + return true case (.pauseRequestBodyStream, .pauseRequestBodyStream): return true @@ -406,18 +454,22 @@ extension HTTPRequestStateMachine.Action: Equatable { case (.forwardResponseHead(let lhsHead, let lhsPauseRequestBodyStream), .forwardResponseHead(let rhsHead, let rhsPauseRequestBodyStream)): return lhsHead == rhsHead && lhsPauseRequestBodyStream == rhsPauseRequestBodyStream - case (.forwardResponseBodyPart(let lhsData), .forwardResponseBodyPart(let rhsData)): + + case (.forwardResponseBodyParts(let lhsData), .forwardResponseBodyParts(let rhsData)): return lhsData == rhsData - case (.succeedRequest(let lhsFinalAction), .succeedRequest(let rhsFinalAction)): - return lhsFinalAction == rhsFinalAction + case (.succeedRequest(let lhsFinalAction, let lhsFinalBuffer), .succeedRequest(let rhsFinalAction, let rhsFinalBuffer)): + return lhsFinalAction == rhsFinalAction && lhsFinalBuffer == rhsFinalBuffer + case (.failRequest(_, let lhsFinalAction), .failRequest(_, let rhsFinalAction)): return lhsFinalAction == rhsFinalAction case (.read, .read): return true + case (.wait, .wait): return true + default: return false }