Tasker 0.12.0

Tasker 0.12.0

Ali Akhtarzada维护。



Tasker 0.12.0

  • Ali Akhtarzada

Tasker - 具有async await和url session管理的任务管理器

警告:目前这是一个alpha软件,并开放接受集中的建议

CocoaPods Version Carthage compatible Swift Build Status codecov license

完整API文档

Tasker是一个基于OperationQueue和GCD的任务管理器,具有拦截和反应的概念。 拦截器 允许您在任务执行之前修改任务,同时也允许您控制任务的执行(例如,批量处理它们、保持它们、取消它们等)。 反应器 允许您在将结果传递给调用者之前对任务完成做出反应(例如,运行一个作业、重新队列任务、取消任务等)。

Tasker还提供了对 URLSession 和异步/等待功能的任务管理

功能

  • 并发任务管理系统
  • 任务拦截和增强
  • 任务重新排队和反应功能
  • 专门的URLSession处理以拦截和向URLRequests添加反应器
  • 异步/等待功能
  • 在任务数组上进行异步/等待

快速查看

// Create a task object
class MyTask: Task {
    typealias SuccessValue = (data: Data?, response: URLResponse?)
    func execute(completion: @escaping CompletionCallback) {
        URLSession.shared.dataTask(with: request) { (data, response, error) in
            if let error = error {
                completion(.failure(error))
            } else {
                completion(.success(data, response))
            }
        }
    }
}

// Or create a task out of anything else
let t = task(someLongRunningFunction())

// Run it in one of three ways:

// Await
do {
    let a: MyTask.SuccessValue = try MyTask().await()
    let b = t.await()
} catch {
    // error
}

// Async
MyTask().async { result in
    // got result
}

// TaskManager
TaskManager.shared.add(task: MyTask()) {result in
    // got result
}

任务

任务是指需要执行的工作单元。它是基于协议的,并且有一个主要的函数名为 execute,当任务应该被执行时会被调用。当任务完成时,Task 的实现会调用一个带有 Result<T, Error> 的回调。

例如:

class DecodeImage: Task {
    ...
    func execute(completion: @escaping Result<DecodedData, Error>) {
        fetchImageData { data in
            do {
                let decodedData = data.decode()
                completion(.success(decodedData))
            } catch {
                completion(.failure(error))
            }
        }
    }
    ...
}

请注意,任务是一种 引用 类型。这是因为它们更容易推理,并且可以拦截和响应。如果我们创建了副本并在它们之间传递,你就必须非常小心地处理状态。当任务完成时,会调用 completion 回调,并传递任务的结果。

如果由于任何原因任务被取消,则会调用任务的 didCancel 函数,你可以自由地根据需要处理它。例如:

class DecodeImage: Task {
    ...
    var task: URLSessionTask
    func execute(completion: @escaping Result<Data, Error>) {
        task = URLSession.shared.dataTask(...)
    }
    func didCancel(with error: Error) {
        // Cancel your URLSessionTask
        task.cancel();
    }
    ...
}

创建任务

你可以使用多种方式创建任务。其中一种是通过实现 Task 协议,如上面所见。还有其他几种方式:

  1. AnyTask:可以使用这种类型从代码块中创建任务。
    AnyTask { 5 } // Task that returns 5, no failure value
    AnyTask { Result<Int, Error>.success(5) } // Task that returns Result<T, Error>
    AnyTask { cb in cb(.success(5)) } // Task that calls the done callback
  2. task(closingOver:):这将创建一个涵盖其值的任务。
    let t = task(closingOver: someLongOperation())
  3. task(executing:):从具有兼容 done 回调的函数创建任务。
    let f1: (() -> T) -> Void = //...
    let t1 = task(executing: f1)
    let f2: (() -> Result<T, Error>) -> Void = //...
    let t2 = task(executing: f2)

启动任务

要启动任务,你可以:

  1. 创建一个 TaskManager 实例。
  2. 使用提供的默认实例。
  3. 在它上调用 async(...)await(...)
// Create a task
let manager = TaskManager()
manager.add(task: DecodeImage())

// Use the default
TaskManager.shared.add(task: DecodeImage())

任务将立即启动。你也可以选择稍后启动,或在一个特定的时间间隔后启动。

// Start a task later:
let handle = TaskManager.shared.add(task: DecodeImage(), startImmediately: false)
// ...
handle.start()

// Start a task after some interval
TaskManager.shared.add(task: DecodeImage(), after: .seconds(30))

每当添加一个任务时,你都会得到一个可以启动或取消任务的句柄。你还可以查询任务的状态,每个任务都会被赋予一个递增的标识符。

拦截任务

每个任务在由 TaskManager 拥有的任务执行之前都会被拦截。你可以通过实现一个 Interceptor 来控制任务被拦截的方式,该拦截器只有一个方法。

func intercept<T: Task>(task: inout T, currentBatchCount: Int) -> InterceptCommand

拦截函数会带有一个即将执行的任务的引用。你可以修改任务的引用和返回的命令将决定任务的命运。其中一个参数是 currentBatchCount;这是因为在你可以对任务进行 hold 的操作,因此这告诉你有多少是被持有的。对于将事件批处理到分析系统等情况非常有用。有关 InterceptCommand 的详细信息,请参阅文档。

您在创建TaskManager对象时,只需传递一个拦截器数组即可启用拦截器。

对任务的响应

