Skip to content

Commit 61cc272

Browse files
committed
Create Task API -> 0.7.0
1 parent 2985121 commit 61cc272

File tree

11 files changed

+873
-686
lines changed

11 files changed

+873
-686
lines changed

AsyncQueue.podspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Pod::Spec.new do |s|
22
s.name = 'AsyncQueue'
3-
s.version = '0.6.1'
3+
s.version = '0.7.0'
44
s.license = 'MIT'
55
s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.'
66
s.homepage = 'https://github.com/dfed/swift-async-queue'

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ func testFIFOQueueOrdering() async {
5151
actor Counter {
5252
nonisolated
5353
func incrementAndAssertCountEquals(_ expectedCount: Int) {
54-
queue.enqueue {
54+
Task(enqueuedOn: queue) {
5555
await self.increment()
5656
let incrementedCount = await self.count
5757
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
5858
}
5959
}
6060

6161
func flushQueue() async {
62-
await queue.enqueueAndWait { }
62+
await Task(enqueuedOn: queue) {}.value
6363
}
6464

6565
func increment() {
@@ -101,14 +101,14 @@ func testActorQueueOrdering() async {
101101

102102
nonisolated
103103
func incrementAndAssertCountEquals(_ expectedCount: Int) {
104-
queue.enqueue { myself in
104+
await Task(enqueuedOn: queue) { myself in
105105
myself.count += 1
106106
XCTAssertEqual(expectedCount, myself.count) // always succeeds
107107
}
108108
}
109109

110110
func flushQueue() async {
111-
await queue.enqueueAndWait { _ in }
111+
await Task(enqueuedOn: queue) {}.value
112112
}
113113

114114
private var count = 0
@@ -137,15 +137,15 @@ func testMainActorQueueOrdering() async {
137137
final class Counter {
138138
nonisolated
139139
func incrementAndAssertCountEquals(_ expectedCount: Int) {
140-
MainActorQueue.shared.enqueue {
140+
Task(enqueuedOn: MainActor.queue) {
141141
self.increment()
142142
let incrementedCount = self.count
143143
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
144144
}
145145
}
146146

147147
func flushQueue() async {
148-
await MainActorQueue.shared.enqueueAndWait { }
148+
await Task(enqueuedOn: MainActor.queue) { }.value
149149
}
150150

151151
func increment() {
@@ -181,7 +181,7 @@ To install swift-async-queue in your project with [Swift Package Manager](https:
181181

182182
```swift
183183
dependencies: [
184-
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.6.0"),
184+
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.7.0"),
185185
]
186186
```
187187

@@ -190,7 +190,7 @@ dependencies: [
190190
To install swift-async-queue in your project with [CocoaPods](http://cocoapods.org), add the following to your `Podfile`:
191191

192192
```
193-
pod 'AsyncQueue', '~> 0.6.0'
193+
pod 'AsyncQueue', '~> 0.7.0'
194194
```
195195

196196
## Contributing

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 227 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@
3535
///
3636
/// nonisolated
3737
/// public func log(_ message: String) {
38-
/// queue.enqueue { myself in
38+
/// Task(enqueuedOn: queue) { myself in
3939
/// myself.logs.append(message)
4040
/// }
4141
/// }
4242
///
4343
/// public func retrieveLogs() async -> [String] {
44-
/// await queue.enqueueAndWait { myself in myself.logs }
44+
/// await Task(enqueuedOn: queue) { myself in myself.logs }.value
4545
/// }
4646
///
4747
/// private let queue = ActorQueue<LogStore>()
@@ -80,6 +80,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
8080
actorTask.task,
8181
in: actorTask.executionContext
8282
)
83+
await actorTask.sempahore.signal()
8384
}
8485
}
8586
}
@@ -101,65 +102,246 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
101102
weakExecutionContext = actor
102103
}
103104

104-
/// Schedules an asynchronous task for execution and immediately returns.
105-
/// The scheduled task will not execute until all prior tasks have completed or suspended.
106-
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
107-
public func enqueue(_ task: @escaping @Sendable (isolated ActorType) async -> Void) {
108-
taskStreamContinuation.yield(ActorTask(executionContext: executionContext, task: task))
109-
}
110-
111-
/// Schedules an asynchronous task and returns after the task is complete.
112-
/// The scheduled task will not execute until all prior tasks have completed or suspended.
113-
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
114-
/// - Returns: The value returned from the enqueued task.
115-
public func enqueueAndWait<T: Sendable>(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
116-
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
117-
return await withUnsafeContinuation { continuation in
118-
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
119-
continuation.resume(returning: await task(executionContext))
120-
})
121-
}
122-
}
123-
124-
/// Schedules an asynchronous throwing task and returns after the task is complete.
125-
/// The scheduled task will not execute until all prior tasks have completed or suspended.
126-
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
127-
/// - Returns: The value returned from the enqueued task.
128-
public func enqueueAndWait<T: Sendable>(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
129-
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
130-
return try await withUnsafeThrowingContinuation { continuation in
131-
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
132-
do {
133-
continuation.resume(returning: try await task(executionContext))
134-
} catch {
135-
continuation.resume(throwing: error)
136-
}
137-
})
138-
}
139-
}
140-
141-
// MARK: Private
105+
// MARK: Fileprivate
142106

143-
private let taskStreamContinuation: AsyncStream<ActorTask>.Continuation
107+
fileprivate let taskStreamContinuation: AsyncStream<ActorTask>.Continuation
144108

145109
/// The actor on whose isolated context our tasks run, force-unwrapped.
146110
/// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment.
147-
private var executionContext: ActorType {
111+
fileprivate var executionContext: ActorType {
148112
// Crashing here means that this queue is being sent tasks either before an execution context has been set, or
149113
// after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted
150114
// actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor.
151115
weakExecutionContext!
152116
}
117+
118+
fileprivate struct ActorTask: Sendable {
119+
init(executionContext: ActorType, task: @escaping @Sendable (isolated ActorType) async -> Void) {
120+
self.executionContext = executionContext
121+
self.task = task
122+
}
123+
124+
let executionContext: ActorType
125+
let sempahore = Semaphore()
126+
let task: @Sendable (isolated ActorType) async -> Void
127+
}
128+
129+
// MARK: Private
130+
153131
/// The actor on whose isolated context our tasks run.
154132
/// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue.
155133
///
156134
/// We will assume this execution context always exists for the lifecycle of the queue because:
157135
/// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`.
158136
/// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method.
159137
private weak var weakExecutionContext: ActorType?
138+
}
160139

161-
private struct ActorTask {
162-
let executionContext: ActorType
163-
let task: @Sendable (isolated ActorType) async -> Void
140+
extension Task {
141+
/// Runs the given nonthrowing operation asynchronously
142+
/// as part of a new top-level task on behalf of the current actor.
143+
/// The operation will not execute until all prior tasks have
144+
/// completed or suspended.
145+
///
146+
/// Use this function when creating asynchronous work
147+
/// that operates on behalf of the synchronous function that calls it.
148+
/// Like `Task.detached(priority:operation:)`,
149+
/// this function creates a separate, top-level task.
150+
/// Unlike `Task.detached(priority:operation:)`,
151+
/// the task created by `Task.init(priority:operation:)`
152+
/// inherits the priority and actor context of the caller,
153+
/// so the operation is treated more like an asynchronous extension
154+
/// to the synchronous operation.
155+
///
156+
/// You need to keep a reference to the task
157+
/// if you want to cancel it by calling the `Task.cancel()` method.
158+
/// Discarding your reference to a detached task
159+
/// doesn't implicitly cancel that task,
160+
/// it only makes it impossible for you to explicitly cancel the task.
161+
///
162+
/// - Parameters:
163+
/// - priority: The priority of the task.
164+
/// Pass `nil` to use the priority from `Task.currentPriority`.
165+
/// - actorQueue: The queue on which to enqueue the task.
166+
/// - operation: The operation to perform.
167+
@discardableResult
168+
public init<ActorType: Actor>(
169+
priority: TaskPriority? = nil,
170+
enqueuedOn actorQueue: ActorQueue<ActorType>,
171+
operation: @Sendable @escaping (isolated ActorType) async -> Success
172+
) where Failure == Never {
173+
let delivery = Delivery<Success, Failure>()
174+
let task = ActorQueue<ActorType>.ActorTask(
175+
executionContext: actorQueue.executionContext,
176+
task: { executionContext in
177+
await delivery.sendValue(operation(executionContext))
178+
}
179+
)
180+
actorQueue.taskStreamContinuation.yield(task)
181+
self.init(priority: priority) {
182+
await task.sempahore.wait()
183+
return await delivery.getValue()
184+
}
185+
}
186+
187+
/// Runs the given throwing operation asynchronously
188+
/// as part of a new top-level task on behalf of the current actor.
189+
/// The operation will not execute until all prior tasks have
190+
/// completed or suspended.
191+
///
192+
/// Use this function when creating asynchronous work
193+
/// that operates on behalf of the synchronous function that calls it.
194+
/// Like `Task.detached(priority:operation:)`,
195+
/// this function creates a separate, top-level task.
196+
/// Unlike `Task.detached(priority:operation:)`,
197+
/// the task created by `Task.init(priority:operation:)`
198+
/// inherits the priority and actor context of the caller,
199+
/// so the operation is treated more like an asynchronous extension
200+
/// to the synchronous operation.
201+
///
202+
/// You need to keep a reference to the task
203+
/// if you want to cancel it by calling the `Task.cancel()` method.
204+
/// Discarding your reference to a detached task
205+
/// doesn't implicitly cancel that task,
206+
/// it only makes it impossible for you to explicitly cancel the task.
207+
///
208+
/// - Parameters:
209+
/// - priority: The priority of the task.
210+
/// Pass `nil` to use the priority from `Task.currentPriority`.
211+
/// - actorQueue: The queue on which to enqueue the task.
212+
/// - operation: The operation to perform.
213+
@discardableResult
214+
public init<ActorType: Actor>(
215+
priority: TaskPriority? = nil,
216+
enqueuedOn actorQueue: ActorQueue<ActorType>,
217+
operation: @escaping @Sendable (isolated ActorType) async throws -> Success
218+
) where Failure == any Error {
219+
let delivery = Delivery<Success, Failure>()
220+
let task = ActorQueue<ActorType>.ActorTask(
221+
executionContext: actorQueue.executionContext,
222+
task: { executionContext in
223+
do {
224+
try await delivery.sendValue(operation(executionContext))
225+
} catch {
226+
await delivery.sendFailure(error)
227+
}
228+
}
229+
)
230+
231+
actorQueue.taskStreamContinuation.yield(task)
232+
self.init(priority: priority) {
233+
await task.sempahore.wait()
234+
return try await delivery.getValue()
235+
}
236+
}
237+
238+
/// Runs the given nonthrowing operation asynchronously
239+
/// as part of a new top-level task on behalf of the current actor.
240+
/// The operation will not execute until all prior tasks have
241+
/// completed or suspended.
242+
///
243+
/// Use this function when creating asynchronous work
244+
/// that operates on behalf of the synchronous function that calls it.
245+
/// Like `Task.detached(priority:operation:)`,
246+
/// this function creates a separate, top-level task.
247+
/// Unlike `Task.detached(priority:operation:)`,
248+
/// the task created by `Task.init(priority:operation:)`
249+
/// inherits the priority and actor context of the caller,
250+
/// so the operation is treated more like an asynchronous extension
251+
/// to the synchronous operation.
252+
///
253+
/// You need to keep a reference to the task
254+
/// if you want to cancel it by calling the `Task.cancel()` method.
255+
/// Discarding your reference to a detached task
256+
/// doesn't implicitly cancel that task,
257+
/// it only makes it impossible for you to explicitly cancel the task.
258+
///
259+
/// - Parameters:
260+
/// - priority: The priority of the task.
261+
/// Pass `nil` to use the priority from `Task.currentPriority`.
262+
/// - actorQueue: The queue on which to enqueue the task.
263+
/// - operation: The operation to perform.
264+
@discardableResult
265+
public init(
266+
priority: TaskPriority? = nil,
267+
enqueuedOn actorQueue: ActorQueue<MainActor>,
268+
operation: @MainActor @escaping () async -> Success
269+
) where Failure == Never {
270+
let delivery = Delivery<Success, Failure>()
271+
let task = ActorQueue<MainActor>.ActorTask(
272+
executionContext: actorQueue.executionContext,
273+
task: { executionContext in
274+
await delivery.sendValue(operation())
275+
}
276+
)
277+
actorQueue.taskStreamContinuation.yield(task)
278+
self.init(priority: priority) {
279+
await task.sempahore.wait()
280+
return await delivery.getValue()
281+
}
282+
}
283+
284+
/// Runs the given throwing operation asynchronously
285+
/// as part of a new top-level task on behalf of the current actor.
286+
/// The operation will not execute until all prior tasks have
287+
/// completed or suspended.
288+
///
289+
/// Use this function when creating asynchronous work
290+
/// that operates on behalf of the synchronous function that calls it.
291+
/// Like `Task.detached(priority:operation:)`,
292+
/// this function creates a separate, top-level task.
293+
/// Unlike `Task.detached(priority:operation:)`,
294+
/// the task created by `Task.init(priority:operation:)`
295+
/// inherits the priority and actor context of the caller,
296+
/// so the operation is treated more like an asynchronous extension
297+
/// to the synchronous operation.
298+
///
299+
/// You need to keep a reference to the task
300+
/// if you want to cancel it by calling the `Task.cancel()` method.
301+
/// Discarding your reference to a detached task
302+
/// doesn't implicitly cancel that task,
303+
/// it only makes it impossible for you to explicitly cancel the task.
304+
///
305+
/// - Parameters:
306+
/// - priority: The priority of the task.
307+
/// Pass `nil` to use the priority from `Task.currentPriority`.
308+
/// - actorQueue: The queue on which to enqueue the task.
309+
/// - operation: The operation to perform.
310+
@discardableResult
311+
public init(
312+
priority: TaskPriority? = nil,
313+
enqueuedOn actorQueue: ActorQueue<MainActor>,
314+
operation: @escaping @MainActor () async throws -> Success
315+
) where Failure == any Error {
316+
let delivery = Delivery<Success, Failure>()
317+
let task = ActorQueue<MainActor>.ActorTask(
318+
executionContext: actorQueue.executionContext,
319+
task: { executionContext in
320+
do {
321+
try await delivery.sendValue(operation())
322+
} catch {
323+
await delivery.sendFailure(error)
324+
}
325+
}
326+
)
327+
328+
actorQueue.taskStreamContinuation.yield(task)
329+
self.init(priority: priority) {
330+
await task.sempahore.wait()
331+
return try await delivery.getValue()
332+
}
164333
}
165334
}
335+
336+
extension MainActor {
337+
/// A global instance of an `ActorQueue<MainActor>`.
338+
public static var queue: ActorQueue<MainActor> {
339+
mainActorQueue
340+
}
341+
}
342+
343+
private let mainActorQueue = {
344+
let queue = ActorQueue<MainActor>()
345+
queue.adoptExecutionContext(of: MainActor.shared)
346+
return queue
347+
}()

0 commit comments

Comments
 (0)