Skip to content

Commit fcb0f21

Browse files
authored
Buffer channelReads in ResponseStreamState until the next channelReadComplete (#388)
- `ConsumerControlState` was replaced with `ResponseStreamState` in `HTTPRequestStateMachine` - `channelRead`s are buffered up to a `channelReadComplete`. - on `channelReadComplete` all buffered body parts are forwarded to the consumer
1 parent cd7c804 commit fcb0f21

File tree

4 files changed

+493
-186
lines changed

4 files changed

+493
-186
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
17+
extension HTTPRequestStateMachine {
18+
/// A sub state for receiving a response events. Stores whether the consumer has either signaled demand and whether the
19+
/// channel has issued `read` events.
20+
struct ResponseStreamState {
21+
private enum State {
22+
/// The state machines expects further writes to `channelRead`. The writes are appended to the buffer.
23+
case waitingForBytes(CircularBuffer<ByteBuffer>)
24+
/// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is
25+
/// empty. It is preserved for performance reasons.
26+
case waitingForReadOrDemand(CircularBuffer<ByteBuffer>)
27+
/// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons.
28+
case waitingForRead(CircularBuffer<ByteBuffer>)
29+
/// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is
30+
/// preserved for performance reasons.
31+
case waitingForDemand(CircularBuffer<ByteBuffer>)
32+
33+
case modifying
34+
}
35+
36+
enum Action {
37+
case read
38+
case wait
39+
}
40+
41+
private var state: State
42+
43+
init() {
44+
self.state = .waitingForBytes(CircularBuffer(initialCapacity: 16))
45+
}
46+
47+
mutating func receivedBodyPart(_ body: ByteBuffer) {
48+
switch self.state {
49+
case .waitingForBytes(var buffer):
50+
self.state = .modifying
51+
buffer.append(body)
52+
self.state = .waitingForBytes(buffer)
53+
54+
case .waitingForRead,
55+
.waitingForDemand,
56+
.waitingForReadOrDemand:
57+
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
58+
59+
case .modifying:
60+
preconditionFailure("Invalid state: \(self.state)")
61+
}
62+
}
63+
64+
mutating func channelReadComplete() -> CircularBuffer<ByteBuffer>? {
65+
switch self.state {
66+
case .waitingForBytes(let buffer):
67+
if buffer.isEmpty {
68+
self.state = .waitingForRead(buffer)
69+
return nil
70+
} else {
71+
var newBuffer = buffer
72+
newBuffer.removeAll(keepingCapacity: true)
73+
self.state = .waitingForReadOrDemand(newBuffer)
74+
return buffer
75+
}
76+
77+
case .waitingForRead,
78+
.waitingForDemand,
79+
.waitingForReadOrDemand:
80+
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
81+
82+
case .modifying:
83+
preconditionFailure("Invalid state: \(self.state)")
84+
}
85+
}
86+
87+
mutating func demandMoreResponseBodyParts() -> Action {
88+
switch self.state {
89+
case .waitingForDemand(let buffer):
90+
self.state = .waitingForBytes(buffer)
91+
return .read
92+
93+
case .waitingForReadOrDemand(let buffer):
94+
self.state = .waitingForRead(buffer)
95+
return .wait
96+
97+
case .waitingForRead:
98+
// if we are `waitingForRead`, no action needs to be taken. Demand was already signalled
99+
// once we receive the next `read`, we will forward it, right away
100+
return .wait
101+
102+
case .waitingForBytes:
103+
// if we are `.waitingForBytes`, no action needs to be taken. As soon as we receive
104+
// the next channelReadComplete we will forward all buffered data
105+
return .wait
106+
107+
case .modifying:
108+
preconditionFailure("Invalid state: \(self.state)")
109+
}
110+
}
111+
112+
mutating func read() -> Action {
113+
switch self.state {
114+
case .waitingForBytes:
115+
// This should never happen. But we don't want to precondition this behavior. Let's just
116+
// pass the read event on
117+
return .read
118+
119+
case .waitingForReadOrDemand(let buffer):
120+
self.state = .waitingForDemand(buffer)
121+
return .wait
122+
123+
case .waitingForRead(let buffer):
124+
self.state = .waitingForBytes(buffer)
125+
return .read
126+
127+
case .waitingForDemand:
128+
// we have already received a read event. We will issue it as soon as we received demand
129+
// from the consumer
130+
return .wait
131+
132+
case .modifying:
133+
preconditionFailure("Invalid state: \(self.state)")
134+
}
135+
}
136+
137+
mutating func end() -> CircularBuffer<ByteBuffer> {
138+
switch self.state {
139+
case .waitingForBytes(let buffer):
140+
return buffer
141+
142+
case .waitingForReadOrDemand,
143+
.waitingForRead,
144+
.waitingForDemand:
145+
preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
146+
147+
case .modifying:
148+
preconditionFailure("Invalid state: \(self.state)")
149+
}
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)