FutureLib 1.0.1

FutureLib 1.0.1

测试已被测试
Lang语言 SwiftSwift
许可证 Apache 2
发布上次发布2016 年 4 月
SPM支持 SPM

Andreas Grosam 维护。



FutureLib 1.0.1

  • Andreas Grosam

FutureLib

FutureLib 是一个纯 Swift 2 库,实现 Futures & Promises,灵感来自 ScalaPromises/A+,类似于 Microsoft 的 Task Parallel Library (TPL) 中的 Cancellation in Managed Threads 的取消概念,使用 CancellationRequestCancellationToken

FutureLib 有助于您编写简洁且易于理解的代码来实现正确的异步程序,包括错误处理和取消。

特性

  • 采用异步“非阻塞”风格。
  • 支持任务组合。
  • 通过“取消令牌”支持强大的取消概念。
  • 极大地简化了异步代码中的错误处理。
  • 可以指定在特定的“执行上下文”上运行的延续。


内容

入门

以下几节展示了如何在简短的示例中使用 futures 和 promises。

什么是 Future?

Future表示异步函数的最终结果。比如说,计算出的值是类型T,那么异步函数会立即返回一个类型为Future<T>的值。

func doSomethingAsync() -> Future<Int>

当函数返回时,返回的Future还未完成 —— 但是在这里执行一个后台任务来计算值并最终完成Future。我们可以说,返回的Future是异步函数结果的占位符。

后台任务可能会失败。在这种情况下,Future将以一个错误完成。请注意,然而,异步函数本身不会抛出错误。

Future是一个占位符,用于表示未完成的计算结果。最终,它将使用错误完成。

为了表示这种结果,Future在内部使用枚举类型Try<T>Try是一种变体,或区分联合,其中包含错误。请注意,还有其他Swift库使用类似的类型,通常命名为Try。这个名字Try是从Scala借用的。

在FutureLib中,Try<T>可以包含类型为T的值或符合Swift协议ErrorType的值。

通常,我们从异步函数如上面提到的doSomethingAsync()中获取Future。为了检索结果,我们注册一个后续,当Future完成时它将被调用。然而,作为客户端,我们不能自己完成Future —— 它是某种只读的形式。

Future是只读的。我们无法直接完成它,我们只能检索其结果——它完成之后。

那么,后台任务是如何完成Future的?嗯,这将通过一个Promise来完成。

什么是Promise?

使用Promise,我们可以完成一个Future。通常,Promise将由后台任务创建并在最终时解决。Promise有一个且只有一个相关的Future。Promise可以用计算出的值或错误来解决。用Try来解决Promise会立即以同样的值完成其相关的Future。

Promise将被后台任务创建并解决。解决Promise会立即相应地完成其Future。

下面的示例展示了如何使用Promise以及如何将关联的Future返回给调用者。在这个示例中,一个包含完成处理器的函数会被包裹在一个返回Future的函数中。

public func doSomethingAsync -> Future<Int> {
    // First, create a promise:
    let promise = Promise<Int>()

    // Start the asynchronous work:
    doSomethingAsyncWithCompletion { (data, error) -> Void in
        if let e = error {
            promise.reject(e)
        }
        else {
            promise.fulfill(data!)
        }
    }

    // Return the pending future:
    return promise.future!
}

检索Future的值

一旦我们有了Future,我们如何从Future中获取值——或者说是错误?而且我们何时应该尝试获取它?

很清楚,我们只能在Future被用计算出的值或错误完成之后获取值。

有阻塞和非阻塞变体来获取Future的结果。阻塞变体很少使用。

阻塞访问

func get() throws -> T

value方法会在Future完成之前阻塞当前线程。如果Future已完成并成功,它将返回其结果的成功值,否则它将抛出错误的值。然而,由于该方法会阻塞当前线程,所以不推荐使用。它可能仅在单元测试或其他测试代码中是有用的。

非阻塞访问

var result: Try<ValueType>?

如果未来状态完成则返回其结果,否则返回nil。当已知未来已被完成时,该属性有时很有用。

获取结果的非阻塞方法中最灵活和实用的方式是使用延续

使用延续的非阻塞访问

为了非阻塞地从未来中检索结果,我们可以使用一个延续。延续是一个闭包,它将与为该未来定义的某些方法一起注册。当未来完成时,将调用延续。

延续有几种变体,包括与不同签名的组合器一起注册的那些。大多数延续都有一个参数result作为Try<T>value作为Terror作为ErrorType,这将从未来的结果中相应地设置并作为参数传递。

注册延续的基本方法

注册延续的最基本方法是onComplete

onComplete

func onComplete<U>(f: Try<T> -> U)

onComplete方法注册一个延续,该延续在未来完成时将被调用。它接受未来作为其参数的result,作为Try<T>