当任务执行完成后,就需要调用reactor。reactor允许您在需要时重新处理任务,或者取消您认为不值得完成的任务。通过在创建时将reactor数组传递给TaskManager对象,您可以启用reactor(就像拦截器一样)。

首先询问reactor是否应该执行。它会收到刚刚完成的任务、该任务的执行结果,以及拥有它的任务句柄。如果Reactor.shouldExecute(...)返回true,则调用Reactor.execute(...)函数。

每个reactor都可以有自己的ReactorConfiguration,它可以控制反应期间的TaskManager的行为。例如,如果您在执行reactor时不想执行更多任务,或者您希望在reactor完成后重新执行引起这种反应的任务。

取消任务

每当您将任务添加到任务管理器中,都会返回一个对该任务唯一的句柄。您可以使用这个句柄来取消任务。

let handle = TaskManager.shared.add(task: DecodeImage(), startImmediately: false)
// ...
handle.cancel()

随后将调用Task.didCancel(...),并传递TaskError.cancelled。该方法也可能是由于其他错误而调用的结果。

反应后重新拦截任务

默认情况下,拦截器对任务只执行一次。在您想要重新运行拦截器的情况下,原因可能是反应导致任务的重新入队。在这种情况下,任务是否被重新拦截取决于reactor配置。您可以设置ReactorConfiguration.reinterceptOnRequeue来控制这种行为。

Async/Await

Tasker自带Async await功能。您可以通过在任务上调用扩展函数asyncawait来直接同步或异步地执行您创建的Task

do {
    let data = try DecodeImage().await()
} catch {
    //...
}

或者

DecodeImage().async { result in
    print(result)
}

As free functions over closures.

async can be called on any expression as an @autoclosure

async(loadVideoFileSync()) { result in
   switch result {
   case let .success(videoFile):
       break
   case let .failure(error):
       break
}

And await can be called on any closure that has the following signatures

  • @escaping (@escaping (T) -> Void) -> Void
  • @escaping (@escaping () -> Void) -> Void
  • @escaping (@escaping (Result<T, Error>) -> Void) -> Void

Each of the closures has a single parameter. One that returns a T which is the result of the await operation. One that retuns nothing, i.e. await returns Void and one that returns a Result<T, Error> object. If the result is a failure, then await throws.

例如:

let number = try? await { (done: @escaping (Int) -> Void) -> Void in
    DispatchQueue.global(qos: .unspecified).async {
        // Do some long running calculation
        done(5) // return 5
    }
}

 XCTAssertEqual(number, 5)

On an array of tasks

There are extensions available on the Array type that you can use to call async or await on an array of tasks of the same type.

E.g

var tasks: [Task] = []
for i in 0..<10 {
    tasks.append(AnyTask { i })
}
let results = try! tasks.await()

You can choose to execute the tasks in order or not.

URLTaskManager

The idea behind the URLTaskManager is that it creates a URLSession object that is tied to a TaskManager so that you can call interceptors and reactors on URLRequests. This is very useful, for example, if you need to add headers to all your requests that are going out. You simply add an interceptor that inserts an authorization header. If, for example, you need to refresh any tokens after a URLRequest fails, a reactor can be very helpful - requeue the task, fetch a new authentication token, and done.

To create a URLTaskManager you can optionally pass in a URLSessionConfiguration (just as you would when creating a URLSession object), and an array of URLTaskInterceptor s and URLTaskReactor s

let urlTaskManager = URLTaskManager(interceptors: [MyInterceptor()], reactors: [Myreactors()])

And then you use the internal URLSession object to make requests

urlTaskManager.session.dataTask(with: URL(...)) { data, response, error in
    // the URLTask that's associated with this will be intercepted and reacted to
    // by MyInterceptor and MyReactor
}

Intercepting and reacting

URLTaskManager 与类型 URLTask 一起工作,其中封装了一个 URLRequest 对象。 URLTask 是你可以用 URLTaskInterceptor 来拦截或用 URLTaskReactor 来响应的 Task 对象

在这里我们创建了一个任务拦截器,其中包含一个假设的用户对象,并在请求即将发出时为用户对象添加一个授权头

class Interceptor: URLTaskInterceptor {
    // ...
    func intercept(task: inout URLTask, currentBatchCount _: Int) -> InterceptCommand {
        task
            .request
            .addValue(self.user.authorization, forHTTPHeaderField: "Authorization")
        return .execute
    }
}

这里有一个反应器,它也包含一个假设的用户对象,并在结果是一个 401 失败时执行 OAuth2 令牌刷新操作

class Reactor: URLTaskReactor {
    func execute(done: @escaping (Error?) -> Void) {
        // For example one could refresh the authorization tokens here
        user.refreshAuthorizationToken { result in
            switch result {
            case .success:
                done(nil)
            case let .failure(value):
                done(value)
            }
        }

        func shouldExecute(after result: URLT.Result, from task: URLTask, with _: Handle) -> Bool {
            // One can return true if there's a 401 UNAUTHORIZED http response code.
            if case let .success(value) = result {
                return value.(response as? HTTPURLResponse)?.statusCode == 401
            }
            return false
        }
    }

调试和日志记录

日志记录设施主要为了调试目的由库提供。要启用它们,你需要向共享日志记录器添加一个传输

Logger.shared.addTransport { print($0) }

上面的例子将所有消息记录到 stdout。如果你想调试某些部分,还可以应用一些过滤器。过滤器围绕标签进行,共享日志记录器使用了一些预定义的标签。请参阅文档中的 LogTags