@@ -61,7 +61,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
61
61
self . taskStreamContinuation = taskStreamContinuation
62
62
63
63
func beginExecuting(
64
- _ operation: sending @escaping ( isolated ActorType) async -> Void ,
64
+ _ operation: sending @escaping ( isolated ActorType) async -> Sendable ,
65
65
in context: isolated ActorType
66
66
) {
67
67
// In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
@@ -80,6 +80,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
80
80
actorTask. task,
81
81
in: actorTask. executionContext
82
82
)
83
+ await actorTask. sempahore. signal ( )
83
84
}
84
85
}
85
86
}
@@ -101,65 +102,243 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
101
102
weakExecutionContext = actor
102
103
}
103
104
104
- /// Schedules an asynchronous task for execution and immediately returns.
105
- /// The scheduled task will not execute until all prior tasks have completed or suspended.
106
- /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
107
- public func enqueue( _ task: @escaping @Sendable ( isolated ActorType) async -> Void ) {
108
- taskStreamContinuation. yield ( ActorTask ( executionContext: executionContext, task: task) )
109
- }
105
+ // MARK: Fileprivate
110
106
111
- /// Schedules an asynchronous task and returns after the task is complete.
112
- /// The scheduled task will not execute until all prior tasks have completed or suspended.
113
- /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
114
- /// - Returns: The value returned from the enqueued task.
115
- public func enqueueAndWait< T: Sendable > ( _ task: @escaping @Sendable ( isolated ActorType) async -> T ) async -> T {
116
- let executionContext = self . executionContext // Capture/retain the executionContext before suspending.
117
- return await withUnsafeContinuation { continuation in
118
- taskStreamContinuation. yield ( ActorTask ( executionContext: executionContext) { executionContext in
119
- continuation. resume ( returning: await task ( executionContext) )
120
- } )
121
- }
122
- }
123
-
124
- /// Schedules an asynchronous throwing task and returns after the task is complete.
125
- /// The scheduled task will not execute until all prior tasks have completed or suspended.
126
- /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
127
- /// - Returns: The value returned from the enqueued task.
128
- public func enqueueAndWait< T: Sendable > ( _ task: @escaping @Sendable ( isolated ActorType) async throws -> T ) async throws -> T {
129
- let executionContext = self . executionContext // Capture/retain the executionContext before suspending.
130
- return try await withUnsafeThrowingContinuation { continuation in
131
- taskStreamContinuation. yield ( ActorTask ( executionContext: executionContext) { executionContext in
132
- do {
133
- continuation. resume ( returning: try await task ( executionContext) )
134
- } catch {
135
- continuation. resume ( throwing: error)
136
- }
137
- } )
138
- }
139
- }
140
-
141
- // MARK: Private
142
-
143
- private let taskStreamContinuation : AsyncStream < ActorTask > . Continuation
107
+ fileprivate let taskStreamContinuation : AsyncStream < ActorTask > . Continuation
144
108
145
109
/// The actor on whose isolated context our tasks run, force-unwrapped.
146
110
/// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment.
147
- private var executionContext : ActorType {
111
+ fileprivate var executionContext : ActorType {
148
112
// Crashing here means that this queue is being sent tasks either before an execution context has been set, or
149
113
// after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted
150
114
// actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor.
151
115
weakExecutionContext!
152
116
}
117
+
118
+ fileprivate struct ActorTask : Sendable {
119
+ init (
120
+ executionContext: ActorType ,
121
+ task: @escaping @Sendable ( isolated ActorType) async -> Void
122
+ ) {
123
+ self . executionContext = executionContext
124
+ self . task = task
125
+ }
126
+
127
+ let executionContext : ActorType
128
+ let sempahore = Semaphore ( )
129
+ let task : @Sendable ( isolated ActorType) async -> Void
130
+ }
131
+
132
+ // MARK: Private
133
+
153
134
/// The actor on whose isolated context our tasks run.
154
135
/// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue.
155
136
///
156
137
/// We will assume this execution context always exists for the lifecycle of the queue because:
157
138
/// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`.
158
139
/// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method.
159
140
private weak var weakExecutionContext : ActorType ?
141
+ }
160
142
161
- private struct ActorTask {
162
- let executionContext : ActorType
163
- let task : @Sendable ( isolated ActorType) async -> Void
143
+ extension Task {
144
+ /// Runs the given nonthrowing operation asynchronously
145
+ /// as part of a new top-level task on behalf of the current actor.
146
+ /// The operation will not execute until all prior tasks have
147
+ /// completed or suspended.
148
+ ///
149
+ /// Use this function when creating asynchronous work
150
+ /// that operates on behalf of the synchronous function that calls it.
151
+ /// Like `Task.detached(priority:operation:)`,
152
+ /// this function creates a separate, top-level task.
153
+ /// Unlike `Task.detached(priority:operation:)`,
154
+ /// the task created by `Task.init(priority:operation:)`
155
+ /// inherits the priority and actor context of the caller,
156
+ /// so the operation is treated more like an asynchronous extension
157
+ /// to the synchronous operation.
158
+ ///
159
+ /// You need to keep a reference to the task
160
+ /// if you want to cancel it by calling the `Task.cancel()` method.
161
+ /// Discarding your reference to a detached task
162
+ /// doesn't implicitly cancel that task,
163
+ /// it only makes it impossible for you to explicitly cancel the task.
164
+ ///
165
+ /// - Parameters:
166
+ /// - priority: The priority of the task.
167
+ /// Pass `nil` to use the priority from `Task.currentPriority`.
168
+ /// - actorQueue: The queue on which to enqueue the task.
169
+ /// - operation: The operation to perform.
170
+ @discardableResult
171
+ public init < ActorType: Actor > (
172
+ priority: TaskPriority ? = nil ,
173
+ enqueuedOn actorQueue: ActorQueue < ActorType > ,
174
+ operation: @Sendable @escaping ( isolated ActorType) async -> Success
175
+ ) where Failure == Never {
176
+ let delivery = Delivery < Success , Failure > ( )
177
+ let task = ActorQueue< ActorType> . ActorTask(
178
+ executionContext: actorQueue. executionContext,
179
+ task: { executionContext in
180
+ await delivery. sendValue ( operation ( executionContext) )
181
+ }
182
+ )
183
+ actorQueue. taskStreamContinuation. yield ( task)
184
+ self . init ( priority: priority) {
185
+ await task. sempahore. wait ( )
186
+ return await delivery. getValue ( )
187
+ }
188
+ }
189
+
190
+ /// Runs the given throwing operation asynchronously
191
+ /// as part of a new top-level task on behalf of the current actor.
192
+ /// The operation will not execute until all prior tasks have
193
+ /// completed or suspended.
194
+ ///
195
+ /// Use this function when creating asynchronous work
196
+ /// that operates on behalf of the synchronous function that calls it.
197
+ /// Like `Task.detached(priority:operation:)`,
198
+ /// this function creates a separate, top-level task.
199
+ /// Unlike `Task.detached(priority:operation:)`,
200
+ /// the task created by `Task.init(priority:operation:)`
201
+ /// inherits the priority and actor context of the caller,
202
+ /// so the operation is treated more like an asynchronous extension
203
+ /// to the synchronous operation.
204
+ ///
205
+ /// You need to keep a reference to the task
206
+ /// if you want to cancel it by calling the `Task.cancel()` method.
207
+ /// Discarding your reference to a detached task
208
+ /// doesn't implicitly cancel that task,
209
+ /// it only makes it impossible for you to explicitly cancel the task.
210
+ ///
211
+ /// - Parameters:
212
+ /// - priority: The priority of the task.
213
+ /// Pass `nil` to use the priority from `Task.currentPriority`.
214
+ /// - actorQueue: The queue on which to enqueue the task.
215
+ /// - operation: The operation to perform.
216
+ @discardableResult
217
+ public init < ActorType: Actor > (
218
+ priority: TaskPriority ? = nil ,
219
+ enqueuedOn actorQueue: ActorQueue < ActorType > ,
220
+ operation: @escaping @Sendable ( isolated ActorType) async throws -> Success
221
+ ) where Failure == any Error {
222
+ let delivery = Delivery < Success , Failure > ( )
223
+ let task = ActorQueue< ActorType> . ActorTask(
224
+ executionContext: actorQueue. executionContext,
225
+ task: { executionContext in
226
+ do {
227
+ try await delivery. sendValue ( operation ( executionContext) )
228
+ } catch {
229
+ await delivery. sendFailure ( error)
230
+ }
231
+ }
232
+ )
233
+
234
+ actorQueue. taskStreamContinuation. yield ( task)
235
+ self . init ( priority: priority) {
236
+ await task. sempahore. wait ( )
237
+ return try await delivery. getValue ( )
238
+ }
239
+ }
240
+
241
+ /// Runs the given nonthrowing operation asynchronously
242
+ /// as part of a new top-level task on behalf of the current actor.
243
+ /// The operation will not execute until all prior tasks have
244
+ /// completed or suspended.
245
+ ///
246
+ /// Use this function when creating asynchronous work
247
+ /// that operates on behalf of the synchronous function that calls it.
248
+ /// Like `Task.detached(priority:operation:)`,
249
+ /// this function creates a separate, top-level task.
250
+ /// Unlike `Task.detached(priority:operation:)`,
251
+ /// the task created by `Task.init(priority:operation:)`
252
+ /// inherits the priority and actor context of the caller,
253
+ /// so the operation is treated more like an asynchronous extension
254
+ /// to the synchronous operation.
255
+ ///
256
+ /// You need to keep a reference to the task
257
+ /// if you want to cancel it by calling the `Task.cancel()` method.
258
+ /// Discarding your reference to a detached task
259
+ /// doesn't implicitly cancel that task,
260
+ /// it only makes it impossible for you to explicitly cancel the task.
261
+ ///
262
+ /// - Parameters:
263
+ /// - priority: The priority of the task.
264
+ /// Pass `nil` to use the priority from `Task.currentPriority`.
265
+ /// - actorQueue: The queue on which to enqueue the task.
266
+ /// - operation: The operation to perform.
267
+ @discardableResult
268
+ public init (
269
+ priority: TaskPriority ? = nil ,
270
+ enqueuedOn actorQueue: ActorQueue < MainActor > ,
271
+ operation: @MainActor @escaping ( ) async -> Success
272
+ ) where Failure == Never {
273
+ let delivery = Delivery < Success , Failure > ( )
274
+ let task = ActorQueue< MainActor> . ActorTask(
275
+ executionContext: actorQueue. executionContext,
276
+ task: { executionContext in
277
+ await delivery. sendValue ( operation ( ) )
278
+ }
279
+ )
280
+ actorQueue. taskStreamContinuation. yield ( task)
281
+ self . init ( priority: priority) {
282
+ await task. sempahore. wait ( )
283
+ return await delivery. getValue ( )
284
+ }
285
+ }
286
+
287
+ /// Runs the given throwing operation asynchronously
288
+ /// as part of a new top-level task on behalf of the current actor.
289
+ /// The operation will not execute until all prior tasks have
290
+ /// completed or suspended.
291
+ ///
292
+ /// Use this function when creating asynchronous work
293
+ /// that operates on behalf of the synchronous function that calls it.
294
+ /// Like `Task.detached(priority:operation:)`,
295
+ /// this function creates a separate, top-level task.
296
+ /// Unlike `Task.detached(priority:operation:)`,
297
+ /// the task created by `Task.init(priority:operation:)`
298
+ /// inherits the priority and actor context of the caller,
299
+ /// so the operation is treated more like an asynchronous extension
300
+ /// to the synchronous operation.
301
+ ///
302
+ /// You need to keep a reference to the task
303
+ /// if you want to cancel it by calling the `Task.cancel()` method.
304
+ /// Discarding your reference to a detached task
305
+ /// doesn't implicitly cancel that task,
306
+ /// it only makes it impossible for you to explicitly cancel the task.
307
+ ///
308
+ /// - Parameters:
309
+ /// - priority: The priority of the task.
310
+ /// Pass `nil` to use the priority from `Task.currentPriority`.
311
+ /// - actorQueue: The queue on which to enqueue the task.
312
+ /// - operation: The operation to perform.
313
+ @discardableResult
314
+ public init (
315
+ priority: TaskPriority ? = nil ,
316
+ enqueuedOn actorQueue: ActorQueue < MainActor > ,
317
+ operation: @escaping @MainActor ( ) async throws -> Success
318
+ ) where Failure == any Error {
319
+ let delivery = Delivery < Success , Failure > ( )
320
+ let task = ActorQueue< MainActor> . ActorTask(
321
+ executionContext: actorQueue. executionContext,
322
+ task: { executionContext in
323
+ do {
324
+ try await delivery. sendValue ( operation ( ) )
325
+ } catch {
326
+ await delivery. sendFailure ( error)
327
+ }
328
+ }
329
+ )
330
+
331
+ actorQueue. taskStreamContinuation. yield ( task)
332
+ self . init ( priority: priority) {
333
+ await task. sempahore. wait ( )
334
+ return try await delivery. getValue ( )
335
+ }
164
336
}
165
337
}
338
+
339
+ /// A global instance of an `ActorQueue<MainActor>`.
340
+ public let mainActorQueue = {
341
+ let queue = ActorQueue < MainActor > ( )
342
+ queue. adoptExecutionContext ( of: MainActor . shared)
343
+ return queue
344
+ } ( )
0 commit comments