future.onComplete { result in
    // result is of type Try<T>
}

其中result是类型为Try<T>的,其中T是函数doSomethingAsync计算结果的类型。 result可能包含一个类型为T的值或一个符合协议ErrorType的错误。

几乎所有注册延续的方法都是以onComplete为基础实现的。

有几个方法可以获取结果的实际值

    let result:Try<Int> = ...
    switch result {
    case .Success(let value):
        print("Value: \(value)")
    case .Failure(let error):
        print("Error: \(error)")
    }

以下基本方法是onSuccessonFailure,当未来成功完成或出现错误时分别被调用。

onSuccess

func onSuccess<U>(f: T -> throws U)

使用onSuccess方法,我们注册一个延续,该延续在未来成功完成时将被调用。

future.onSuccess { value in
    // value is of type T
}

onFailure

func onFailure<U>(f: T -> U)

使用onFailure,我们注册一个延续,该延续在未来完成出现错误时将被调用。

future.onFailure { error in
    // error conforms to protocol `ErrorType`
}

组合器

延续也将与组合器一起注册。组合器是一个返回新未来的方法。组合器相当多,最有名的是mapflatMap。然而还有许多其他组合器是建立在基本组合器之上的。

使用组合器,我们可以组合两个或更多未来的结果,构建更复杂的异步模式和程序。

map

func map<U>(f: T throws -> U) -> Future<U>

map方法返回一个新未来,该未来将与函数fself的成功值应用的结果一起完成。如果self已完成错误状态,或者函数f引发错误,则返回的未来将使用相同的错误完成。当self失败时,将不会调用延续。

由于组合器如map的返回类型仍然是未来,我们可以以各种方式结合它们。例如:

fetchUserAsync(url).map { user in
    print("User: \(user)")
    return user.name()
}.map { name in
    print("Name: \(name)")
}
.onError { error in
    print("Error: \(error)")
}

注意,映射函数将与调用者异步调用!实际上整个表达式都是异步的!在这里,上述表达式的类型是 ,因为 返回

flatMap

func flatMap<U>(f: T throws -> Future<U>) -> Future<U>

flatMap 方法返回一个新的 future,它在应用函数 fself 的成功值后最终完成。如果 self 以错误完成,则返回的 future 也将以相同的错误完成。当 self 失败时不会调用后续操作。

例如

fetchUserAsync(url).flatMap { user in
    return fetchImageAsync(user.imageUrl)
}.map { image in
    dispatch_async(dispatch_get_main_queue()) {
        self.image = image
    }
}
.onError { error in
    print("Error: \(error)")
}

注意:有更简单的方法来指定续订应该执行的执行环境(这里的主调度队列)。

recover

func recover(f: ErrorType throws -> T) -> Future<T>`

recover 返回一个新的 future,当 self 失败时,该 future将以 self 的成功值或映射函数 f 的返回值完成。

recoverWith

func recoverWith(f: ErrorType throws -> Future<T>) -> Future<T>

recoverWith 返回一个新的 future,当 self 失败时,它将以 self 的成功值或映射函数 f 的延迟结果完成。

通常,当后续操作即使在之前的任务返回错误的情况下也需要执行时,需要 recoverrecoverWith。我们通过返回一个合适的值(可能指示此错误或使用默认值)从错误中“恢复”。

let future = computeString().recover { error in
    NSLog("Error: \(error)")
    return ""
}

filter

func filter(predicate: T throws -> Bool) -> Future<T>

filter 方法返回一个新的 future,如果将函数 predicate 应用于值并返回 true,则它将以 self 的成功值完成。否则,返回的 future 将以错误 FutureError.NoSuchElement 完成。如果 self 以错误完成或如果谓词引发错误,则返回的 future 将以相同的错误完成。

computeString().filter { str in

}

transform

func transform<U>(s: T throws -> U, f: ErrorType -> ErrorType)-> Future<U>

transform 返回一个新的 Future,在解释 self 的成功结果时,它将以函数 s 应用到错误值的 结果完成。如果 s 抛出错误,则返回的 future 将以相同的错误完成。

func transform<U>(f: Try<T> throws -> Try<U>) -> Future<U>

通过对 self 的结果应用特定的函数返回一个新的 Future。如果 'f' 抛出错误,则返回的 future 将以相同的错误完成。

func transformWith<U>(f: Try<T> throws -> Future<U>) -> Future<U>`

通过对指定的函数(该函数产生一个 Future)对 this Future 的结果应用特定的函数返回一个新的 Future。如果 'f’ 抛出错误,则返回的 future 将以相同的错误完成。

zip

func zip(other: Future<U>) -> Future<(T, U)>

zip 返回一个新的 future,它以 selfother 的成功值组成的元组完成。如果 selfother 以错误失败,则返回的 future 将以相同的错误完成。

