Skip to content

Commit c5297ef

Browse files
committed
Create Task API -> 0.7.0
1 parent 2985121 commit c5297ef

File tree

10 files changed

+970
-584
lines changed

10 files changed

+970
-584
lines changed

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ func testFIFOQueueOrdering() async {
5151
actor Counter {
5252
nonisolated
5353
func incrementAndAssertCountEquals(_ expectedCount: Int) {
54-
queue.enqueue {
54+
Task(enqueuedOn: queue) {
5555
await self.increment()
5656
let incrementedCount = await self.count
5757
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
5858
}
5959
}
6060

6161
func flushQueue() async {
62-
await queue.enqueueAndWait { }
62+
await Task(enqueuedOn: queue) {}.value
6363
}
6464

6565
func increment() {
@@ -101,14 +101,14 @@ func testActorQueueOrdering() async {
101101

102102
nonisolated
103103
func incrementAndAssertCountEquals(_ expectedCount: Int) {
104-
queue.enqueue { myself in
104+
await Task(enqueuedOn: queue) { myself in
105105
myself.count += 1
106106
XCTAssertEqual(expectedCount, myself.count) // always succeeds
107107
}
108108
}
109109

110110
func flushQueue() async {
111-
await queue.enqueueAndWait { _ in }
111+
await Task(enqueuedOn: queue) {}.value
112112
}
113113

114114
private var count = 0
@@ -137,15 +137,15 @@ func testMainActorQueueOrdering() async {
137137
final class Counter {
138138
nonisolated
139139
func incrementAndAssertCountEquals(_ expectedCount: Int) {
140-
MainActorQueue.shared.enqueue {
140+
Task(enqueuedOn: MainActor.queue) {
141141
self.increment()
142142
let incrementedCount = self.count
143143
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
144144
}
145145
}
146146

147147
func flushQueue() async {
148-
await MainActorQueue.shared.enqueueAndWait { }
148+
await Task(enqueuedOn: MainActor.queue) { }.value
149149
}
150150

151151
func increment() {
@@ -181,7 +181,7 @@ To install swift-async-queue in your project with [Swift Package Manager](https:
181181

182182
```swift
183183
dependencies: [
184-
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.6.0"),
184+
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.7.0"),
185185
]
186186
```
187187

@@ -190,7 +190,7 @@ dependencies: [
190190
To install swift-async-queue in your project with [CocoaPods](http://cocoapods.org), add the following to your `Podfile`:
191191

192192
```
193-
pod 'AsyncQueue', '~> 0.6.0'
193+
pod 'AsyncQueue', '~> 0.7.0'
194194
```
195195

196196
## Contributing

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 221 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
6161
self.taskStreamContinuation = taskStreamContinuation
6262

6363
func beginExecuting(
64-
_ operation: sending @escaping (isolated ActorType) async -> Void,
64+
_ operation: sending @escaping (isolated ActorType) async -> Sendable,
6565
in context: isolated ActorType
6666
) {
6767
// In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
@@ -80,6 +80,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
8080
actorTask.task,
8181
in: actorTask.executionContext
8282
)
83+
await actorTask.sempahore.signal()
8384
}
8485
}
8586
}
@@ -101,65 +102,241 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
101102
weakExecutionContext = actor
102103
}
103104

104-
/// Schedules an asynchronous task for execution and immediately returns.
105-
/// The scheduled task will not execute until all prior tasks have completed or suspended.
106-
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
107-
public func enqueue(_ task: @escaping @Sendable (isolated ActorType) async -> Void) {
108-
taskStreamContinuation.yield(ActorTask(executionContext: executionContext, task: task))
109-
}
105+
// MARK: Fileprivate
110106

111-
/// Schedules an asynchronous task and returns after the task is complete.
112-
/// The scheduled task will not execute until all prior tasks have completed or suspended.
113-
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
114-
/// - Returns: The value returned from the enqueued task.
115-
public func enqueueAndWait<T: Sendable>(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
116-
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
117-
return await withUnsafeContinuation { continuation in
118-
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
119-
continuation.resume(returning: await task(executionContext))
120-
})
121-
}
122-
}
123-
124-
/// Schedules an asynchronous throwing task and returns after the task is complete.
125-
/// The scheduled task will not execute until all prior tasks have completed or suspended.
126-
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
127-
/// - Returns: The value returned from the enqueued task.
128-
public func enqueueAndWait<T: Sendable>(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
129-
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
130-
return try await withUnsafeThrowingContinuation { continuation in
131-
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
132-
do {
133-
continuation.resume(returning: try await task(executionContext))
134-
} catch {
135-
continuation.resume(throwing: error)
136-
}
137-
})
138-
}
139-
}
140-
141-
// MARK: Private
142-
143-
private let taskStreamContinuation: AsyncStream<ActorTask>.Continuation
107+
fileprivate let taskStreamContinuation: AsyncStream<ActorTask>.Continuation
144108

145109
/// The actor on whose isolated context our tasks run, force-unwrapped.
146110
/// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment.
147-
private var executionContext: ActorType {
111+
fileprivate var executionContext: ActorType {
148112
// Crashing here means that this queue is being sent tasks either before an execution context has been set, or
149113
// after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted
150114
// actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor.
151115
weakExecutionContext!
152116
}
117+
118+
fileprivate struct ActorTask: Sendable {
119+
let executionContext: ActorType
120+
let sempahore = Semaphore()
121+
let task: @Sendable (isolated ActorType) async -> Void
122+
}
123+
124+
// MARK: Private
125+
153126
/// The actor on whose isolated context our tasks run.
154127
/// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue.
155128
///
156129
/// We will assume this execution context always exists for the lifecycle of the queue because:
157130
/// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`.
158131
/// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method.
159132
private weak var weakExecutionContext: ActorType?
133+
}
160134

161-
private struct ActorTask {
162-
let executionContext: ActorType
163-
let task: @Sendable (isolated ActorType) async -> Void
135+
extension Task {
136+
/// Runs the given nonthrowing operation asynchronously
137+
/// as part of a new top-level task on behalf of the current actor.
138+
/// The operation will not execute until all prior tasks have
139+
/// completed or suspended.
140+
///
141+
/// Use this function when creating asynchronous work
142+
/// that operates on behalf of the synchronous function that calls it.
143+
/// Like `Task.detached(priority:operation:)`,
144+
/// this function creates a separate, top-level task.
145+
/// Unlike `Task.detached(priority:operation:)`,
146+
/// the task created by `Task.init(priority:operation:)`
147+
/// inherits the priority and actor context of the caller,
148+
/// so the operation is treated more like an asynchronous extension
149+
/// to the synchronous operation.
150+
///
151+
/// You need to keep a reference to the task
152+
/// if you want to cancel it by calling the `Task.cancel()` method.
153+
/// Discarding your reference to a detached task
154+
/// doesn't implicitly cancel that task,
155+
/// it only makes it impossible for you to explicitly cancel the task.
156+
///
157+
/// - Parameters:
158+
/// - priority: The priority of the task.
159+
/// Pass `nil` to use the priority from `Task.currentPriority`.
160+
/// - actorQueue: The queue on which to enqueue the task.
161+
/// - operation: The operation to perform.
162+
@discardableResult
163+
public init<ActorType: Actor>(
164+
priority: TaskPriority? = nil,
165+
enqueuedOn actorQueue: ActorQueue<ActorType>,
166+
operation: @Sendable @escaping (isolated ActorType) async -> Success
167+
) where Failure == Never {
168+
let delivery = Delivery<Success, Failure>()
169+
let task = ActorQueue<ActorType>.ActorTask(
170+
executionContext: actorQueue.executionContext,
171+
task: { executionContext in
172+
await delivery.sendValue(operation(executionContext))
173+
}
174+
)
175+
actorQueue.taskStreamContinuation.yield(task)
176+
self.init(priority: priority) {
177+
await task.sempahore.wait()
178+
return await delivery.getValue()
179+
}
180+
}
181+
182+
/// Runs the given throwing operation asynchronously
183+
/// as part of a new top-level task on behalf of the current actor.
184+
/// The operation will not execute until all prior tasks have
185+
/// completed or suspended.
186+
///
187+
/// Use this function when creating asynchronous work
188+
/// that operates on behalf of the synchronous function that calls it.
189+
/// Like `Task.detached(priority:operation:)`,
190+
/// this function creates a separate, top-level task.
191+
/// Unlike `Task.detached(priority:operation:)`,
192+
/// the task created by `Task.init(priority:operation:)`
193+
/// inherits the priority and actor context of the caller,
194+
/// so the operation is treated more like an asynchronous extension
195+
/// to the synchronous operation.
196+
///
197+
/// You need to keep a reference to the task
198+
/// if you want to cancel it by calling the `Task.cancel()` method.
199+
/// Discarding your reference to a detached task
200+
/// doesn't implicitly cancel that task,
201+
/// it only makes it impossible for you to explicitly cancel the task.
202+
///
203+
/// - Parameters:
204+
/// - priority: The priority of the task.
205+
/// Pass `nil` to use the priority from `Task.currentPriority`.
206+
/// - actorQueue: The queue on which to enqueue the task.
207+
/// - operation: The operation to perform.
208+
@discardableResult
209+
public init<ActorType: Actor>(
210+
priority: TaskPriority? = nil,
211+
enqueuedOn actorQueue: ActorQueue<ActorType>,
212+
operation: @escaping @Sendable (isolated ActorType) async throws -> Success
213+
) where Failure == any Error {
214+
let delivery = Delivery<Success, Failure>()
215+
let task = ActorQueue<ActorType>.ActorTask(
216+
executionContext: actorQueue.executionContext,
217+
task: { executionContext in
218+
do {
219+
try await delivery.sendValue(operation(executionContext))
220+
} catch {
221+
await delivery.sendFailure(error)
222+
}
223+
}
224+
)
225+
226+
actorQueue.taskStreamContinuation.yield(task)
227+
self.init(priority: priority) {
228+
await task.sempahore.wait()
229+
return try await delivery.getValue()
230+
}
231+
}
232+
233+
/// Runs the given nonthrowing operation asynchronously
234+
/// as part of a new top-level task on behalf of the current actor.
235+
/// The operation will not execute until all prior tasks have
236+
/// completed or suspended.
237+
///
238+
/// Use this function when creating asynchronous work
239+
/// that operates on behalf of the synchronous function that calls it.
240+
/// Like `Task.detached(priority:operation:)`,
241+
/// this function creates a separate, top-level task.
242+
/// Unlike `Task.detached(priority:operation:)`,
243+
/// the task created by `Task.init(priority:operation:)`
244+
/// inherits the priority and actor context of the caller,
245+
/// so the operation is treated more like an asynchronous extension
246+
/// to the synchronous operation.
247+
///
248+
/// You need to keep a reference to the task
249+
/// if you want to cancel it by calling the `Task.cancel()` method.
250+
/// Discarding your reference to a detached task
251+
/// doesn't implicitly cancel that task,
252+
/// it only makes it impossible for you to explicitly cancel the task.
253+
///
254+
/// - Parameters:
255+
/// - priority: The priority of the task.
256+
/// Pass `nil` to use the priority from `Task.currentPriority`.
257+
/// - actorQueue: The queue on which to enqueue the task.
258+
/// - operation: The operation to perform.
259+
@discardableResult
260+
public init(
261+
priority: TaskPriority? = nil,
262+
enqueuedOn actorQueue: ActorQueue<MainActor>,
263+
operation: @MainActor @escaping () async -> Success
264+
) where Failure == Never {
265+
let delivery = Delivery<Success, Failure>()
266+
let task = ActorQueue<MainActor>.ActorTask(
267+
executionContext: actorQueue.executionContext,
268+
task: { executionContext in
269+
await delivery.sendValue(operation())
270+
}
271+
)
272+
actorQueue.taskStreamContinuation.yield(task)
273+
self.init(priority: priority) {
274+
await task.sempahore.wait()
275+
return await delivery.getValue()
276+
}
277+
}
278+
279+
/// Runs the given throwing operation asynchronously
280+
/// as part of a new top-level task on behalf of the current actor.
281+
/// The operation will not execute until all prior tasks have
282+
/// completed or suspended.
283+
///
284+
/// Use this function when creating asynchronous work
285+
/// that operates on behalf of the synchronous function that calls it.
286+
/// Like `Task.detached(priority:operation:)`,
287+
/// this function creates a separate, top-level task.
288+
/// Unlike `Task.detached(priority:operation:)`,
289+
/// the task created by `Task.init(priority:operation:)`
290+
/// inherits the priority and actor context of the caller,
291+
/// so the operation is treated more like an asynchronous extension
292+
/// to the synchronous operation.
293+
///
294+
/// You need to keep a reference to the task
295+
/// if you want to cancel it by calling the `Task.cancel()` method.
296+
/// Discarding your reference to a detached task
297+
/// doesn't implicitly cancel that task,
298+
/// it only makes it impossible for you to explicitly cancel the task.
299+
///
300+
/// - Parameters:
301+
/// - priority: The priority of the task.
302+
/// Pass `nil` to use the priority from `Task.currentPriority`.
303+
/// - actorQueue: The queue on which to enqueue the task.
304+
/// - operation: The operation to perform.
305+
@discardableResult
306+
public init(
307+
priority: TaskPriority? = nil,
308+
enqueuedOn actorQueue: ActorQueue<MainActor>,
309+
operation: @escaping @MainActor () async throws -> Success
310+
) where Failure == any Error {
311+
let delivery = Delivery<Success, Failure>()
312+
let task = ActorQueue<MainActor>.ActorTask(
313+
executionContext: actorQueue.executionContext,
314+
task: { executionContext in
315+
do {
316+
try await delivery.sendValue(operation())
317+
} catch {
318+
await delivery.sendFailure(error)
319+
}
320+
}
321+
)
322+
323+
actorQueue.taskStreamContinuation.yield(task)
324+
self.init(priority: priority) {
325+
await task.sempahore.wait()
326+
return try await delivery.getValue()
327+
}
164328
}
165329
}
330+
331+
extension MainActor {
332+
/// A global instance of an `ActorQueue<MainActor>`.
333+
public static var queue: ActorQueue<MainActor> {
334+
mainActorQueue
335+
}
336+
}
337+
338+
private let mainActorQueue = {
339+
let queue = ActorQueue<MainActor>()
340+
queue.adoptExecutionContext(of: MainActor.shared)
341+
return queue
342+
}()

0 commit comments

Comments
 (0)