From 61cc272660bd6936a8bc05d2e40588b318403d20 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Wed, 2 Apr 2025 00:17:56 -0700 Subject: [PATCH 1/8] Create Task API -> 0.7.0 --- AsyncQueue.podspec | 2 +- README.md | 16 +- Sources/AsyncQueue/ActorQueue.swift | 272 ++++++++++--- Sources/AsyncQueue/FIFOQueue.swift | 248 ++++++++---- Sources/AsyncQueue/MainActorQueue.swift | 96 ----- Sources/AsyncQueue/Utilities/Delivery.swift | 77 ++++ .../AsyncQueue}/Utilities/Semaphore.swift | 10 +- Tests/AsyncQueueTests/ActorQueueTests.swift | 267 ++++++++----- Tests/AsyncQueueTests/FIFOQueueTests.swift | 357 +++++++++--------- .../AsyncQueueTests/MainActorQueueTests.swift | 200 ---------- Tests/AsyncQueueTests/SemaphoreTests.swift | 14 +- 11 files changed, 873 insertions(+), 686 deletions(-) delete mode 100644 Sources/AsyncQueue/MainActorQueue.swift create mode 100644 Sources/AsyncQueue/Utilities/Delivery.swift rename {Tests/AsyncQueueTests => Sources/AsyncQueue}/Utilities/Semaphore.swift (94%) delete mode 100644 Tests/AsyncQueueTests/MainActorQueueTests.swift diff --git a/AsyncQueue.podspec b/AsyncQueue.podspec index ede5e56..3ab8b8a 100644 --- a/AsyncQueue.podspec +++ b/AsyncQueue.podspec @@ -1,6 +1,6 @@ Pod::Spec.new do |s| s.name = 'AsyncQueue' - s.version = '0.6.1' + s.version = '0.7.0' s.license = 'MIT' s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.' s.homepage = 'https://github.com/dfed/swift-async-queue' diff --git a/README.md b/README.md index 128b3d3..5d8c5c9 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ func testFIFOQueueOrdering() async { actor Counter { nonisolated func incrementAndAssertCountEquals(_ expectedCount: Int) { - queue.enqueue { + Task(enqueuedOn: queue) { await self.increment() let incrementedCount = await self.count XCTAssertEqual(incrementedCount, expectedCount) // always succeeds @@ -59,7 +59,7 @@ func testFIFOQueueOrdering() async { } func flushQueue() async { - await queue.enqueueAndWait { } + await Task(enqueuedOn: queue) {}.value } func increment() { @@ -101,14 +101,14 @@ func testActorQueueOrdering() async { nonisolated func incrementAndAssertCountEquals(_ expectedCount: Int) { - queue.enqueue { myself in + await Task(enqueuedOn: queue) { myself in myself.count += 1 XCTAssertEqual(expectedCount, myself.count) // always succeeds } } func flushQueue() async { - await queue.enqueueAndWait { _ in } + await Task(enqueuedOn: queue) {}.value } private var count = 0 @@ -137,7 +137,7 @@ func testMainActorQueueOrdering() async { final class Counter { nonisolated func incrementAndAssertCountEquals(_ expectedCount: Int) { - MainActorQueue.shared.enqueue { + Task(enqueuedOn: MainActor.queue) { self.increment() let incrementedCount = self.count XCTAssertEqual(incrementedCount, expectedCount) // always succeeds @@ -145,7 +145,7 @@ func testMainActorQueueOrdering() async { } func flushQueue() async { - await MainActorQueue.shared.enqueueAndWait { } + await Task(enqueuedOn: MainActor.queue) { }.value } func increment() { @@ -181,7 +181,7 @@ To install swift-async-queue in your project with [Swift Package Manager](https: ```swift dependencies: [ - .package(url: "https://github.com/dfed/swift-async-queue", from: "0.6.0"), + .package(url: "https://github.com/dfed/swift-async-queue", from: "0.7.0"), ] ``` @@ -190,7 +190,7 @@ dependencies: [ To install swift-async-queue in your project with [CocoaPods](http://cocoapods.org), add the following to your `Podfile`: ``` -pod 'AsyncQueue', '~> 0.6.0' +pod 'AsyncQueue', '~> 0.7.0' ``` ## Contributing diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index f6bf086..7231b35 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -35,13 +35,13 @@ /// /// nonisolated /// public func log(_ message: String) { -/// queue.enqueue { myself in +/// Task(enqueuedOn: queue) { myself in /// myself.logs.append(message) /// } /// } /// /// public func retrieveLogs() async -> [String] { -/// await queue.enqueueAndWait { myself in myself.logs } +/// await Task(enqueuedOn: queue) { myself in myself.logs }.value /// } /// /// private let queue = ActorQueue() @@ -80,6 +80,7 @@ public final class ActorQueue: @unchecked Sendable { actorTask.task, in: actorTask.executionContext ) + await actorTask.sempahore.signal() } } } @@ -101,55 +102,32 @@ public final class ActorQueue: @unchecked Sendable { weakExecutionContext = actor } - /// Schedules an asynchronous task for execution and immediately returns. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted. - public func enqueue(_ task: @escaping @Sendable (isolated ActorType) async -> Void) { - taskStreamContinuation.yield(ActorTask(executionContext: executionContext, task: task)) - } - - /// Schedules an asynchronous task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T { - let executionContext = self.executionContext // Capture/retain the executionContext before suspending. - return await withUnsafeContinuation { continuation in - taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in - continuation.resume(returning: await task(executionContext)) - }) - } - } - - /// Schedules an asynchronous throwing task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T { - let executionContext = self.executionContext // Capture/retain the executionContext before suspending. - return try await withUnsafeThrowingContinuation { continuation in - taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in - do { - continuation.resume(returning: try await task(executionContext)) - } catch { - continuation.resume(throwing: error) - } - }) - } - } - - // MARK: Private + // MARK: Fileprivate - private let taskStreamContinuation: AsyncStream.Continuation + fileprivate let taskStreamContinuation: AsyncStream.Continuation /// The actor on whose isolated context our tasks run, force-unwrapped. /// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment. - private var executionContext: ActorType { + fileprivate var executionContext: ActorType { // Crashing here means that this queue is being sent tasks either before an execution context has been set, or // after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted // actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor. weakExecutionContext! } + + fileprivate struct ActorTask: Sendable { + init(executionContext: ActorType, task: @escaping @Sendable (isolated ActorType) async -> Void) { + self.executionContext = executionContext + self.task = task + } + + let executionContext: ActorType + let sempahore = Semaphore() + let task: @Sendable (isolated ActorType) async -> Void + } + + // MARK: Private + /// The actor on whose isolated context our tasks run. /// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue. /// @@ -157,9 +135,213 @@ public final class ActorQueue: @unchecked Sendable { /// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`. /// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method. private weak var weakExecutionContext: ActorType? +} - private struct ActorTask { - let executionContext: ActorType - let task: @Sendable (isolated ActorType) async -> Void +extension Task { + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn actorQueue: ActorQueue, + operation: @Sendable @escaping (isolated ActorType) async -> Success + ) where Failure == Never { + let delivery = Delivery() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + await delivery.sendValue(operation(executionContext)) + } + ) + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return await delivery.getValue() + } + } + + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn actorQueue: ActorQueue, + operation: @escaping @Sendable (isolated ActorType) async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + do { + try await delivery.sendValue(operation(executionContext)) + } catch { + await delivery.sendFailure(error) + } + } + ) + + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return try await delivery.getValue() + } + } + + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn actorQueue: ActorQueue, + operation: @MainActor @escaping () async -> Success + ) where Failure == Never { + let delivery = Delivery() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + await delivery.sendValue(operation()) + } + ) + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return await delivery.getValue() + } + } + + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn actorQueue: ActorQueue, + operation: @escaping @MainActor () async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + do { + try await delivery.sendValue(operation()) + } catch { + await delivery.sendFailure(error) + } + } + ) + + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return try await delivery.getValue() + } } } + +extension MainActor { + /// A global instance of an `ActorQueue`. + public static var queue: ActorQueue { + mainActorQueue + } +} + +private let mainActorQueue = { + let queue = ActorQueue() + queue.adoptExecutionContext(of: MainActor.shared) + return queue +}() diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index cc6f9ba..a6298c8 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -30,12 +30,13 @@ public final class FIFOQueue: Sendable { /// Instantiates a FIFO queue. /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. public init(priority: TaskPriority? = nil) { - let (taskStream, taskStreamContinuation) = AsyncStream<@Sendable () async -> Void>.makeStream() + let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() self.taskStreamContinuation = taskStreamContinuation Task.detached(priority: priority) { - for await task in taskStream { - await task() + for await fifoTask in taskStream { + await fifoTask.task() + await fifoTask.sempahore.signal() } } } @@ -44,85 +45,202 @@ public final class FIFOQueue: Sendable { taskStreamContinuation.finish() } - // MARK: Public + // MARK: Fileprivate - /// Schedules an asynchronous task for execution and immediately returns. - /// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed. - /// - Parameter task: The task to enqueue. - public func enqueue(_ task: @escaping @Sendable () async -> Void) { - taskStreamContinuation.yield(task) + fileprivate struct FIFOTask: Sendable { + init(task: @escaping @Sendable () async -> Void) { + self.task = task + } + + let sempahore = Semaphore() + let task: @Sendable () async -> Void } - /// Schedules an asynchronous task for execution and immediately returns. - /// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed. - /// - Parameters: - /// - isolatedActor: The actor within which the task is isolated. - /// - task: The task to enqueue. - public func enqueue(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) { - taskStreamContinuation.yield { await task(isolatedActor) } - } + fileprivate let taskStreamContinuation: AsyncStream.Continuation +} - /// Schedules an asynchronous task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed. - /// - Parameter task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(_ task: @escaping @Sendable () async -> T) async -> T { - await withUnsafeContinuation { continuation in - taskStreamContinuation.yield { - continuation.resume(returning: await task()) - } +extension Task { + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - fifoQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn fifoQueue: FIFOQueue, + operation: @Sendable @escaping () async -> Success + ) where Failure == Never { + let delivery = Delivery() + let task = FIFOQueue.FIFOTask { + await delivery.sendValue(operation()) + } + fifoQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return await delivery.getValue() } } - /// Schedules an asynchronous task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed. + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// /// - Parameters: - /// - isolatedActor: The actor within which the task is isolated. - /// - task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T { - await withUnsafeContinuation { continuation in - taskStreamContinuation.yield { - continuation.resume(returning: await task(isolatedActor)) + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - fifoQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn actorQueue: FIFOQueue, + operation: @escaping @Sendable () async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let task = FIFOQueue.FIFOTask { + do { + try await delivery.sendValue(operation()) + } catch { + await delivery.sendFailure(error) } } + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return try await delivery.getValue() + } } - /// Schedules an asynchronous throwing task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed. - /// - Parameter task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(_ task: @escaping @Sendable () async throws -> T) async throws -> T { - try await withUnsafeThrowingContinuation { continuation in - taskStreamContinuation.yield { - do { - continuation.resume(returning: try await task()) - } catch { - continuation.resume(throwing: error) - } - } + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - fifoQueue: The queue on which to enqueue the task. + /// - isolatedActor: The actor to which the operation is isolated. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn fifoQueue: FIFOQueue, + isolatedTo isolatedActor: ActorType, + operation: @Sendable @escaping (isolated ActorType) async -> Success + ) where Failure == Never { + let delivery = Delivery() + let task = FIFOQueue.FIFOTask { + await delivery.sendValue(operation(isolatedActor)) + } + fifoQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return await delivery.getValue() } } - /// Schedules an asynchronous throwing task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed. + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// /// - Parameters: - /// - isolatedActor: The actor within which the task is isolated. - /// - task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T { - try await withUnsafeThrowingContinuation { continuation in - taskStreamContinuation.yield { - do { - continuation.resume(returning: try await task(isolatedActor)) - } catch { - continuation.resume(throwing: error) - } + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - fifoQueue: The queue on which to enqueue the task. + /// - isolatedActor: The actor to which the operation is isolated. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + enqueuedOn fifoQueue: FIFOQueue, + isolatedTo isolatedActor: ActorType, + operation: @Sendable @escaping (isolated ActorType) async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let task = FIFOQueue.FIFOTask { + do { + try await delivery.sendValue(operation(isolatedActor)) + } catch { + await delivery.sendFailure(error) } } + fifoQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await task.sempahore.wait() + return try await delivery.getValue() + } } - - // MARK: Private - - private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation } diff --git a/Sources/AsyncQueue/MainActorQueue.swift b/Sources/AsyncQueue/MainActorQueue.swift deleted file mode 100644 index b2828f6..0000000 --- a/Sources/AsyncQueue/MainActorQueue.swift +++ /dev/null @@ -1,96 +0,0 @@ -// MIT License -// -// Copyright (c) 2023 Dan Federman -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -/// A queue that enables enqueing ordered asynchronous tasks from a nonisolated context onto the `@MainActor`'s isolated context. -/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow subsequently enqueued tasks to begin executing. -/// This queue exhibits the execution behavior of an actor: tasks sent to this queue can re-enter the queue, and tasks may execute in non-FIFO order when a task suspends. -/// -/// A `MainActorQueue` ensures tasks sent from a nonisolated context to the `@MainActor`'s isolated context begin execution in order. -public final class MainActorQueue: Sendable { - - // MARK: Initialization - - /// Instantiates a main actor queue. - init() { - let (taskStream, taskStreamContinuation) = AsyncStream<@Sendable @MainActor () async -> Void>.makeStream() - self.taskStreamContinuation = taskStreamContinuation - - Task { @MainActor in - for await task in taskStream { - // In Swift 6, a `Task` enqueued from a global actor begins executing immediately on that global - // actor. Since we're running on the global main actor already, we can just dispatch a Task to - // get first-enqueued-first-start task execution. - Task { - await task() - } - } - } - } - - deinit { - taskStreamContinuation.finish() - } - - // MARK: Public - - /// The global `MainActorQueue` instance. - public static let shared = MainActorQueue() - - /// Schedules an asynchronous task for execution and immediately returns. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. - public func enqueue(_ task: @escaping @Sendable @MainActor () async -> Void) { - taskStreamContinuation.yield(task) - } - - /// Schedules an asynchronous task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(_ task: @escaping @Sendable @MainActor () async -> T) async -> T { - await withUnsafeContinuation { continuation in - taskStreamContinuation.yield { - continuation.resume(returning: await task()) - } - } - } - - /// Schedules an asynchronous throwing task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func enqueueAndWait(_ task: @escaping @Sendable @MainActor () async throws -> T) async throws -> T { - try await withUnsafeThrowingContinuation { continuation in - taskStreamContinuation.yield { - do { - continuation.resume(returning: try await task()) - } catch { - continuation.resume(throwing: error) - } - } - } - } - - // MARK: Private - - private let taskStreamContinuation: AsyncStream<@Sendable @MainActor () async -> Void>.Continuation -} diff --git a/Sources/AsyncQueue/Utilities/Delivery.swift b/Sources/AsyncQueue/Utilities/Delivery.swift new file mode 100644 index 0000000..f6e2deb --- /dev/null +++ b/Sources/AsyncQueue/Utilities/Delivery.swift @@ -0,0 +1,77 @@ +// MIT License +// +// Copyright (c) 2025 Dan Federman +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +actor Delivery { + func sendValue(_ value: Success) { + self.value = value + } + + func sendFailure(_ failure: Failure) { + self.failure = failure + } + + private var value: Success? { + didSet { + if let value { + valueContinuations.forEach { $0.resume(returning: value) } + valueContinuations.removeAll() + } + } + } + + private var failure: Failure? { + didSet { + if let failure { + valueContinuations.forEach { $0.resume(throwing: failure) } + valueContinuations.removeAll() + } + } + } + + private var valueContinuations: [UnsafeContinuation] = [] +} + +extension Delivery where Failure == Never { + func getValue() async -> Success { + if let value { + value + } else { + await withUnsafeContinuation { continuation in + valueContinuations.append(continuation) + } + } + } +} + +extension Delivery where Failure == any Error { + func getValue() async throws -> Success { + if let value { + value + } else if let failure { + throw failure + } else { + try await withUnsafeThrowingContinuation { continuation in + valueContinuations.append(continuation) + } + } + } +} diff --git a/Tests/AsyncQueueTests/Utilities/Semaphore.swift b/Sources/AsyncQueue/Utilities/Semaphore.swift similarity index 94% rename from Tests/AsyncQueueTests/Utilities/Semaphore.swift rename to Sources/AsyncQueue/Utilities/Semaphore.swift index bc99515..7c16b77 100644 --- a/Tests/AsyncQueueTests/Utilities/Semaphore.swift +++ b/Sources/AsyncQueue/Utilities/Semaphore.swift @@ -21,17 +21,17 @@ // SOFTWARE. /// A thread-safe semaphore implementation. -public actor Semaphore { +actor Semaphore { // MARK: Initialization - public init() {} + init() {} // MARK: Public /// Decrement the counting semaphore. If the resulting value is less than zero, this function waits for a signal to occur before returning. /// - Returns: Whether the call triggered a suspension @discardableResult - public func wait() async -> Bool { + func wait() async -> Bool { count -= 1 guard count < 0 else { // We don't need to wait because count is greater than or equal to zero. @@ -45,7 +45,7 @@ public actor Semaphore { } /// Increment the counting semaphore. If the previous value was less than zero, this function resumes a waiting thread before returning. - public func signal() { + func signal() { count += 1 guard !isWaiting else { // Continue waiting. @@ -59,7 +59,7 @@ public actor Semaphore { continuations.removeAll() } - public var isWaiting: Bool { + var isWaiting: Bool { count < 0 } diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index e8c702d..6e965f8 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -46,14 +46,14 @@ struct ActorQueueTests { #expect(weakCounter == nil) } - @Test func test_enqueue_retainsAdoptedActorUntilEnqueuedTasksComplete() async { + @Test func test_task_retainsAdoptedActorUntilEnqueuedTasksComplete() async { let systemUnderTest = ActorQueue() var counter: Counter? = Counter() weak var weakCounter = counter systemUnderTest.adoptExecutionContext(of: counter!) let semaphore = Semaphore() - systemUnderTest.enqueue { counter in + Task(enqueuedOn: systemUnderTest) { counter in await semaphore.wait() } @@ -62,9 +62,26 @@ struct ActorQueueTests { await semaphore.signal() } - @Test func test_enqueue_taskParameterIsAdoptedActor() async { + @Test func test_throwingTask_retainsAdoptedActorUntilEnqueuedTasksComplete() async { + let systemUnderTest = ActorQueue() + var counter: Counter? = Counter() + weak var weakCounter = counter + systemUnderTest.adoptExecutionContext(of: counter!) + + let semaphore = Semaphore() + Task(enqueuedOn: systemUnderTest) { counter in + await semaphore.wait() + try doWork() + } + + counter = nil + #expect(weakCounter != nil) + await semaphore.signal() + } + + @Test func test_task_taskParameterIsAdoptedActor() async { let semaphore = Semaphore() - systemUnderTest.enqueue { [storedCounter = counter] counter in + Task(enqueuedOn: systemUnderTest) { [storedCounter = counter] counter in #expect(counter === storedCounter) await semaphore.signal() } @@ -72,54 +89,148 @@ struct ActorQueueTests { await semaphore.wait() } - @Test func test_enqueueAndWait_taskParameterIsAdoptedActor() async { - await systemUnderTest.enqueueAndWait { [storedCounter = counter] counter in + @Test func test_throwingTask_taskParameterIsAdoptedActor() async { + let semaphore = Semaphore() + Task(enqueuedOn: systemUnderTest) { [storedCounter = counter] counter in #expect(counter === storedCounter) + await semaphore.signal() + try doWork() + } + + await semaphore.wait() + } + + @Test func test_task_sendsEventsInOrder() async throws { + let orderedTasks = (1...1_000).map { iteration in + Task(enqueuedOn: systemUnderTest) { counter in + counter.incrementAndExpectCount(equals: iteration) + } } + // Drain the queue + try #require(await orderedTasks.reversed().last?.value) } - @Test func test_enqueue_sendsEventsInOrder() async { - for iteration in 1...1_000 { - systemUnderTest.enqueue { counter in + @Test func test_throwingTask_sendsEventsInOrder() async throws { + let orderedTasks = (1...1_000).map { iteration in + Task(enqueuedOn: systemUnderTest) { counter in counter.incrementAndExpectCount(equals: iteration) + try doWork() } } - await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ } + // Drain the queue + try #require(await orderedTasks.reversed().last?.value) } - @Test func test_enqueue_startsExecutionOfNextTaskAfterSuspension() async { - let systemUnderTest = ActorQueue() - let semaphore = Semaphore() + @TestingQueue + @Test func test_mainTask_sendsEventsInOrder() async throws { + let orderedTasks = (1...1_000).map { iteration in + Task(enqueuedOn: MainActor.queue) { + await counter.incrementAndExpectCount(equals: iteration) + } + } + // Drain the queue + try #require(await orderedTasks.reversed().last?.value) + } + + @TestingQueue + @Test func test_mainThrowingTask_sendsEventsInOrder() async throws { + let orderedTasks = (1...1_000).map { iteration in + Task(enqueuedOn: MainActor.queue) { + await counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + // Drain the queue + try #require(await orderedTasks.reversed().last?.value) + } + + @Test func test_task_startsExecutionOfNextTaskAfterSuspension() async { + let systemUnderTest = ActorQueue() + let semaphore = AsyncQueue.Semaphore() + systemUnderTest.adoptExecutionContext(of: semaphore) + + Task(enqueuedOn: systemUnderTest) { semaphore in + await semaphore.wait() + } + Task(enqueuedOn: systemUnderTest) { semaphore in + // Signal the semaphore from the actor queue. + // If the actor queue were FIFO, this test would hang since this code would never execute: + // we'd still be waiting for the prior `wait()` tasks to finish. + semaphore.signal() + } + await Task(enqueuedOn: systemUnderTest) { _ in /* Drain the queue */ }.value + } + + @Test func test_throwingTask_startsExecutionOfNextTaskAfterSuspension() async { + let systemUnderTest = ActorQueue() + let semaphore = AsyncQueue.Semaphore() systemUnderTest.adoptExecutionContext(of: semaphore) - systemUnderTest.enqueue { semaphore in + Task(enqueuedOn: systemUnderTest) { semaphore in await semaphore.wait() + try doWork() } - systemUnderTest.enqueue { semaphore in + Task(enqueuedOn: systemUnderTest) { semaphore in // Signal the semaphore from the actor queue. // If the actor queue were FIFO, this test would hang since this code would never execute: // we'd still be waiting for the prior `wait()` tasks to finish. semaphore.signal() + try doWork() } - await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { _ in /* Drain the queue */ }.value } - @Test func test_enqueueAndWait_allowsReentrancy() async { - await systemUnderTest.enqueueAndWait { [systemUnderTest] counter in - await systemUnderTest.enqueueAndWait { counter in + @Test func test_task_allowsReentrancy() async { + await Task(enqueuedOn: systemUnderTest) { [systemUnderTest] counter in + await Task(enqueuedOn: systemUnderTest) { counter in counter.incrementAndExpectCount(equals: 1) - } + }.value counter.incrementAndExpectCount(equals: 2) - } + }.value } - @Test func test_enqueue_executesEnqueuedTasksAfterReceiverIsDeallocated() async { + @Test func test_throwingTask_allowsReentrancy() async throws { + try await Task(enqueuedOn: systemUnderTest) { [systemUnderTest] counter in + try doWork() + try await Task(enqueuedOn: systemUnderTest) { counter in + try doWork() + counter.incrementAndExpectCount(equals: 1) + }.value + try doWork() + counter.incrementAndExpectCount(equals: 2) + }.value + } + + @TestingQueue + @Test func test_mainTask_allowsReentrancy() async { + await Task(enqueuedOn: MainActor.queue) { [counter] in + await Task(enqueuedOn: MainActor.queue) { + await counter.incrementAndExpectCount(equals: 1) + }.value + await counter.incrementAndExpectCount(equals: 2) + }.value + } + + @TestingQueue + @Test func test_mainThrowingTask_allowsReentrancy() async throws { + try await Task(enqueuedOn: MainActor.queue) { [counter] in + try doWork() + try await Task(enqueuedOn: MainActor.queue) { + try doWork() + await counter.incrementAndExpectCount(equals: 1) + }.value + try doWork() + await counter.incrementAndExpectCount(equals: 2) + }.value + } + + @Test func test_task_executesEnqueuedTasksAfterQueueIsDeallocated() async throws { var systemUnderTest: ActorQueue? = ActorQueue() systemUnderTest?.adoptExecutionContext(of: counter) let expectation = Expectation() - let semaphore = Semaphore() - systemUnderTest?.enqueue { counter in + let semaphore = AsyncQueue.Semaphore() + Task(enqueuedOn: try #require(systemUnderTest)) { counter in // Make the task wait. await semaphore.wait() counter.incrementAndExpectCount(equals: 1) @@ -134,83 +245,62 @@ struct ActorQueueTests { await expectation.fulfillment(withinSeconds: 30) } - @Test func test_enqueue_doesNotRetainTaskAfterExecution() async { - final class Reference: Sendable {} - final class ReferenceHolder: @unchecked Sendable { - init() { - reference = Reference() - weakReference = reference - } - private(set) var reference: Reference? - private(set) weak var weakReference: Reference? - - func clearReference() { - reference = nil - } - } - let referenceHolder = ReferenceHolder() - let asyncSemaphore = Semaphore() - let syncSemaphore = Semaphore() - let systemUnderTest = ActorQueue() - systemUnderTest.adoptExecutionContext(of: syncSemaphore) + @Test func test_throwingTask_executesEnqueuedTasksAfterQueueIsDeallocated() async throws { + var systemUnderTest: ActorQueue? = ActorQueue() + systemUnderTest?.adoptExecutionContext(of: counter) let expectation = Expectation() - systemUnderTest.enqueue { [reference = referenceHolder.reference] syncSemaphore in - // Now that we've started the task and captured the reference, release the synchronous code. - syncSemaphore.signal() - // Wait for the synchronous setup to complete and the reference to be nil'd out. - await asyncSemaphore.wait() - // Retain the unsafe counter until the task is completed. - _ = reference - systemUnderTest.enqueue { _ in - // Signal that this task has cleaned up. - // This closure will not execute until the prior closure completes. - expectation.fulfill() - } + let semaphore = AsyncQueue.Semaphore() + Task(enqueuedOn: try #require(systemUnderTest)) { counter in + try doWork() + + // Make the task wait. + await semaphore.wait() + counter.incrementAndExpectCount(equals: 1) + expectation.fulfill() } - // Wait for the asynchronous task to start. - await syncSemaphore.wait() - referenceHolder.clearReference() - #expect(referenceHolder.weakReference != nil) - // Allow the enqueued task to complete. - await asyncSemaphore.signal() - // Make sure the task has completed. + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the enqueued tasks. + await semaphore.signal() await expectation.fulfillment(withinSeconds: 30) - - #expect(referenceHolder.weakReference == nil) } - @Test func test_enqueueAndWait_sendsEventsInOrder() async { - for iteration in 1...1_000 { - systemUnderTest.enqueue { counter in - counter.incrementAndExpectCount(equals: iteration) - } - - guard iteration % 25 == 0 else { - // Keep sending async events to the queue. - continue - } + @Test func test_task_canReturn() async { + let expectedValue = UUID() + let returnedValue = await Task(enqueuedOn: systemUnderTest) { _ in expectedValue }.value + #expect(expectedValue == returnedValue) + } - await systemUnderTest.enqueueAndWait { counter in - #expect(counter.count == iteration) - } + @Test func test_throwingTask_canReturn() async throws { + let expectedValue = UUID() + @Sendable func generateValue() throws -> UUID { + expectedValue } - await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ } + #expect(try await Task(enqueuedOn: systemUnderTest) { _ in try generateValue() }.value == expectedValue) } - @Test func test_enqueueAndWait_canReturn() async { - let expectedValue = UUID() - let returnedValue = await systemUnderTest.enqueueAndWait { _ in expectedValue } - #expect(expectedValue == returnedValue) + @Test func test_throwingTask_canThrow() async { + struct TestError: Error, Equatable { + private let identifier = UUID() + } + let expectedError = TestError() + do { + try await Task(enqueuedOn: systemUnderTest) { _ in throw expectedError }.value + } catch { + #expect(error as? TestError == expectedError) + } } - @Test func test_enqueueAndWait_canThrow() async { + @Test func test_mainThrowingTask_canThrow() async { struct TestError: Error, Equatable { private let identifier = UUID() } let expectedError = TestError() do { - try await systemUnderTest.enqueueAndWait { _ in throw expectedError } + try await Task(enqueuedOn: MainActor.queue) { throw expectedError }.value } catch { #expect(error as? TestError == expectedError) } @@ -220,4 +310,13 @@ struct ActorQueueTests { private let systemUnderTest: ActorQueue private let counter: Counter + + @Sendable private func doWork() throws -> Void {} +} + +/// A global actor that runs off of `main`, where tests may otherwise deadlock due to waiting for `main` from `main`. +@globalActor +private struct TestingQueue { + fileprivate actor Shared {} + fileprivate static let shared = Shared() } diff --git a/Tests/AsyncQueueTests/FIFOQueueTests.swift b/Tests/AsyncQueueTests/FIFOQueueTests.swift index 3fd8776..f78a5f5 100644 --- a/Tests/AsyncQueueTests/FIFOQueueTests.swift +++ b/Tests/AsyncQueueTests/FIFOQueueTests.swift @@ -29,49 +29,82 @@ struct FIFOQueueTests { // MARK: Behavior Tests - @Test func test_enqueue_sendsEventsInOrder() async { + @Test func test_task_sendsEventsInOrder() async { let counter = Counter() for iteration in 1...1_000 { - systemUnderTest.enqueue { + Task(enqueuedOn: systemUnderTest) { await counter.incrementAndExpectCount(equals: iteration) } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueueOn_sendsEventsInOrder() async { + @Test func test_taskIsolatedTo_sendsEventsInOrder() async { let counter = Counter() for iteration in 1...1_000 { - systemUnderTest.enqueue(on: counter) { counter in + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: iteration) } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueue_enqueueOn_sendEventsInOrder() async { + @Test func test_throwingTask_sendsEventsInOrder() async { let counter = Counter() for iteration in 1...1_000 { - if iteration % 2 == 0 { - systemUnderTest.enqueue { + Task(enqueuedOn: systemUnderTest) { + await counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test func test_throwingTaskIsolatedTo_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test func test_task_interleavedWithTaskIsolatedTo_andThrowing_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + let mod = iteration % 4 + if mod == 0 { + Task(enqueuedOn: systemUnderTest) { + await counter.incrementAndExpectCount(equals: iteration) + } + } else if mod == 1 { + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: iteration) + } + } else if mod == 2 { + Task(enqueuedOn: systemUnderTest) { await counter.incrementAndExpectCount(equals: iteration) + try doWork() } } else { - systemUnderTest.enqueue(on: counter) { counter in + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: iteration) + try doWork() } } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueue_executesAsyncBlocksAtomically() async { + @Test func test_task_executesAsyncBlocksAtomically() async { let semaphore = Semaphore() for _ in 1...1_000 { - systemUnderTest.enqueue { + Task(enqueuedOn: systemUnderTest) { let isWaiting = await semaphore.isWaiting // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by replacing `systemUnderTest.enqueue` above with `Task`. + // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. #expect(!isWaiting) // Signal the semaphore before or after we wait – let the scheduler decide. Task { @@ -81,16 +114,16 @@ struct FIFOQueueTests { await semaphore.wait() } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueueOn_executesAsyncBlocksAtomically() async { + @Test func test_taskIsolatedTo_executesAsyncBlocksAtomically() async { let semaphore = Semaphore() for _ in 1...1_000 { - systemUnderTest.enqueue(on: semaphore) { semaphore in + Task(enqueuedOn: systemUnderTest, isolatedTo: semaphore) { semaphore in let isWaiting = semaphore.isWaiting // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by replacing `systemUnderTest.enqueue` above with `Task`. + // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. #expect(!isWaiting) // Signal the semaphore before or after we wait – let the scheduler decide. Task { @@ -100,104 +133,120 @@ struct FIFOQueueTests { await semaphore.wait() } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueue_isNotReentrant() async { - let counter = Counter() - systemUnderTest.enqueue { [systemUnderTest] in - systemUnderTest.enqueue { - await counter.incrementAndExpectCount(equals: 2) - } - await counter.incrementAndExpectCount(equals: 1) - systemUnderTest.enqueue { - await counter.incrementAndExpectCount(equals: 3) + @Test func test_throwingTask_executesAsyncBlocksAtomically() async { + let semaphore = Semaphore() + for _ in 1...1_000 { + Task(enqueuedOn: systemUnderTest) { + let isWaiting = await semaphore.isWaiting + // This test will fail occasionally if we aren't executing atomically. + // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. + #expect(!isWaiting) + // Signal the semaphore before or after we wait – let the scheduler decide. + Task { + await semaphore.signal() + } + // Wait for the concurrent task to complete. + await semaphore.wait() + try doWork() } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueueOn_isNotReentrant() async { - let counter = Counter() - systemUnderTest.enqueue(on: counter) { [systemUnderTest] counter in - systemUnderTest.enqueue(on: counter) { counter in - counter.incrementAndExpectCount(equals: 2) - } - counter.incrementAndExpectCount(equals: 1) - systemUnderTest.enqueue(on: counter) { counter in - counter.incrementAndExpectCount(equals: 3) + @Test func test_throwingTaskIsolatedTo_executesAsyncBlocksAtomically() async { + let semaphore = Semaphore() + for _ in 1...1_000 { + Task(enqueuedOn: systemUnderTest, isolatedTo: semaphore) { semaphore in + let isWaiting = semaphore.isWaiting + // This test will fail occasionally if we aren't executing atomically. + // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. + #expect(!isWaiting) + // Signal the semaphore before or after we wait – let the scheduler decide. + Task { + semaphore.signal() + } + // Wait for the concurrent task to complete. + await semaphore.wait() + try doWork() } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueueAndWait_enqueue_areNotReentrant() async { + @Test func test_task_isNotReentrant() async { let counter = Counter() - await systemUnderTest.enqueueAndWait { [systemUnderTest] in - systemUnderTest.enqueue { + Task(enqueuedOn: systemUnderTest) { [systemUnderTest] in + Task(enqueuedOn: systemUnderTest) { await counter.incrementAndExpectCount(equals: 2) } await counter.incrementAndExpectCount(equals: 1) - systemUnderTest.enqueue { + Task(enqueuedOn: systemUnderTest) { await counter.incrementAndExpectCount(equals: 3) } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueueAndWaitOn_enqueueOn_areNotReentrant() async { + @Test func test_taskIsolatedTo_isNotReentrant() async { let counter = Counter() - await systemUnderTest.enqueueAndWait(on: counter) { [systemUnderTest] counter in - systemUnderTest.enqueue(on: counter) { counter in + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: 2) } counter.incrementAndExpectCount(equals: 1) - systemUnderTest.enqueue(on: counter) { counter in + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: 3) } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueueAndWait_enqueueOn_areNotReentrant() async { + @Test func test_throwingTask_isNotReentrant() async { let counter = Counter() - await systemUnderTest.enqueueAndWait { [systemUnderTest] in - systemUnderTest.enqueue(on: counter) { counter in - counter.incrementAndExpectCount(equals: 2) + Task(enqueuedOn: systemUnderTest) { [systemUnderTest] in + Task(enqueuedOn: systemUnderTest) { + await counter.incrementAndExpectCount(equals: 2) + try doWork() } await counter.incrementAndExpectCount(equals: 1) - systemUnderTest.enqueue(on: counter) { counter in - counter.incrementAndExpectCount(equals: 3) + Task(enqueuedOn: systemUnderTest) { + await counter.incrementAndExpectCount(equals: 3) + try doWork() } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueueAndWaitOn_enqueue_areNotReentrant() async { + @Test func test_throwingTaskIsolatedTo_isNotReentrant() async throws { let counter = Counter() - await systemUnderTest.enqueueAndWait(on: counter) { [systemUnderTest] counter in - systemUnderTest.enqueue { - await counter.incrementAndExpectCount(equals: 2) + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: 2) + try doWork() } counter.incrementAndExpectCount(equals: 1) - systemUnderTest.enqueue { - await counter.incrementAndExpectCount(equals: 3) + Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: 3) + try doWork() } } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value } - @Test func test_enqueue_executesAfterReceiverIsDeallocated() async { + @Test func test_task_executesAfterQueueIsDeallocated() async throws { var systemUnderTest: FIFOQueue? = FIFOQueue() let counter = Counter() let expectation = Expectation() let semaphore = Semaphore() - systemUnderTest?.enqueue { + Task(enqueuedOn: try #require(systemUnderTest)) { // Make the queue wait. await semaphore.wait() await counter.incrementAndExpectCount(equals: 1) } - systemUnderTest?.enqueue { + Task(enqueuedOn: try #require(systemUnderTest)) { // This async task should not execute until the semaphore is released. await counter.incrementAndExpectCount(equals: 2) expectation.fulfill() @@ -212,17 +261,17 @@ struct FIFOQueueTests { await expectation.fulfillment(withinSeconds: 30) } - @Test func test_enqueueOn_executesAfterReceiverIsDeallocated() async { + @Test func test_taskIsolatedTo_executesAfterQueueIsDeallocated() async throws { var systemUnderTest: FIFOQueue? = FIFOQueue() let counter = Counter() let expectation = Expectation() let semaphore = Semaphore() - systemUnderTest?.enqueue(on: counter) { counter in + Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in // Make the queue wait. await semaphore.wait() counter.incrementAndExpectCount(equals: 1) } - systemUnderTest?.enqueue(on: counter) { counter in + Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in // This async task should not execute until the semaphore is released. counter.incrementAndExpectCount(equals: 2) expectation.fulfill() @@ -237,153 +286,107 @@ struct FIFOQueueTests { await expectation.fulfillment(withinSeconds: 30) } - @Test func test_enqueue_doesNotRetainTaskAfterExecution() async { - final class Reference: Sendable {} - final class ReferenceHolder: @unchecked Sendable { - var reference: Reference? = Reference() + @Test func test_throwingTask_executesAfterQueueIsDeallocated() async throws { + var systemUnderTest: FIFOQueue? = FIFOQueue() + let counter = Counter() + let expectation = Expectation() + let semaphore = Semaphore() + Task(enqueuedOn: try #require(systemUnderTest)) { + // Make the queue wait. + await semaphore.wait() + await counter.incrementAndExpectCount(equals: 1) + try doWork() } - let referenceHolder = ReferenceHolder() - weak var weakReference = referenceHolder.reference - let asyncSemaphore = Semaphore() - let syncSemaphore = Semaphore() - systemUnderTest.enqueue { [reference = referenceHolder.reference] in - // Now that we've started the task and captured the reference, release the synchronous code. - await syncSemaphore.signal() - // Wait for the synchronous setup to complete and the reference to be nil'd out. - await asyncSemaphore.wait() - // Retain the unsafe counter until the task is completed. - _ = reference + Task(enqueuedOn: try #require(systemUnderTest)) { + // This async task should not execute until the semaphore is released. + await counter.incrementAndExpectCount(equals: 2) + expectation.fulfill() + try doWork() } - // Wait for the asynchronous task to start. - await syncSemaphore.wait() - referenceHolder.reference = nil - #expect(weakReference != nil) - // Allow the enqueued task to complete. - await asyncSemaphore.signal() - // Make sure the task has completed. - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } - #expect(weakReference == nil) - } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the remaining enqueued tasks. + await semaphore.signal() - @Test func test_enqueueOn_doesNotRetainTaskAfterExecution() async { - final class Reference: Sendable {} - final class ReferenceHolder: @unchecked Sendable { - var reference: Reference? = Reference() - } - let referenceHolder = ReferenceHolder() - weak var weakReference = referenceHolder.reference - let asyncSemaphore = Semaphore() - let syncSemaphore = Semaphore() - systemUnderTest.enqueue(on: syncSemaphore) { [reference = referenceHolder.reference] syncSemaphore in - // Now that we've started the task and captured the reference, release the synchronous code. - syncSemaphore.signal() - // Wait for the synchronous setup to complete and the reference to be nil'd out. - await asyncSemaphore.wait() - // Retain the unsafe counter until the task is completed. - _ = reference - } - // Wait for the asynchronous task to start. - await syncSemaphore.wait() - referenceHolder.reference = nil - #expect(weakReference != nil) - // Allow the enqueued task to complete. - await asyncSemaphore.signal() - // Make sure the task has completed. - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } - #expect(weakReference == nil) + await expectation.fulfillment(withinSeconds: 30) } - @Test func test_enqueueAndWait_sendsEventsInOrder() async { + @Test func test_throwingTaskIsolatedTo_executesAfterQueueIsDeallocated() async throws { + var systemUnderTest: FIFOQueue? = FIFOQueue() let counter = Counter() - for iteration in 1...1_000 { - systemUnderTest.enqueue { - await counter.incrementAndExpectCount(equals: iteration) - } - - guard iteration % 25 == 0 else { - // Keep sending async events to the queue. - continue - } - - await systemUnderTest.enqueueAndWait { - let count = await counter.count - #expect(count == iteration) - } + let expectation = Expectation() + let semaphore = Semaphore() + Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in + // Make the queue wait. + await semaphore.wait() + counter.incrementAndExpectCount(equals: 1) + try doWork() } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } - } - - @Test func test_enqueueAndWaitOn_sendsEventsInOrder() async { - let counter = Counter() - for iteration in 1...1_000 { - systemUnderTest.enqueue { - await counter.incrementAndExpectCount(equals: iteration) - } - - guard iteration % 25 == 0 else { - // Keep sending async events to the queue. - continue - } - - await systemUnderTest.enqueueAndWait(on: counter) { counter in - let count = counter.count - #expect(count == iteration) - } + Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in + // This async task should not execute until the semaphore is released. + counter.incrementAndExpectCount(equals: 2) + expectation.fulfill() + try doWork() } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the remaining enqueued tasks. + await semaphore.signal() + + await expectation.fulfillment(withinSeconds: 30) } - @Test func test_enqueueAndWait_canReturn() async { + @Test func test_task_canReturn() async { let expectedValue = UUID() - let returnedValue = await systemUnderTest.enqueueAndWait { expectedValue } + let returnedValue = await Task(enqueuedOn: systemUnderTest) { expectedValue }.value #expect(expectedValue == returnedValue) } - @Test func test_enqueueAndWait_throwing_canReturn() async throws { + @Test func test_taskIsolatedTo_canReturn() async { let expectedValue = UUID() - @Sendable func throwingMethod() throws {} - let returnedValue = try await systemUnderTest.enqueueAndWait { - try throwingMethod() - return expectedValue - } + let returnedValue = await Task(enqueuedOn: systemUnderTest, isolatedTo: Semaphore()) { _ in expectedValue }.value #expect(expectedValue == returnedValue) } - @Test func test_enqueueAndWaitOn_canReturn() async { + @Test func test_throwingTask_canReturn() async throws { let expectedValue = UUID() - let returnedValue = await systemUnderTest.enqueueAndWait(on: Counter()) { _ in expectedValue } - #expect(expectedValue == returnedValue) + @Sendable func generateValue() throws -> UUID { + expectedValue + } + #expect(try await Task(enqueuedOn: systemUnderTest) { try generateValue() }.value == expectedValue) } - @Test func test_enqueueAndWaitOn_throwing_canReturn() async throws { + @Test func test_throwingTaskIsolatedTo_canReturn() async throws { let expectedValue = UUID() - @Sendable func throwingMethod() throws {} - let returnedValue = try await systemUnderTest.enqueueAndWait(on: Counter()) { _ in - try throwingMethod() - return expectedValue + @Sendable func generateValue() throws -> UUID { + expectedValue } - #expect(expectedValue == returnedValue) + #expect(try await Task(enqueuedOn: systemUnderTest, isolatedTo: Semaphore()) { _ in try generateValue() }.value == expectedValue) } - @Test func test_enqueueAndWait_canThrow() async { + @Test func test_throwingTask_canThrow() async { struct TestError: Error, Equatable { private let identifier = UUID() } let expectedError = TestError() do { - try await systemUnderTest.enqueueAndWait { throw expectedError } + try await Task(enqueuedOn: systemUnderTest) { throw expectedError }.value } catch { #expect(error as? TestError == expectedError) } } - @Test func test_enqueueAndWaitOn_canThrow() async { + @Test func test_throwingTaskIsolatedTo_canThrow() async { struct TestError: Error, Equatable { private let identifier = UUID() } let expectedError = TestError() do { - try await systemUnderTest.enqueueAndWait(on: Counter()) { _ in throw expectedError } + try await Task(enqueuedOn: systemUnderTest, isolatedTo: Semaphore()) { _ in throw expectedError }.value } catch { #expect(error as? TestError == expectedError) } @@ -392,4 +395,6 @@ struct FIFOQueueTests { // MARK: Private private let systemUnderTest = FIFOQueue() + + @Sendable private func doWork() throws -> Void {} } diff --git a/Tests/AsyncQueueTests/MainActorQueueTests.swift b/Tests/AsyncQueueTests/MainActorQueueTests.swift deleted file mode 100644 index aa31ae7..0000000 --- a/Tests/AsyncQueueTests/MainActorQueueTests.swift +++ /dev/null @@ -1,200 +0,0 @@ -// MIT License -// -// Copyright (c) 2023 Dan Federman -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -import Foundation -import Testing - -@testable import AsyncQueue - -struct MainActorQueueTests { - // MARK: Behavior Tests - - @TestingQueue - @Test func test_shared_returnsSameInstance() async { - #expect(MainActorQueue.shared === MainActorQueue.shared) - } - - @TestingQueue - @Test func test_enqueue_executesOnMainThread() async { - let key: DispatchSpecificKey = .init() - DispatchQueue.main.setSpecific(key: key, value: ()) - systemUnderTest.enqueue { - #expect(DispatchQueue.main.getSpecific(key: key) != nil) - } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } - } - - @TestingQueue - @Test func test_enqueue_sendsEventsInOrder() async { - for iteration in 1...1_000 { - systemUnderTest.enqueue { [counter] in - await counter.incrementAndExpectCount(equals: iteration) - } - } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } - } - - @TestingQueue - @Test func test_enqueue_startsExecutionOfNextTaskAfterSuspension() async { - let semaphore = Semaphore() - - systemUnderTest.enqueue { - await semaphore.wait() - } - systemUnderTest.enqueue { - // Signal the semaphore from the actor queue. - // If the actor queue were FIFO, this test would hang since this code would never execute: - // we'd still be waiting for the prior `wait()` tasks to finish. - await semaphore.signal() - } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } - } - - @TestingQueue - @Test func test_enqueueAndWait_executesOnMainThread() async { - let key: DispatchSpecificKey = .init() - DispatchQueue.main.setSpecific(key: key, value: ()) - await systemUnderTest.enqueueAndWait { - #expect(DispatchQueue.main.getSpecific(key: key) != nil) - } - } - - @TestingQueue - @Test func test_enqueueAndWait_allowsReentrancy() async { - await systemUnderTest.enqueueAndWait { [systemUnderTest, counter] in - await systemUnderTest.enqueueAndWait { [counter] in - await counter.incrementAndExpectCount(equals: 1) - } - await counter.incrementAndExpectCount(equals: 2) - } - } - - @TestingQueue - @Test func test_enqueue_doesNotRetainTaskAfterExecution() async { - final class Reference: Sendable {} - final class ReferenceHolder: @unchecked Sendable { - init() { - reference = Reference() - weakReference = reference - } - private(set) var reference: Reference? - private(set) weak var weakReference: Reference? - - func clearReference() { - reference = nil - } - } - let referenceHolder = ReferenceHolder() - let asyncSemaphore = Semaphore() - let syncSemaphore = Semaphore() - let systemUnderTest = ActorQueue() - systemUnderTest.adoptExecutionContext(of: syncSemaphore) - - let expectation = Expectation() - systemUnderTest.enqueue { [reference = referenceHolder.reference] syncSemaphore in - // Now that we've started the task and captured the reference, release the synchronous code. - syncSemaphore.signal() - // Wait for the synchronous setup to complete and the reference to be nil'd out. - await asyncSemaphore.wait() - // Retain the unsafe counter until the task is completed. - _ = reference - systemUnderTest.enqueue { _ in - // Signal that this task has cleaned up. - // This closure will not execute until the prior closure completes. - expectation.fulfill() - } - } - // Wait for the asynchronous task to start. - await syncSemaphore.wait() - referenceHolder.clearReference() - #expect(referenceHolder.weakReference != nil) - // Allow the enqueued task to complete. - await asyncSemaphore.signal() - // Make sure the task has completed. - await expectation.fulfillment(withinSeconds: 30) - - #expect(referenceHolder.weakReference == nil) - } - - @TestingQueue - @Test func test_enqueueAndWait_sendsEventsInOrder() async { - for iteration in 1...1_000 { - systemUnderTest.enqueue { [counter] in - await counter.incrementAndExpectCount(equals: iteration) - } - - guard iteration % 25 == 0 else { - // Keep sending async events to the queue. - continue - } - - await systemUnderTest.enqueueAndWait { [counter] in - let count = await counter.count - #expect(count == iteration) - } - } - await systemUnderTest.enqueueAndWait { /* Drain the queue */ } - } - - @TestingQueue - @Test func test_enqueueAndWait_canReturn() async { - let expectedValue = UUID() - let returnedValue = await systemUnderTest.enqueueAndWait { expectedValue } - #expect(expectedValue == returnedValue) - } - - @TestingQueue - @Test func test_enqueueAndWait_throwing_canReturn() async throws { - let expectedValue = UUID() - @Sendable func throwingMethod() throws {} - let returnedValue = try await systemUnderTest.enqueueAndWait { - try throwingMethod() - return expectedValue - } - #expect(expectedValue == returnedValue) - } - - @TestingQueue - @Test func test_enqueueAndWait_canThrow() async { - struct TestError: Error, Equatable { - private let identifier = UUID() - } - let expectedError = TestError() - do { - try await systemUnderTest.enqueueAndWait { throw expectedError } - } catch { - #expect(error as? TestError == expectedError) - } - } - - // MARK: Private - - private let systemUnderTest = MainActorQueue() - private let counter = Counter() -} - -/// A global actor that forces the above `MainActorQueueTests` to not run on `main`, where tests may otherwise deadlock due to waiting for `main` from `main`. -@globalActor -private struct TestingQueue { - fileprivate actor Shared {} - fileprivate static let shared = Shared() -} diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift index c95a919..7ad2b05 100644 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -22,6 +22,8 @@ import Testing +@testable import AsyncQueue + final class SemaphoreTests { // MARK: Initialization @@ -45,7 +47,7 @@ final class SemaphoreTests { 4. We must utilize a single actor's isolated context to avoid accidental interleaving when suspending to communicate across actor contexts. In order to ensure that we are executing the `wait()` calls before we call `signal()` _without awaiting a `wait()` call_, - we utilize the Semaphore's ordered execution context to enqueue ordered `Task`s similar to how an ActorQueue works. + we utilize the AsyncQueue.Semaphore's ordered execution context to enqueue ordered `Task`s similar to how an ActorQueue works. */ let iterationCount = 1_000 @@ -103,16 +105,16 @@ final class SemaphoreTests { // MARK: Private - private let systemUnderTest = Semaphore() + private let systemUnderTest = AsyncQueue.Semaphore() } -// MARK: - Semaphore Extension +// MARK: - AsyncQueue.Semaphore Extension -private extension Semaphore { +private extension AsyncQueue.Semaphore { /// Enqueues an asynchronous task and increments a counter after the task completes. /// This method suspends the caller until the asynchronous task has begun, ensuring ordered execution of enqueued tasks. /// - Parameter task: A unit of work that returns work to execute after the task completes and the count is incremented. - func enqueueAndCount(using counter: UnsafeCounter, _ task: @escaping @Sendable (isolated Semaphore) async -> (@Sendable (isolated Semaphore) -> Void)?) async { + func enqueueAndCount(using counter: UnsafeCounter, _ task: @escaping @Sendable (isolated AsyncQueue.Semaphore) async -> (@Sendable (isolated AsyncQueue.Semaphore) -> Void)?) async { // Await the start of the soon-to-be-enqueued `Task` with a continuation. await withCheckedContinuation { continuation in // Re-enter the semaphore's ordered context but don't wait for the result. @@ -126,7 +128,7 @@ private extension Semaphore { } } - func execute(_ task: @Sendable (isolated Semaphore) async throws -> Void) async rethrows { + func execute(_ task: @Sendable (isolated AsyncQueue.Semaphore) async throws -> Void) async rethrows { try await task(self) } } From 58dafe5d6c25eb177be29c1d1a756c23968f944d Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Wed, 2 Apr 2025 09:41:28 -0700 Subject: [PATCH 2/8] use @isolated(any) --- Sources/AsyncQueue/FIFOQueue.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index a6298c8..9b788ad 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -90,7 +90,7 @@ extension Task { public init( priority: TaskPriority? = nil, enqueuedOn fifoQueue: FIFOQueue, - operation: @Sendable @escaping () async -> Success + operation: @Sendable @escaping @isolated(any) () async -> Success ) where Failure == Never { let delivery = Delivery() let task = FIFOQueue.FIFOTask { @@ -133,7 +133,7 @@ extension Task { public init( priority: TaskPriority? = nil, enqueuedOn actorQueue: FIFOQueue, - operation: @escaping @Sendable () async throws -> Success + operation: @escaping @Sendable @isolated(any) () async throws -> Success ) where Failure == any Error { let delivery = Delivery() let task = FIFOQueue.FIFOTask { From 1e63d4c5d11b1faef242bd2ccee1a0d3f4c194a0 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 5 Apr 2025 01:45:07 -0700 Subject: [PATCH 3/8] Shorten and improve API --- README.md | 12 +- Sources/AsyncQueue/ActorQueue.swift | 12 +- Sources/AsyncQueue/FIFOQueue.swift | 34 ++++-- Tests/AsyncQueueTests/ActorQueueTests.swift | 56 ++++----- Tests/AsyncQueueTests/FIFOQueueTests.swift | 122 +++++++++++--------- 5 files changed, 133 insertions(+), 103 deletions(-) diff --git a/README.md b/README.md index 5d8c5c9..31f2633 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ func testFIFOQueueOrdering() async { actor Counter { nonisolated func incrementAndAssertCountEquals(_ expectedCount: Int) { - Task(enqueuedOn: queue) { + Task(on: queue) { await self.increment() let incrementedCount = await self.count XCTAssertEqual(incrementedCount, expectedCount) // always succeeds @@ -59,7 +59,7 @@ func testFIFOQueueOrdering() async { } func flushQueue() async { - await Task(enqueuedOn: queue) {}.value + await Task(on: queue) {}.value } func increment() { @@ -101,14 +101,14 @@ func testActorQueueOrdering() async { nonisolated func incrementAndAssertCountEquals(_ expectedCount: Int) { - await Task(enqueuedOn: queue) { myself in + await Task(on: queue) { myself in myself.count += 1 XCTAssertEqual(expectedCount, myself.count) // always succeeds } } func flushQueue() async { - await Task(enqueuedOn: queue) {}.value + await Task(on: queue) {}.value } private var count = 0 @@ -137,7 +137,7 @@ func testMainActorQueueOrdering() async { final class Counter { nonisolated func incrementAndAssertCountEquals(_ expectedCount: Int) { - Task(enqueuedOn: MainActor.queue) { + Task(on: MainActor.queue) { self.increment() let incrementedCount = self.count XCTAssertEqual(incrementedCount, expectedCount) // always succeeds @@ -145,7 +145,7 @@ func testMainActorQueueOrdering() async { } func flushQueue() async { - await Task(enqueuedOn: MainActor.queue) { }.value + await Task(on: MainActor.queue) { }.value } func increment() { diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 7231b35..2eac618 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -35,13 +35,13 @@ /// /// nonisolated /// public func log(_ message: String) { -/// Task(enqueuedOn: queue) { myself in +/// Task(on: queue) { myself in /// myself.logs.append(message) /// } /// } /// /// public func retrieveLogs() async -> [String] { -/// await Task(enqueuedOn: queue) { myself in myself.logs }.value +/// await Task(on: queue) { myself in myself.logs }.value /// } /// /// private let queue = ActorQueue() @@ -167,7 +167,7 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn actorQueue: ActorQueue, + on actorQueue: ActorQueue, operation: @Sendable @escaping (isolated ActorType) async -> Success ) where Failure == Never { let delivery = Delivery() @@ -213,7 +213,7 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn actorQueue: ActorQueue, + on actorQueue: ActorQueue, operation: @escaping @Sendable (isolated ActorType) async throws -> Success ) where Failure == any Error { let delivery = Delivery() @@ -264,7 +264,7 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn actorQueue: ActorQueue, + on actorQueue: ActorQueue, operation: @MainActor @escaping () async -> Success ) where Failure == Never { let delivery = Delivery() @@ -310,7 +310,7 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn actorQueue: ActorQueue, + on actorQueue: ActorQueue, operation: @escaping @MainActor () async throws -> Success ) where Failure == any Error { let delivery = Delivery() diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index 9b788ad..b715c2a 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -89,12 +89,13 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn fifoQueue: FIFOQueue, - operation: @Sendable @escaping @isolated(any) () async -> Success + on fifoQueue: FIFOQueue, + @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success ) where Failure == Never { let delivery = Delivery() + let executeOnce = UnsafeClosureHolder(operation: operation) let task = FIFOQueue.FIFOTask { - await delivery.sendValue(operation()) + await delivery.sendValue(executeOnce.operation()) } fifoQueue.taskStreamContinuation.yield(task) self.init(priority: priority) { @@ -132,13 +133,14 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn actorQueue: FIFOQueue, - operation: @escaping @Sendable @isolated(any) () async throws -> Success + on actorQueue: FIFOQueue, + @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success ) where Failure == any Error { let delivery = Delivery() + let executeOnce = UnsafeThrowingClosureHolder(operation: operation) let task = FIFOQueue.FIFOTask { do { - try await delivery.sendValue(operation()) + try await delivery.sendValue(executeOnce.operation()) } catch { await delivery.sendFailure(error) } @@ -180,7 +182,7 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn fifoQueue: FIFOQueue, + on fifoQueue: FIFOQueue, isolatedTo isolatedActor: ActorType, operation: @Sendable @escaping (isolated ActorType) async -> Success ) where Failure == Never { @@ -225,7 +227,7 @@ extension Task { @discardableResult public init( priority: TaskPriority? = nil, - enqueuedOn fifoQueue: FIFOQueue, + on fifoQueue: FIFOQueue, isolatedTo isolatedActor: ActorType, operation: @Sendable @escaping (isolated ActorType) async throws -> Success ) where Failure == any Error { @@ -244,3 +246,19 @@ extension Task { } } } + +private struct UnsafeClosureHolder: @unchecked Sendable { + init(operation: sending @escaping @isolated(any) () async -> Success) { + self.operation = operation + } + + let operation: @isolated(any) () async -> Success +} + +private struct UnsafeThrowingClosureHolder: @unchecked Sendable { + init(operation: sending @escaping @isolated(any) () async throws -> Success) { + self.operation = operation + } + + let operation: @isolated(any) () async throws -> Success +} diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 6e965f8..6dae561 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -53,7 +53,7 @@ struct ActorQueueTests { systemUnderTest.adoptExecutionContext(of: counter!) let semaphore = Semaphore() - Task(enqueuedOn: systemUnderTest) { counter in + Task(on: systemUnderTest) { counter in await semaphore.wait() } @@ -69,7 +69,7 @@ struct ActorQueueTests { systemUnderTest.adoptExecutionContext(of: counter!) let semaphore = Semaphore() - Task(enqueuedOn: systemUnderTest) { counter in + Task(on: systemUnderTest) { counter in await semaphore.wait() try doWork() } @@ -81,7 +81,7 @@ struct ActorQueueTests { @Test func test_task_taskParameterIsAdoptedActor() async { let semaphore = Semaphore() - Task(enqueuedOn: systemUnderTest) { [storedCounter = counter] counter in + Task(on: systemUnderTest) { [storedCounter = counter] counter in #expect(counter === storedCounter) await semaphore.signal() } @@ -91,7 +91,7 @@ struct ActorQueueTests { @Test func test_throwingTask_taskParameterIsAdoptedActor() async { let semaphore = Semaphore() - Task(enqueuedOn: systemUnderTest) { [storedCounter = counter] counter in + Task(on: systemUnderTest) { [storedCounter = counter] counter in #expect(counter === storedCounter) await semaphore.signal() try doWork() @@ -102,7 +102,7 @@ struct ActorQueueTests { @Test func test_task_sendsEventsInOrder() async throws { let orderedTasks = (1...1_000).map { iteration in - Task(enqueuedOn: systemUnderTest) { counter in + Task(on: systemUnderTest) { counter in counter.incrementAndExpectCount(equals: iteration) } } @@ -112,7 +112,7 @@ struct ActorQueueTests { @Test func test_throwingTask_sendsEventsInOrder() async throws { let orderedTasks = (1...1_000).map { iteration in - Task(enqueuedOn: systemUnderTest) { counter in + Task(on: systemUnderTest) { counter in counter.incrementAndExpectCount(equals: iteration) try doWork() } @@ -124,7 +124,7 @@ struct ActorQueueTests { @TestingQueue @Test func test_mainTask_sendsEventsInOrder() async throws { let orderedTasks = (1...1_000).map { iteration in - Task(enqueuedOn: MainActor.queue) { + Task(on: MainActor.queue) { await counter.incrementAndExpectCount(equals: iteration) } } @@ -135,7 +135,7 @@ struct ActorQueueTests { @TestingQueue @Test func test_mainThrowingTask_sendsEventsInOrder() async throws { let orderedTasks = (1...1_000).map { iteration in - Task(enqueuedOn: MainActor.queue) { + Task(on: MainActor.queue) { await counter.incrementAndExpectCount(equals: iteration) try doWork() } @@ -149,16 +149,16 @@ struct ActorQueueTests { let semaphore = AsyncQueue.Semaphore() systemUnderTest.adoptExecutionContext(of: semaphore) - Task(enqueuedOn: systemUnderTest) { semaphore in + Task(on: systemUnderTest) { semaphore in await semaphore.wait() } - Task(enqueuedOn: systemUnderTest) { semaphore in + Task(on: systemUnderTest) { semaphore in // Signal the semaphore from the actor queue. // If the actor queue were FIFO, this test would hang since this code would never execute: // we'd still be waiting for the prior `wait()` tasks to finish. semaphore.signal() } - await Task(enqueuedOn: systemUnderTest) { _ in /* Drain the queue */ }.value + await Task(on: systemUnderTest) { _ in /* Drain the queue */ }.value } @Test func test_throwingTask_startsExecutionOfNextTaskAfterSuspension() async { @@ -166,23 +166,23 @@ struct ActorQueueTests { let semaphore = AsyncQueue.Semaphore() systemUnderTest.adoptExecutionContext(of: semaphore) - Task(enqueuedOn: systemUnderTest) { semaphore in + Task(on: systemUnderTest) { semaphore in await semaphore.wait() try doWork() } - Task(enqueuedOn: systemUnderTest) { semaphore in + Task(on: systemUnderTest) { semaphore in // Signal the semaphore from the actor queue. // If the actor queue were FIFO, this test would hang since this code would never execute: // we'd still be waiting for the prior `wait()` tasks to finish. semaphore.signal() try doWork() } - await Task(enqueuedOn: systemUnderTest) { _ in /* Drain the queue */ }.value + await Task(on: systemUnderTest) { _ in /* Drain the queue */ }.value } @Test func test_task_allowsReentrancy() async { - await Task(enqueuedOn: systemUnderTest) { [systemUnderTest] counter in - await Task(enqueuedOn: systemUnderTest) { counter in + await Task(on: systemUnderTest) { [systemUnderTest] counter in + await Task(on: systemUnderTest) { counter in counter.incrementAndExpectCount(equals: 1) }.value counter.incrementAndExpectCount(equals: 2) @@ -190,9 +190,9 @@ struct ActorQueueTests { } @Test func test_throwingTask_allowsReentrancy() async throws { - try await Task(enqueuedOn: systemUnderTest) { [systemUnderTest] counter in + try await Task(on: systemUnderTest) { [systemUnderTest] counter in try doWork() - try await Task(enqueuedOn: systemUnderTest) { counter in + try await Task(on: systemUnderTest) { counter in try doWork() counter.incrementAndExpectCount(equals: 1) }.value @@ -203,8 +203,8 @@ struct ActorQueueTests { @TestingQueue @Test func test_mainTask_allowsReentrancy() async { - await Task(enqueuedOn: MainActor.queue) { [counter] in - await Task(enqueuedOn: MainActor.queue) { + await Task(on: MainActor.queue) { [counter] in + await Task(on: MainActor.queue) { await counter.incrementAndExpectCount(equals: 1) }.value await counter.incrementAndExpectCount(equals: 2) @@ -213,9 +213,9 @@ struct ActorQueueTests { @TestingQueue @Test func test_mainThrowingTask_allowsReentrancy() async throws { - try await Task(enqueuedOn: MainActor.queue) { [counter] in + try await Task(on: MainActor.queue) { [counter] in try doWork() - try await Task(enqueuedOn: MainActor.queue) { + try await Task(on: MainActor.queue) { try doWork() await counter.incrementAndExpectCount(equals: 1) }.value @@ -230,7 +230,7 @@ struct ActorQueueTests { let expectation = Expectation() let semaphore = AsyncQueue.Semaphore() - Task(enqueuedOn: try #require(systemUnderTest)) { counter in + Task(on: try #require(systemUnderTest)) { counter in // Make the task wait. await semaphore.wait() counter.incrementAndExpectCount(equals: 1) @@ -251,7 +251,7 @@ struct ActorQueueTests { let expectation = Expectation() let semaphore = AsyncQueue.Semaphore() - Task(enqueuedOn: try #require(systemUnderTest)) { counter in + Task(on: try #require(systemUnderTest)) { counter in try doWork() // Make the task wait. @@ -270,7 +270,7 @@ struct ActorQueueTests { @Test func test_task_canReturn() async { let expectedValue = UUID() - let returnedValue = await Task(enqueuedOn: systemUnderTest) { _ in expectedValue }.value + let returnedValue = await Task(on: systemUnderTest) { _ in expectedValue }.value #expect(expectedValue == returnedValue) } @@ -279,7 +279,7 @@ struct ActorQueueTests { @Sendable func generateValue() throws -> UUID { expectedValue } - #expect(try await Task(enqueuedOn: systemUnderTest) { _ in try generateValue() }.value == expectedValue) + #expect(try await Task(on: systemUnderTest) { _ in try generateValue() }.value == expectedValue) } @Test func test_throwingTask_canThrow() async { @@ -288,7 +288,7 @@ struct ActorQueueTests { } let expectedError = TestError() do { - try await Task(enqueuedOn: systemUnderTest) { _ in throw expectedError }.value + try await Task(on: systemUnderTest) { _ in throw expectedError }.value } catch { #expect(error as? TestError == expectedError) } @@ -300,7 +300,7 @@ struct ActorQueueTests { } let expectedError = TestError() do { - try await Task(enqueuedOn: MainActor.queue) { throw expectedError }.value + try await Task(on: MainActor.queue) { throw expectedError }.value } catch { #expect(error as? TestError == expectedError) } diff --git a/Tests/AsyncQueueTests/FIFOQueueTests.swift b/Tests/AsyncQueueTests/FIFOQueueTests.swift index f78a5f5..3b05747 100644 --- a/Tests/AsyncQueueTests/FIFOQueueTests.swift +++ b/Tests/AsyncQueueTests/FIFOQueueTests.swift @@ -32,43 +32,55 @@ struct FIFOQueueTests { @Test func test_task_sendsEventsInOrder() async { let counter = Counter() for iteration in 1...1_000 { - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: iteration) } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @MainActor + @Test func test_task_sendsEventsInOrderInLocalContext() async { + var count = 0 + for iteration in 1...1_000 { + Task(on: systemUnderTest) { + count += 1 + #expect(iteration == count) + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_taskIsolatedTo_sendsEventsInOrder() async { let counter = Counter() for iteration in 1...1_000 { - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: iteration) } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_throwingTask_sendsEventsInOrder() async { let counter = Counter() for iteration in 1...1_000 { - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: iteration) try doWork() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_throwingTaskIsolatedTo_sendsEventsInOrder() async { let counter = Counter() for iteration in 1...1_000 { - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: iteration) try doWork() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_task_interleavedWithTaskIsolatedTo_andThrowing_sendsEventsInOrder() async { @@ -76,35 +88,35 @@ struct FIFOQueueTests { for iteration in 1...1_000 { let mod = iteration % 4 if mod == 0 { - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: iteration) } } else if mod == 1 { - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: iteration) } } else if mod == 2 { - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: iteration) try doWork() } } else { - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: iteration) try doWork() } } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_task_executesAsyncBlocksAtomically() async { let semaphore = Semaphore() for _ in 1...1_000 { - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { let isWaiting = await semaphore.isWaiting // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. + // You can prove this to yourself by deleting `on: systemUnderTest` above. #expect(!isWaiting) // Signal the semaphore before or after we wait – let the scheduler decide. Task { @@ -114,16 +126,16 @@ struct FIFOQueueTests { await semaphore.wait() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_taskIsolatedTo_executesAsyncBlocksAtomically() async { let semaphore = Semaphore() for _ in 1...1_000 { - Task(enqueuedOn: systemUnderTest, isolatedTo: semaphore) { semaphore in + Task(on: systemUnderTest, isolatedTo: semaphore) { semaphore in let isWaiting = semaphore.isWaiting // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. + // You can prove this to yourself by deleting `on: systemUnderTest` above. #expect(!isWaiting) // Signal the semaphore before or after we wait – let the scheduler decide. Task { @@ -133,16 +145,16 @@ struct FIFOQueueTests { await semaphore.wait() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_throwingTask_executesAsyncBlocksAtomically() async { let semaphore = Semaphore() for _ in 1...1_000 { - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { let isWaiting = await semaphore.isWaiting // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. + // You can prove this to yourself by deleting `on: systemUnderTest` above. #expect(!isWaiting) // Signal the semaphore before or after we wait – let the scheduler decide. Task { @@ -153,16 +165,16 @@ struct FIFOQueueTests { try doWork() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_throwingTaskIsolatedTo_executesAsyncBlocksAtomically() async { let semaphore = Semaphore() for _ in 1...1_000 { - Task(enqueuedOn: systemUnderTest, isolatedTo: semaphore) { semaphore in + Task(on: systemUnderTest, isolatedTo: semaphore) { semaphore in let isWaiting = semaphore.isWaiting // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `enqueuedOn: systemUnderTest` above. + // You can prove this to yourself by deleting `on: systemUnderTest` above. #expect(!isWaiting) // Signal the semaphore before or after we wait – let the scheduler decide. Task { @@ -173,67 +185,67 @@ struct FIFOQueueTests { try doWork() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_task_isNotReentrant() async { let counter = Counter() - Task(enqueuedOn: systemUnderTest) { [systemUnderTest] in - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { [systemUnderTest] in + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: 2) } await counter.incrementAndExpectCount(equals: 1) - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: 3) } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_taskIsolatedTo_isNotReentrant() async { let counter = Counter() - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: 2) } counter.incrementAndExpectCount(equals: 1) - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: 3) } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_throwingTask_isNotReentrant() async { let counter = Counter() - Task(enqueuedOn: systemUnderTest) { [systemUnderTest] in - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { [systemUnderTest] in + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: 2) try doWork() } await counter.incrementAndExpectCount(equals: 1) - Task(enqueuedOn: systemUnderTest) { + Task(on: systemUnderTest) { await counter.incrementAndExpectCount(equals: 3) try doWork() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_throwingTaskIsolatedTo_isNotReentrant() async throws { let counter = Counter() - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: 2) try doWork() } counter.incrementAndExpectCount(equals: 1) - Task(enqueuedOn: systemUnderTest, isolatedTo: counter) { counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in counter.incrementAndExpectCount(equals: 3) try doWork() } } - await Task(enqueuedOn: systemUnderTest) { /* Drain the queue */ }.value + await Task(on: systemUnderTest) { /* Drain the queue */ }.value } @Test func test_task_executesAfterQueueIsDeallocated() async throws { @@ -241,12 +253,12 @@ struct FIFOQueueTests { let counter = Counter() let expectation = Expectation() let semaphore = Semaphore() - Task(enqueuedOn: try #require(systemUnderTest)) { + Task(on: try #require(systemUnderTest)) { // Make the queue wait. await semaphore.wait() await counter.incrementAndExpectCount(equals: 1) } - Task(enqueuedOn: try #require(systemUnderTest)) { + Task(on: try #require(systemUnderTest)) { // This async task should not execute until the semaphore is released. await counter.incrementAndExpectCount(equals: 2) expectation.fulfill() @@ -266,12 +278,12 @@ struct FIFOQueueTests { let counter = Counter() let expectation = Expectation() let semaphore = Semaphore() - Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in + Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in // Make the queue wait. await semaphore.wait() counter.incrementAndExpectCount(equals: 1) } - Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in + Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in // This async task should not execute until the semaphore is released. counter.incrementAndExpectCount(equals: 2) expectation.fulfill() @@ -291,13 +303,13 @@ struct FIFOQueueTests { let counter = Counter() let expectation = Expectation() let semaphore = Semaphore() - Task(enqueuedOn: try #require(systemUnderTest)) { + Task(on: try #require(systemUnderTest)) { // Make the queue wait. await semaphore.wait() await counter.incrementAndExpectCount(equals: 1) try doWork() } - Task(enqueuedOn: try #require(systemUnderTest)) { + Task(on: try #require(systemUnderTest)) { // This async task should not execute until the semaphore is released. await counter.incrementAndExpectCount(equals: 2) expectation.fulfill() @@ -318,13 +330,13 @@ struct FIFOQueueTests { let counter = Counter() let expectation = Expectation() let semaphore = Semaphore() - Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in + Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in // Make the queue wait. await semaphore.wait() counter.incrementAndExpectCount(equals: 1) try doWork() } - Task(enqueuedOn: try #require(systemUnderTest), isolatedTo: counter) { counter in + Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in // This async task should not execute until the semaphore is released. counter.incrementAndExpectCount(equals: 2) expectation.fulfill() @@ -342,13 +354,13 @@ struct FIFOQueueTests { @Test func test_task_canReturn() async { let expectedValue = UUID() - let returnedValue = await Task(enqueuedOn: systemUnderTest) { expectedValue }.value + let returnedValue = await Task(on: systemUnderTest) { expectedValue }.value #expect(expectedValue == returnedValue) } @Test func test_taskIsolatedTo_canReturn() async { let expectedValue = UUID() - let returnedValue = await Task(enqueuedOn: systemUnderTest, isolatedTo: Semaphore()) { _ in expectedValue }.value + let returnedValue = await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in expectedValue }.value #expect(expectedValue == returnedValue) } @@ -357,7 +369,7 @@ struct FIFOQueueTests { @Sendable func generateValue() throws -> UUID { expectedValue } - #expect(try await Task(enqueuedOn: systemUnderTest) { try generateValue() }.value == expectedValue) + #expect(try await Task(on: systemUnderTest) { try generateValue() }.value == expectedValue) } @Test func test_throwingTaskIsolatedTo_canReturn() async throws { @@ -365,7 +377,7 @@ struct FIFOQueueTests { @Sendable func generateValue() throws -> UUID { expectedValue } - #expect(try await Task(enqueuedOn: systemUnderTest, isolatedTo: Semaphore()) { _ in try generateValue() }.value == expectedValue) + #expect(try await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in try generateValue() }.value == expectedValue) } @Test func test_throwingTask_canThrow() async { @@ -374,7 +386,7 @@ struct FIFOQueueTests { } let expectedError = TestError() do { - try await Task(enqueuedOn: systemUnderTest) { throw expectedError }.value + try await Task(on: systemUnderTest) { throw expectedError }.value } catch { #expect(error as? TestError == expectedError) } @@ -386,7 +398,7 @@ struct FIFOQueueTests { } let expectedError = TestError() do { - try await Task(enqueuedOn: systemUnderTest, isolatedTo: Semaphore()) { _ in throw expectedError }.value + try await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in throw expectedError }.value } catch { #expect(error as? TestError == expectedError) } From 21076d4f07b13674b97616db48e46122e1f96fcf Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 7 Apr 2025 20:18:10 -0700 Subject: [PATCH 4/8] Update documentation --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 31f2633..ac89276 100644 --- a/README.md +++ b/README.md @@ -127,9 +127,9 @@ func testActorQueueOrdering() async { ### Sending ordered asynchronous tasks to the `@MainActor` from a nonisolated context -Use a `MainActorQueue` to send ordered asynchronous tasks to the `@MainActor`’s isolated context from nonisolated or synchronous contexts. Tasks sent to this queue type are guaranteed to begin executing in the order in which they are enqueued. Like an `ActorQueue`, execution order is guaranteed only until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the enqueued task. A `MainActorQueue` executes tasks within its adopted actor’s isolated context, resulting in `MainActorQueue` task execution having the same properties as a `@MainActor`'s' code execution: code between suspension points is executed atomically, and tasks sent to a single `MainActorQueue` can await results from the queue without deadlocking. +Use `MainActor.queue` to send ordered asynchronous tasks to the `@MainActor`’s isolated context from nonisolated or synchronous contexts. Tasks sent to this queue type are guaranteed to begin executing in the order in which they are enqueued. Like an `ActorQueue`, execution order is guaranteed only until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the enqueued task. A `MainActor.queue` executes tasks within its adopted actor’s isolated context, resulting in `MainActor.queue` task execution having the same properties as a `@MainActor`'s' code execution: code between suspension points is executed atomically, and tasks sent to a single `MainActor.queue` can await results from the queue without deadlocking. -A `MainActorQueue` can easily execute asynchronous tasks from a nonisolated context in FIFO order: +A `MainActor.queue` can easily execute asynchronous tasks from a nonisolated context in FIFO order: ```swift @MainActor func testMainActorQueueOrdering() async { From adbaae75930da744cf5132a32aedc08b193c2fb5 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 7 Apr 2025 20:22:33 -0700 Subject: [PATCH 5/8] Improve priority propagation --- Sources/AsyncQueue/ActorQueue.swift | 24 ++++++++++++++++++------ Sources/AsyncQueue/FIFOQueue.swift | 14 +++----------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 2eac618..3feee5b 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -56,17 +56,20 @@ public final class ActorQueue: @unchecked Sendable { // MARK: Initialization /// Instantiates an actor queue. + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. public init() { let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() self.taskStreamContinuation = taskStreamContinuation func beginExecuting( _ operation: sending @escaping (isolated ActorType) async -> Void, - in context: isolated ActorType + in context: isolated ActorType, + priority: TaskPriority? ) { // In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor. // Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution. - Task { + Task(priority: priority) { await operation(context) } } @@ -78,7 +81,8 @@ public final class ActorQueue: @unchecked Sendable { // Await switching to the ActorType context. await beginExecuting( actorTask.task, - in: actorTask.executionContext + in: actorTask.executionContext, + priority: actorTask.priority ) await actorTask.sempahore.signal() } @@ -116,13 +120,19 @@ public final class ActorQueue: @unchecked Sendable { } fileprivate struct ActorTask: Sendable { - init(executionContext: ActorType, task: @escaping @Sendable (isolated ActorType) async -> Void) { + init( + executionContext: ActorType, + priority: TaskPriority?, + task: @escaping @Sendable (isolated ActorType) async -> Void + ) { self.executionContext = executionContext + self.priority = priority self.task = task } let executionContext: ActorType let sempahore = Semaphore() + let priority: TaskPriority? let task: @Sendable (isolated ActorType) async -> Void } @@ -160,8 +170,6 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. /// - actorQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult @@ -173,6 +181,7 @@ extension Task { let delivery = Delivery() let task = ActorQueue.ActorTask( executionContext: actorQueue.executionContext, + priority: priority, task: { executionContext in await delivery.sendValue(operation(executionContext)) } @@ -219,6 +228,7 @@ extension Task { let delivery = Delivery() let task = ActorQueue.ActorTask( executionContext: actorQueue.executionContext, + priority: priority, task: { executionContext in do { try await delivery.sendValue(operation(executionContext)) @@ -270,6 +280,7 @@ extension Task { let delivery = Delivery() let task = ActorQueue.ActorTask( executionContext: actorQueue.executionContext, + priority: priority, task: { executionContext in await delivery.sendValue(operation()) } @@ -316,6 +327,7 @@ extension Task { let delivery = Delivery() let task = ActorQueue.ActorTask( executionContext: actorQueue.executionContext, + priority: priority, task: { executionContext in do { try await delivery.sendValue(operation()) diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index b715c2a..71a6b7e 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -82,13 +82,10 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. /// - fifoQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( - priority: TaskPriority? = nil, on fifoQueue: FIFOQueue, @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success ) where Failure == Never { @@ -98,7 +95,7 @@ extension Task { await delivery.sendValue(executeOnce.operation()) } fifoQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init { await task.sempahore.wait() return await delivery.getValue() } @@ -126,13 +123,10 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. /// - fifoQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( - priority: TaskPriority? = nil, on actorQueue: FIFOQueue, @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success ) where Failure == any Error { @@ -146,7 +140,7 @@ extension Task { } } actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init { await task.sempahore.wait() return try await delivery.getValue() } @@ -174,8 +168,6 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. /// - fifoQueue: The queue on which to enqueue the task. /// - isolatedActor: The actor to which the operation is isolated. /// - operation: The operation to perform. @@ -191,7 +183,7 @@ extension Task { await delivery.sendValue(operation(isolatedActor)) } fifoQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init { await task.sempahore.wait() return await delivery.getValue() } From 92e77aeb1424e31ad8fcf8495383f44660e3eb46 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 7 Apr 2025 20:30:51 -0700 Subject: [PATCH 6/8] Better task draining --- Tests/AsyncQueueTests/ActorQueueTests.swift | 42 +++++++++++---------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 6dae561..76081b4 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -101,47 +101,51 @@ struct ActorQueueTests { } @Test func test_task_sendsEventsInOrder() async throws { - let orderedTasks = (1...1_000).map { iteration in - Task(on: systemUnderTest) { counter in + var lastTask: Task? + (1...1_000).forEach { iteration in + lastTask = Task(on: systemUnderTest) { counter in counter.incrementAndExpectCount(equals: iteration) } } // Drain the queue - try #require(await orderedTasks.reversed().last?.value) + try await #require(lastTask).value } @Test func test_throwingTask_sendsEventsInOrder() async throws { - let orderedTasks = (1...1_000).map { iteration in - Task(on: systemUnderTest) { counter in + var lastTask: Task? + (1...1_000).forEach { iteration in + lastTask = Task(on: systemUnderTest) { counter in counter.incrementAndExpectCount(equals: iteration) try doWork() } } // Drain the queue - try #require(await orderedTasks.reversed().last?.value) + try await #require(lastTask).value } @TestingQueue @Test func test_mainTask_sendsEventsInOrder() async throws { - let orderedTasks = (1...1_000).map { iteration in - Task(on: MainActor.queue) { + var lastTask: Task? + (1...1_000).forEach { iteration in + lastTask = Task(on: MainActor.queue) { await counter.incrementAndExpectCount(equals: iteration) } } // Drain the queue - try #require(await orderedTasks.reversed().last?.value) + try await #require(lastTask).value } @TestingQueue @Test func test_mainThrowingTask_sendsEventsInOrder() async throws { - let orderedTasks = (1...1_000).map { iteration in - Task(on: MainActor.queue) { + var lastTask: Task? + (1...1_000).forEach { iteration in + lastTask = Task(on: MainActor.queue) { await counter.incrementAndExpectCount(equals: iteration) try doWork() } } // Drain the queue - try #require(await orderedTasks.reversed().last?.value) + try await #require(lastTask).value } @Test func test_task_startsExecutionOfNextTaskAfterSuspension() async { @@ -149,35 +153,35 @@ struct ActorQueueTests { let semaphore = AsyncQueue.Semaphore() systemUnderTest.adoptExecutionContext(of: semaphore) - Task(on: systemUnderTest) { semaphore in + let firstTask = Task(on: systemUnderTest) { semaphore in await semaphore.wait() } - Task(on: systemUnderTest) { semaphore in + let secondTask = Task(on: systemUnderTest) { semaphore in // Signal the semaphore from the actor queue. // If the actor queue were FIFO, this test would hang since this code would never execute: // we'd still be waiting for the prior `wait()` tasks to finish. semaphore.signal() } - await Task(on: systemUnderTest) { _ in /* Drain the queue */ }.value + (_, _) = await (firstTask.value, secondTask.value) } - @Test func test_throwingTask_startsExecutionOfNextTaskAfterSuspension() async { + @Test func test_throwingTask_startsExecutionOfNextTaskAfterSuspension() async throws { let systemUnderTest = ActorQueue() let semaphore = AsyncQueue.Semaphore() systemUnderTest.adoptExecutionContext(of: semaphore) - Task(on: systemUnderTest) { semaphore in + let firstTask = Task(on: systemUnderTest) { semaphore in await semaphore.wait() try doWork() } - Task(on: systemUnderTest) { semaphore in + let secondTask = Task(on: systemUnderTest) { semaphore in // Signal the semaphore from the actor queue. // If the actor queue were FIFO, this test would hang since this code would never execute: // we'd still be waiting for the prior `wait()` tasks to finish. semaphore.signal() try doWork() } - await Task(on: systemUnderTest) { _ in /* Drain the queue */ }.value + (_, _) = try await (firstTask.value, secondTask.value) } @Test func test_task_allowsReentrancy() async { From 984885777ef193057ea51536d819b67b32f72cc1 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 7 Apr 2025 20:34:20 -0700 Subject: [PATCH 7/8] More tests --- Tests/AsyncQueueTests/ActorQueueTests.swift | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 76081b4..9be05cc 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -310,6 +310,22 @@ struct ActorQueueTests { } } + @Test func test_mainTask_executesOnMainActor() async { + @MainActor + func executesOnMainActor() {} + await Task(on: MainActor.queue) { + executesOnMainActor() + }.value + } + + @Test func test_mainThrowingTask_executesOnMainActor() async throws { + @MainActor + func executesOnMainActor() throws {} + try await Task(on: MainActor.queue) { + try executesOnMainActor() + }.value + } + // MARK: Private private let systemUnderTest: ActorQueue From 43041c154847eee080243c9a015f098a8e5a222d Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 7 Apr 2025 20:38:39 -0700 Subject: [PATCH 8/8] Better comments --- Sources/AsyncQueue/ActorQueue.swift | 2 -- Sources/AsyncQueue/FIFOQueue.swift | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 3feee5b..ef0780d 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -56,8 +56,6 @@ public final class ActorQueue: @unchecked Sendable { // MARK: Initialization /// Instantiates an actor queue. - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. public init() { let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() self.taskStreamContinuation = taskStreamContinuation diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index 71a6b7e..159015b 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -211,7 +211,7 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: - /// - priority: The priority of the task. + /// - priority: The priority of the queue. /// Pass `nil` to use the priority from `Task.currentPriority`. /// - fifoQueue: The queue on which to enqueue the task. /// - isolatedActor: The actor to which the operation is isolated.