Future 序列和序列扩展

firstCompleted

func firstCompleted() -> Future<T>

firstCompleted 返回一个新的 Future,它将以 self 中第一个完成的 future 的结果完成。

traverse

func traverse<U>(task: T throws -> Future<U>) -> Future<[U]>

对于任意的 T 序列,异步方法 traverse将对序列中的每个值应用函数 task(因此得到一个任务序列),然后一旦所有任务成功完成后,用一个U 的数组完成返回的future。

let ids = [14, 34, 28]
ids.traverse { id in
    return fetchUser(id)
}.onSuccess { users in
    // user is of type [User]
}

任务将并发执行,除非指定了一个执行上下文,它定义了一些并发约束(例如,将并发任务的数量限制为固定数量)。

序列

func sequence() -> Future<[T]>

对于 Future<T> 的未来序列,方法 sequence 返回一个新的future Future<[T]>,它完成时以一个T 的数组,数组中的每个元素是 self 中相应未来的成功值,顺序相同。

[
    fetchUser(14),
    fetchUser(34),
    fetchUser(28)
].sequence { users in
    // user is of type [User]
}

结果

func results() -> Future<Try<T>>

对于 Future<T> 的未来序列,方法 result 返回一个新的future,它以一个 Try<T> 的数组完成,数组中的每个元素对应于 self 中相应未来的结果,顺序相同。

[
    fetchUser(14),
    fetchUser(34),
    fetchUser(28)
].results { results in
    // results is of type [Try<User>]
}

折叠

func fold<U>(initial: U, combine T throws -> U) -> Future<U>

对于 Future<T> 的未来序列,它会返回一个新的 Future<U>,这个future将完成由函数 combine 重复应用于 self 中每个未来的成功值和由 initial 初始化的累积值的组合结果。

也就是说,它将 SequenceOf<Future<T>> 转换为 Future<U>,其结果是在每个future的成功值的组合值。

combine 方法将在所有的future成功完成时异步按照顺序调用 self 中的future。请注意,未来底层的任务将并发执行,并且可能以任何顺序完成。

组合未来的示例

给几个返回未来的异步函数

func task1() -> Future<Int> {...}
func task2(value: Int = 0) -> Future<Int> {...}
func task3(value: Int = 0) -> Future<Int> {...}
组合未来的示例 1a

假设我们想要以这种方式链它们,即后续任务从上一个任务获取输入作为结果。最后,我们想打印最后一个任务的结果

task1().flatMap { arg1 in
    return task2(arg1).flatMap { arg2 in
        return task3(arg2).map { arg3 in
            print("Result: \(arg3)")
        }
    }
}
组合未来的示例 1b

当第一个任务成功完成后,执行下一个任务 - 以此类推

task1()
.flatMap { arg1 in
    return task2(arg1)
}
.flatMap { arg2 in
    return task3(arg2)
}
.map { arg3 in
    print("Result: \(arg3)")
}

任务相互独立,但它们要求它们以顺序调用。

组合未来的示例 1c

这是示例 1b,形式更简洁

task1()
.flatMap(f: task2)
.flatMap(f: task3)
.map {
    print("Result: \($0)")
}
组合未来的示例 2

现在,假设我们想要并发计算任务1、任务2和任务3的值,然后将这三个计算出的值作为参数传递给任务4

func task4(arg1: Int, arg2: Int, arg3: Int) -> Future<Int> {...}

let f1 = task1()
let f2 = task2()
let f3 = task3()

f1.flatMap { arg1 in
    return f2.flatMap { arg2 in
        return f3.flatMap { arg3 in
            return task4(arg1, arg2:arg2, arg3:arg3)
            .map { value in
                print("Result: \(value)")
            }
        }
    }
}

不幸的是,我们无法像第一例那样简化代码。我们可以在应用某些操作符时改进它,这些操作符就像语法糖一样,使代码更容易理解。其他语言有如 do-notationFor-Comprehensions 这样的特殊结构,以便使这种结构更容易理解。

指定回调将执行的执行上下文

上述注册的延续将并发执行,我们不应假设回调最终将在哪种执行环境中执行。然而,有一种方法可以通过附加额外的参数来显式指定执行环境,这个参数应用于所有注册延续的方法的 执行上下文

例如,定义一个基于 GCD 的执行上下文,该上下文使用一个底层的串行调度队列,将闭包异步提交到指定的队列,并指定服务质量类

let queue = dispatch_queue_create("sync queue",
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL,
            QOS_CLASS_USER_INITIATED, 0))

let ec = GCDAsyncExecutionContext(queue)

然后,将这个执行上下文传递给参数 ec

future.onSuccess(ec: ec) { value in
    // we are executing on the queue "sync queue"
    let data = value.0
    let response = value.1
    ...
}

