TaskQueue
TaskQueue
基本上是一个 FIFO 队列,其中可以将 任务 排队以便执行。任务将并发执行,直至达到允许的最大数量。
任务 简单来说是一个没有抛出异常的、带有一个参数的非阻塞异步函数,该参数是一个完成处理程序,当任务完成时被调用。
特性
- 采用异步 "非阻塞" 任务的执行。
- 可以在任务执行过程中设置并发执行任务的最大数量。
- 采用 "屏障" 任务作为同步点,允许我们 "连接" 所有之前排队的任务。
- 任务队列可以被暂停和继续。
- 任务队列可以有 目标 任务队列,其中即将准备执行的任务将被排队,然后目标将负责执行任务(这又可能由另一个目标任务队列实际执行)。
- 任务和 TaskQueue 可以作为
NSOperation
和NSOperationQueue
的替代品。
利用屏障、暂停和继续功能、目标关系以及并发级别的控制,我们可以设计出复杂的系统,其中异步任务的执行可以通过外部条件、相互依赖和系统资源限制来控制。
描述
使用TaskQueue,我们可以控制在任务队列中运行的最多并发任务数。为了实现这一点,我们将任务放入任务队列。如果实际运行的任务数量少于最大值,入队的任务将立即执行。否则,它将被延迟,直到足够之前入队的任务完成。
在任何时候,我们都可以继续入队任务,同时保证最大运行任务数。此外,我们可以在任何时候更改最大并发任务数,任务队列将适应直到满足限制。
安装
注意
Swift 4.0, 3.2 和 3.1 需要略有不同的语法
对于 Swift 4 使用版本 >= 0.9.0。
为了与 Swift 3.2 兼容使用版本 0.8.0,与 Swift 3.1 兼容使用版本 0.7.0。
Carthage
添加
github "couchdeveloper/TaskQueue"
到您的 Cartfile。这对于 Swift 4 是合适的,否则按照上述说明指定版本约束。
在您的源文件中,按照以下方式导入库
import TaskQueue
CocoaPods
在您的Podfile中添加以下行
pod 'cdTaskQueue'
这对于 Swift 4 是合适的,否则按照上述说明指定版本约束。
在您的源文件中,按照以下方式导入库
import TaskQueue
注意,模块名称(TaskQueue
)与 Podspec 名称(cdTaskQueue
)不同。
SwiftPM
要使用 SwiftPM,请将以下内容添加到您的 Package.swift
.Package(url: "https://github.com/couchdeveloper/TaskQueue.git")
用法
假设有一个或多個異步 任務,我們希望以某种可控的方式執行它們。特別是,我們希望保證不超過一個設定的限制來同時執行這些任務。例如,許多時候,我們只想確保一次只有一個任務運行。
此外,我們在當某一組特定任務全部完成時希望獲得通知,然後進行下一步操作,例如,根據結果入隊進一步任務。
那麼,任務究竟是什么意思呢?
任務 是一個 Swift 函數或閉包,它異步執行並返回 Void
,並有一個 單一 的參數,完成針對者。完成針對者有一個單一參數,當任務完成時,將將計算出的 Result
(由底部操作計算)傳遞到該參數。
我們可以使用任何類型的 "Result",例如元組 (Value?, Error?)
或更方便的類型如 Result<T>
或 Try<T>
。
脫脈任務函數
func task(completion: @escaping (R)->()) {
...
}
其中 R
示例:(T?, Error?)
或 Result<T>
或 (Data?, Response?, Error?)
等。
注意,類型 R
可代表 Swift 元組,例如 (T?, Error?)
,並請注意 Swift 4 中存在語法變化。
注意事項
在 Swift 4 中,請考慮以下關於元組參數的變化
如果函數類型只有一個參數且該參數的類型為元組類型,則在寫函數的類型時必須將元湯類型括號化。例如,((Int, Int)) -> Void
是一種函數,該函數只接受一個元組類型(Int, Int)
的參數,並不返回任荷值。相反,沒有括號的(Int, Int) -> Void
是一種函數,該函數接受兩個 Int 參數,並不返回任荷值。同樣,因為Void
是一個類型別名,代表()
,所以函數類型(Void) -> Void
等於(()) -> ()
—— 一個接受一個空元組作為參數的函數。這些類型與() -> ()
不一樣——一個不接受任何參數的函數。這些類型也不相同於() -> ()
——一個不接 受任何參數的函數。
因此,这意味着,如果任务的完成处理程序的结果类型是Swift元组,例如(String?, Error?)
,则该任务必须具有以下签名:
func myTask(completion: @escaping ((String?, Error?))->()) {
...
}
现在,创建一个任务队列,我们可以向其中入队许多此类任务。我们可以在初始化器中控制最大并发执行的任务数量。
let taskQueue = TaskQueue(maxConcurrentTasks: 1)
// Create 8 tasks and let them run:
(0...8).forEach { _ in
taskQueue.enqueue(task: myTask) { (String?, Error?) in
...
}
}
请注意,任务的开始将被延迟,直到当前运行的任务数量低于允许的最大并发任务数量。
在上面的代码中,由于设置的最大并发任务数为1
,异步任务实际上是按顺序执行的。
使用屏障
一个屏障函数允许我们在TaskQueue
中创建一个同步点。当TaskQueue
遇到屏障函数时,它将延迟执行屏障函数和任何进一步的任务,直到所有在屏障之前入队的任务都已完成。在这时,屏障函数将独家执行。完成之后,TaskQueue
将恢复其正常的执行行为。
let taskQueue = TaskQueue(maxConcurrentTasks: 4)
// Create 8 tasks and let them run (max 4 will run concurrently):
(0...8).forEach { _ in
taskQueue.enqueue(task: myTask) { (String?, Error?) in
...
}
}
taskQueue.enqueueBarrier {
// This will execute exclusively on the task queue after all previously
// enqueued tasks have been completed.
print("All tasks finished")
}
// enqueue further tasks as you like
指定任务启动的.dispatch队列
尽管任务应该始终设计得与在哪个线程上调用它无关,但实际情况通常并非如此。幸运的是,我们可以在enqueue
函数中指定一个派发队列,在那里,如果存在这种限制,任务队列会启动任务。
如果没有指定队列,任务将在全局队列(DispatchQueue.global()
)上启动。
taskQueue.enqueue(task: myTask, queue: DispatchQueue.main) { Result<String> in
...
}
请注意,这只会影响任务的启动位置。任务完成时的完成处理程序将在任务完成时选择的任何线程或派发队列上执行。在TaskQueue
中没有方法可以指定完成处理程序的执行上下文。
从任何其他异步函数构建合适的任务函数
enqueue
函数的函数签名要求我们传递一个单参数completion
的任务函数并返回Void
。单个参数是完成处理程序,它是一个函数,接受单个参数或元组result
并返回Void
。
那么,如果我们异步函数没有这个签名怎么办,例如它有额外的参数,甚至返回一个结果呢?
看看这个来自URLSession
的异步函数:
dataTask(with url: URL,
completionHandler: @escaping (Data?, URLResponse?, Error?) -> Swift.Void)
-> URLSessionDataTask
在这里,除了完成处理程序,我们还有一个额外的参数url
,用于配置任务。它还有一个返回值,即创建的URLSessionTask
对象。
为了使用这个功能与 TaskQueue
结合,我们需要确保在队列中添加任务时,任务已经配置好,并具有正确的签名。通过将给定的函数应用 柯里化,我们可以实现这两个要求。
基本步骤如下
给定任何带有一个或多个附加参数且可能具有返回值的异步函数
func asyncFoo(param: T, completion: @escaping (Result)->()) -> U {
...
}
我们将其转换为如下
func task(param: T) -> (_ completion: @escaping (Result) -> ()) -> () {
return { completion in
let u = asyncFoo(param: param) { result in
completion(result)
}
// handle return value from asyncFoo, if any.
}
}
也就是说,我们把上面的 asyncFoo
函数转换成另一个函数,其参数仅包含配置参数,并返回一个 函数,该函数只有一个剩余参数,即完成处理器,例如:
((Result) -> ()) -> ()
.
此返回函数的签名必须适用于 TaskQueue
需要的任务函数。"Result" 可以是单个参数,例如 Result<T>
,也可以是任何元组,例如 (T?, Error?)
或 (T?, U?, Error?)
等。
注意,原始函数(此处为 asyncFoo
)的任何返回值,如果有的话,将被任务队列忽略。尽管如此,这些值应由任务函数的实现来处理。
您可能需要查看几遍这个片段来熟悉它 ;)
然后按照以下方式使用它
taskQueue.enqueue(task: task(param: "Param")) { result in
// handle result
...
}
这确保了任务将在排队时使用给定的参数进行“配置”。不过,执行将会延迟,直到任务队列准备好执行。
示例
在这里,我们用一个执行“GET”请求的 URLSessionTask
包装一个 任务 函数
func get(_ url: URL) -> (_ completion: @escaping ((Data?, URLResponse?, Error?)) -> ()) -> () {
return { completion in
URLSession.shared.dataTask(with: url) { data, response, error in
completion((data, response, error))
}.resume()
}
}
然后按照以下方式使用它
let taskQueue = TaskQueue(maxConcurrentTasks: 4)
taskQueue.enqueue(task: get(url)) { (data, response, error) in
// handle (data, response, error)
...
}
有一个 URL 列表,一次性将它们全部排队并按照任务队列中设置的约束执行
let urls = [ ... ]
let taskQueue = TaskQueue(maxConcurrentTasks: 1) // serialise the tasks
urls.forEach {
taskQueue.enqueue(task: get($0)) { (data, response, error) in
// handle (data, response, error)
...
}
}