当未来完成时,现在它将异步地将给定的闭包提交到调度队列。

如果我们现在使用这个执行上下文注册多个延续,当未来完成后,所有延续都将额似同时提交,但由于队列是串行的,因此它们将按照提交的顺序顺序执行。

请注意,延续将在 执行上下文总是 执行。如果没有显式指定执行上下文,将隐式给出一个 私有 上下文,这意味着我们不应假设回调在何地以及何时执行。

可以根据不同的风味和许多具体的底层执行环境创建执行上下文。更多内容请参阅“执行上下文”章节。

取消延续

一旦注册了延续,它可以通过 取消令牌 机制“取消注册”

首先创建一个 取消请求,当需要时可以向其发送“取消”信号

let cr = CancellationRequest()

然后从取消请求中获取取消令牌,未来可以监控这个令牌以检查是否请求了取消

let ct = cr.token

这个取消令牌将成为向任何注册延续的函数传递的额外参数。我们可以为多个延续或任何需要取消令牌的地方共享同一个令牌

future.onSuccess(ct: ct) { value in
    ...
}
future.onFailure(ct: ct) { error in
    ...
}

内部,未来将为每个注册了取消令牌的延续注册一个“取消处理器”。在请求取消时将调用取消处理器。取消处理器简单地“取消注册”之前已注册的延续。如果发生这种情况,并且延续接收《Try<T>ErrorType 作为参数,延续也将以相应的错误调用,即带有一个设置为 CancellationError.Cancelled 的错误的错误。

我们稍后可以请求取消

cr.cancel()

在请求取消并且未来尚未完成时,接收成功值的延续,例如使用onSuccess注册的闭包,将被取消注册并随后释放。

另一方面,接收 Try 或错误值的延续,例如使用onCompleteonFailure 注册的延续,将首先取消注册,然后使用相应的参数调用,即设置错误为 CancellationError.Cancelled。尽管如此,如果未来尚未完成,由于取消请求,它可能不会完成。也就是说,当完成处理器执行时,相应的未来可能尚未完成。

future.onFailure(ct: ct) { error in
    if CancellationError.Cancelled.isEqual(error) {
        // the continuation has been cancelled
    }
}

CancellationRequestCancellationToken 构建了一种强大而灵活的方法来实现取消机制,该方法可以在其他领域和其他库中使用。

将带有完成处理器的异步函数包装成返回对应未来的函数

传统上,系统库和第三方库通过完成处理器传递异步函数的结果。使用未来作为传递结果的方法只是另一种选择。然而,为了释放这些带有完成处理器的函数的未来的潜能,我们需要将这些函数转换为返回未来的函数。这是完全可能的 - 同时也非常简单。

在这里,出现了 Promise 的身影!

例如,使用一个扩展 NSURLSession,它使用 NSURLSessionDataTask 执行一个非常基本的 GET 请求,可以取消的取消令牌。不侧重于“工业级”的实现,目的是演示如何使用承诺 — 以及取消令牌。

从一个返回 Future<T> 的异步函数中获取未来

func get(
    url: NSURL,
    cancellationToken: CancellationTokenType = CancellationTokenNone())
    -> Future<(NSData, NSHTTPURLResponse)>
{
    // First, create a Promise with the appropriate type parameter:
    let promise = Promise<(NSData, NSHTTPURLResponse)>()

    // Define the session and its completion handler. If the request
    // failed, we reject the promise with the given error - otherwise
    // we fulfill it with a tuple of NSData and the response:
    let dataTask = self.dataTaskWithURL(url) {
    (data, response, error) -> Void in
        if let e = error {
            promise.reject(e)
        }
        else {
            promise.fulfill(data!, response as! NSHTTPURLResponse)
            // Note: "real" code would check the data for nil and
            // response for the correct type.
        }
    }

    // In case someone requested a cancellation, cancel the task:
    cancellationToken.onCancel {
        dataTask.cancel() // this will subsequently fail the task with
                          // a corresponding error, which will be used
                          // to reject the promise.
    }

    // start the task
    dataTask.resume()

    // Return the associated future from the new promise above. Note that
    // the property `future` returns a weak Future<T>, so we need to
    // explicitly unwrap it before we return it:
    return promise.future!
}

现在我们可以这样使用它

let cr = CancellationRequest()
session.get(url, cr.token)
.map { (data, response) in
    guard 200 == response.statusCode else {
        throw URLSessionError.InvalidStatusCode(code: response.statusCode)
    }
    guard response.MIMEType != nil &&
    !response.MIMEType!.lowercaseString.hasPrefix("application/json") else {
        throw URLSessionError.InvalidMIMEType(mimeType: response.MIMEType!)
    }
    ...
    let json = ...
    return json
}

安装