ReactiveKit 3.19.1

ReactiveKit 3.19.1

测试已测试
语言语言 SwiftSwift
许可证 MIT
发布最新发布2022年3月
SPM支持 SPM

Srdan Rasic 维护。



  • 作者
  • Srdan Rasic

ReactiveKit

Platform Build Status Twitter

ReactiveKit 是一个轻量级的 Swift 框架,用于响应式和函数式响应式编程,让您今朝便可进入响应式世界。

此框架兼容所有 Apple 平台和 Linux。如果您正在开发 iOS 或 macOS 应用程序,请务必检查提供 UIKit 和 AppKit 绑定、响应式代理和数据源的 Bond 框架。

ReactiveKit 目前正处于与 Apple 的 Combine 框架进行 API 对齐的过程中。类型和函数正在重命名,以便与 Combine 的类型相匹配。需要注意的是,ReactiveKit 不会成为 Combine 的即插即用替代品。目标是实现互操作性和平稳过渡。所有工作都以向后兼容的方式进行,并将在多个版本中逐步完成。查看 发布说明 来了解过渡过程。

本文将通过介绍其实现来介绍此框架。到本文结束时,您应该具备了相当好的理解,了解了框架是如何实现的,以及如何最佳地使用它。

要快速入门,请克隆项目并探索工作空间中的可用教程!

摘要

介绍

考虑文本框中的文本如何在用户输入他的名字时改变。每个输入的字母都会给我们一个新的状态。

---[J]---[Ji]---[Jim]--->

我们可以将这些状态变化看作是一系列事件。这相当类似于一个数组或列表,但是与它们不同的是,事件是在时间序列中产生的,而不是一次性地全部存储在内存中。

响应式编程背后的想法是,一切都可以表示为一个序列。让我们考虑另一个例子——网络请求。

---[Response]--->

网络请求的结果是响应。尽管我们只有一个响应,但我们仍可以将其视为一个序列。一个只有一个元素数组的数组仍是一个数组。

数组是有限的,因此它们有一个我们称之为大小的属性,这是衡量数组占用内存大小的指标。当我们在时间序列上谈论时,我们不知道它们在其整个生命周期中将产生多少事件。我们不知道用户将输入多少字母。然而,我们仍然想知道序列何时结束生成事件。

为了获得这些信息,我们可以引入一种特殊类型的事件——完成事件。这是一个标记序列结束的事件。在完成事件之后不应该再跟有事件。

我们将用垂直线来视觉化表示完成事件。

---[J]---[Ji]---[Jim]---|--->

完成事件之所以重要,是因为它告诉我们之前进行的事情现在结束了。我们可以在那个点完成工作,并释放可能在整个序列处理过程中使用的任何资源。

不幸的是,宇宙不是由秩序统治的,而是由混沌统治的。意料之外的事情会发生,我们必须预料到这一点。例如,网络请求可能会失败,那么我们可能接收到一个错误,而不是响应。

---!Error!--->

为了在我们的序列中表示错误,我们还将引入另一种类型的事件。我们将称之为错误事件。错误事件将在遇到未预料到的情况时生成。就像完成事件一样,错误事件也将表示序列的结束。在错误事件之后不应该再跟有事件。

让我们看看在ReactiveKit中事件是如何定义的。

extension Signal {

    /// An event of a sequence.
    public enum Event {

        /// An event that carries next element.
        case next(Element)

        /// An event that represents failure. Carries an error.
        case failed(Error)

        /// An event that marks the completion of a sequence.
        case completed
    }
}

它只是对三种类型事件的枚举。序列通常会具有零个或多个`.next事件,紧跟着是`.completed.failed事件。

那么对于序列呢?在ReactiveKit中它们被称为信号。以下是定义它们的协议。

/// Represents a sequence of events.
public protocol SignalProtocol {

  /// The type of elements generated by the signal.
  associatedtype Element

  /// The type of error that can terminate the signal.
  associatedtype Error: Swift.Error

  /// Register the given observer.
  /// - Parameter observer: A function that will receive events.
  /// - Returns: A disposable that can be used to cancel the observation.
  public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable
}

信号表示事件序列。您可以在序列上做的事情最重要的是观察它生成的事件。事件是通过观察者接收的。观察者只是一个接受事件的函数。

/// Represents a type that receives events.
public typealias Observer<Element, Error: Swift.Error> = (Signal<Element, Error>.Event) -> Void

信号

我们已经看到了定义信号的协议,但我们怎么能实现它呢?让我们来实现一个基本的信号类型!

public struct Signal<Element, Error: Swift.Error>: SignalProtocol {

  private let producer: (Observer<Element, Error>) -> Void

  public init(producer: @escaping (Observer<Element, Error>) -> Void) {
    self.producer = producer
  }

  public func observe(with observer: @escaping Observer<Element, Error>) {
    producer(observer)
  }
}

我们定义信号为一个具有一个属性的结构体——生产者。如您所见,生产者仅仅是一个接受观察者为参数的函数。当我们开始观察信号时,我们所做的是基本执行带有给定观察者的生产者。这正是信号如此简单的原因!

ReactiveKit中的信号实现几乎与我们所展示的相同。它还有一些附加功能,这些功能为我们提供了一些保证,稍后我们将讨论这些保证。

让我们创建一个信号实例,首先向观察者发送三个正整数,然后完成。

从视觉上看,它将看起来像这样

---[1]---[2]---[3]---|--->

在代码中,我们会这样做

let counter = Signal<Int, Never> { observer in

  // send first three positive integers
  observer(.next(1))
  observer(.next(2))
  observer(.next(3))

  // send completed event
  observer(.completed)
}

由于观察者只是一个接收事件的函数,我们只要在想要发送新事件时执行它即可。我们总是通过发送 .completed.failed 事件来最终确定序列,以便接收者知道事件生成何时结束。

ReactiveKit将观察者封装为一个具有各种辅助方法的结构体,以使发送事件变得更容易。以下是定义它的协议。

/// Represents a type that receives events.
public protocol ObserverProtocol {
    
    /// Type of elements being received.
    associatedtype Element
    
    /// Type of error that can be received.
    associatedtype Error: Swift.Error

    /// Send the event to the observer.
    func on(_ event: Signal<Element, Error>.Event)
}

我们之前引入的观察者基本上是 on(_:) 方法。ReactiveKit也提供了观察者的这个扩展

public extension ObserverProtocol {

    /// Convenience method to send `.next` event.
    public func receive(_ element: Element) {
        on(.next(element))
    }

    /// Convenience method to send `.failed` or `.completed` event.
    public func receive(completion: Subscribers.Completion<Error>) {
        switch completion {
        case .finished:
            on(.completed)
        case .failure(let error):
            on(.failed(error))
        }
    }

    /// Convenience method to send `.next` event followed by a `.completed` event.
    public func receive(lastElement element: Element) {
        receive(element)
        receive(completion: .finished)
    }
}

所以,我们可以用ReactiveKit像这样实现前面的例子

let counter = Signal<Int, Never> { observer in

  // send first three positive integers
  observer.receive(1)
  observer.receive(2)
  observer.receive(3)

  // send completed event
  observer.receive(completion: .finished)
}

当观察这样的信号时会发生什么?请记住,观察者是一个接收事件的函数,因此我们可以仅仅将闭包传递给我们的observe方法。

counter.observe(with: { event in
  print(event)
})

当然,我们将得到打印出的三个事件。

next(1)
next(2)
next(3)
completed

将异步调用封装到信号中

由于我们实现了Signal类型,我们可以轻松地将异步调用封装到信号中。让我们假设我们有一个异步函数,用于获取用户。

func getUser(completion: (Result<User, ClientError>) -> Void) -> URLSessionTask

该函数通过完成闭包和Result类型与结果通信,该类型的实例将包含一个用户或一个错误。要将此封装到信号中,我们只需在信号初始化器中的生产者闭包中调用该函数,并发送发生的相关事件。

func getUser() -> Signal<User, ClientError> {
  return Signal { observer in
    getUser(completion: { result in
      switch result {
      case .success(let user):
        observer.receive(user)
        observer.receive(completion: .finished)
      case .failure(let error):
        observer.receive(completion: .failure(error))
    })
    // return disposable, continue reading
  }
}

如果我们现在观察这个信号,我们将会收到用户和完成事件

---[User]---|--->

或错误

---!ClientError!--->

在代码中,获取用户将看起来像这样

let user = getUser()

user.observe { event in
  print(event) // prints ".next(user), .completed" in case of successful response
}

在这里我要问您一个重要的问题。获取用户请求何时执行,换句话说,异步函数 getUser(completion:) 何时被调用?请思考一下。

我们在传递给信号初始化器的生产者闭包中调用 getUser(completion:)。然而,当信号创建时,这个闭包并不会执行。这意味着代码 let user = getUser() 并没有触发请求,它仅仅创建了一个知道如何执行请求的信号。

当调用 observe(with:) 方法时,请求才会执行,因为这就是我们的生产者闭包执行的时刻。这也意味着,如果我们多次调用 observe(with:) 方法,我们将多次调用生产者,因此我们会多次执行请求。这是信号的一个非常强大的方面,我们将在稍后讨论 共享事件序列 时回到这一点。现在,只需记住,每次对 observe(with:) 的调用都意味着会导致事件重新生成。

处理信号

我们的示例函数 getUser(completion:) 返回一个 URLSessionTask 对象。我们通常不会考虑它,但 HTTP 请求可以被取消。当屏幕被关闭时,我们应该取消任何正在进行的请求。实现这一点的办法是调用 URLSessionTask 上的 cancel()` 方法,这个方法是用来请求的。我们如何用信号来处理这个情况呢?

如果你仔细阅读了代码示例,你可能已经注意到了我们没有正确地将我们的 Signal 遵守 SignalProtocol。该协议指定了 observe(with:) 方法返回一个称为 Disposable 的东西。一个可处置的对象可以取消信号的观察和任何底层任务。

让我给你介绍一下 ReactiveKit 中的可处置对象定义。

public protocol Disposable {

  /// Cancel the signal observation and any underlying tasks.
  func dispose()

  /// Returns `true` if already disposed.
  var isDisposed: Bool { get }
}

它有一个取消观察的方法和一个属性,可以告诉我们它是否已弃用。取消观察也被称为 弃用信号

有各种各样的 Disposable 实现,但让我们关注在信号创建中最常用的那个。当信号被弃用时,我们通常想要执行某些操作以清理资源或停止底层任务。那么,当信号被弃用时执行一个闭包会是一个更好的方法。让我们实现一个当它被弃用时执行给定闭包的可处置对象。我们将称之为 BlockDisposable

public final class BlockDisposable: Disposable {

  private var handler: (() -> Void)?

  public var isDisposed: Bool {
    return handler == nil
  }

  public init(_ handler: @escaping () -> Void) {
    self.handler = handler
  }

  public func dispose() {
    handler?()
    handler = nil
  }
}

很简单。当调用 dispose() 方法时,它只是执行给定的闭合。我们如何使用这样的可处置对象呢?嗯,我们需要改进我们的信号实现。

谁应该创建可处置对象?由于可处置对象代表了一种取消信号的方式,显然是创建信号的那个人,他也应该提供一个可以取消信号的可处置对象。为此,我们将重构信号生产者以返回一个可处置对象。此外,我们还将从 observe(with:) 方法返回该可处置对象,以便任何观察信号的人都可以取消观察。

public struct Signal<Element, Error: Swift.Error>: SignalProtocol {

  private let producer: (Observer<Element, Error>) -> Disposable

  public init(producer: @escaping (Observer<Element, Error>) -> Disposable) {
    self.producer = producer
  }

  public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
    return producer(observer)
  }
}

这意味着当我们创建一个信号时,我们也必须提供一个可处置对象。让我们重构我们的异步函数包装信号以提供可处置对象。

func getUser() -> Signal<User, ClientError> {
  return Signal { observer in
    let task = getUser(completion: { result in
      switch result {
      case .success(let user):
        observer.receive(user)
        observer.receive(completion: .finished)
      case .failure(let error):
        observer.receive(completion: .failure(error))
    })

    return BlockDisposable {
      task.cancel()
    }
  }
}

我们只是返回一个在弃用时取消任务的 BlockDisposable 实例。然后,我们可以在观察信号时获取这个可处置对象。

let disposable = getUser().observe { event in
  print(event)
}

当我们的兴趣不再在信号事件上时,我们只需弃用可处置对象。这将取消观察并取消网络任务。

disposable.dispose()

对于 ReactiveKit 中 Signal 的实际实现,有额外的机制防止在信号弃用时发送事件,因此保证在信号弃用后不会收到任何事件。弃用信号后,生产者发送的任何事件都将被忽略。

In ReactiveKit, signals are automatically disposed when they terminate with either a .completed or .failed event.

变换信号

一切都很好,但为什么要这样做呢?它有哪些好处?现在我们来到了最有趣的部分——响应式编程中的信号操作符。

操作符是函数(即方法),可以将一个或多个信号转换成其他信号。对信号进行的基本操作之一是过滤。比如说,我们有一个城市名称的信号,但我们只想获取以字母P开头的名称。

filter(
---[Berlin]---[Paris]---[London]---[Porto]---|--->
)

--------------[Paris]--------------[Porto]---|--->

我们该如何实现这样的操作符呢?非常简单。

extension SignalProtocol {

  /// Emit only elements that pass `isIncluded` test.
  public func filter(_ isIncluded: @escaping (Element) -> Bool) -> Signal<Element, Error> {
    return Signal { observer in
      return self.observe { event in
        switch event {
        case .next(let element):
          if isIncluded(element) {
            observer.receive(element)
          }
        default:
          observer(event)
        }
      }
    }
  }
}

我们在SignalProtocol上编写了一个扩展方法,在其中创建一个信号。在创建的信号的生产者中,我们监控self——即我们要过滤的信号——只有通过测试的.next事件会被传播。我们还在默认情况下传播完成和失败事件。

我们是通过对信号调用该操作符来使用这个操作符。

cities.filter { $0.hasPrefix("P") }.observe { event in
  print(event) // prints .next("Paris"), .next("Porto"), .completed
}

信号的运算符有很多。ReactiveKit其实是一个信号运算符的集合。让我们看看另一个常用的操作符。

当我们观察信号时,我们通常不关心终止事件,我们只关心.next事件中的元素。我们可以编写一个只提供这些元素的运算符。

extension SignalProtocol {

  /// Register an observer that will receive elements from `.next` events of the signal.
  public func observeNext(with observer: @escaping (Element) -> Void) -> Disposable {
    return observe { event in
      if case .next(let element) = event {
        observer(element)
      }
    }
  }
}

这应该很直接——只需传播.next事件中的元素并忽略其他所有内容。现在我们可以这样做

cities.filter { $0.hasPrefix("P") }.observeNext { name in
  print(name) // prints "Paris", "Porto"
}

当您只对那些事件感兴趣时,ReactiveKit还提供了observeFailedobserveCompleted运算符。

在信号上编写运算符就像编写扩展方法一样简单。当您需要框架没有提供的功能时,只需要自己编写!ReactiveKit被编写得易于理解。每当您遇到问题时,只需查看实现细节

更多关于错误信息

我们已经看到信号可以因为错误而终止。在我们的getUser示例中,当网络请求失败时,我们发送.failed事件。因此,我们的Signal类型既是泛型的(泛型于它发送的元素和它可能失败的错误),但在某些情况下,信号是保证不会失败的,即它们永远不会发送错误。我们如何定义这种情况呢?

ReactiveKit提供了以下类型

/// An error type that cannot be instantiated. Used to make signals non-failable.
public enum Never: Error {
}

一个不包含任何符合Swift.Error协议的案例的枚举。因为它没有任何情况,我们永远不能创建它的实例。我们将使用这个技巧来在编译时保证信号不会失败。

例如,如果我们尝试

let signal = Signal<Int, Never> { observer in
  ...
  observer.failed(/* What do I send here? */)
  ...
}

我们将遇到墙,因为我们不能创建Never的实例,所以我们不能发送.failed事件。这是非常强大且重要的特性,因为每当您看到错误被特定为Never类型的信号时,您都可以安全地假设该信号不会失败——因为不能。

绑定只适用于安全(非失败)的信号。

创建简单的信号

您经常需要一个只发射一个元素后即完成的信号。为了创建它,请使用静态方法 just

let signal = Signal<Int, Never>.just(5)

这将为您生成以下信号

---5-|--->

如果您需要一个先发射多个元素然后完成的信号,您可以使用静态方法 sequence 将任何 Sequence 转换为一个信号。

let signal = Signal<Int, Never>.sequence([1, 2, 3])
---1-2-3-|--->

要创建一个不发送任何元素即完成的信号,可执行以下操作

let signal = Signal<Int, Never>.completed()
---|--->

要创建一个直接失败的信号,可执行以下操作

let signal = Signal<Int, MyError>.failed(MyError.someError)
---!someError!--->

您还可以创建一个永远不会发送事件(即永不终止的信号)。

let signal = Signal<Int, Never>.never()
------>

有时您需要一个在经过一定时间后发送特定元素的信号

let signal = Signal<Int, Never>(just: 5, after: 60)
---/60 seconds/---5-|-->

最后,当您需要一个每 interval 秒发送一个整数的信号时,请执行以下操作

let signal = Signal<Int, Never>(sequence: 0..., interval: 5)
---0---1---2---3---...>

处置在包中

在执行多次观察时,处理可处置对象可能会变得繁琐。为了简化操作,ReactiveKit 提供了一个名为 DisposeBag 的类型。它是一个容器,您可以将其可处置对象放入其中。在它被释放时,包将处理放入其中的所有可处置对象。

class Example {

  let bag = DisposeBag()

  init() {
    ...
    someSignal
      .observe { ... }
      .dispose(in: bag)

    anotherSignal
      .observe { ... }
      .dispose(in: bag)
    ...
  }
}

在示例中,我们不是处理可处置对象,而是通过调用可处置对象的 dispose(in:) 方法将它们放入一个包中。当包被释放时,可处置对象将自动被处理。注意,您也可以调用包上的 dispose() 来在任意时间处理其内容。

ReactiveKit 默认在 NSObject 及其子类提供了包。如果您正在做 iOS 或 macOS 开发,由于所有的 UIKit 对象都是 NSObject 的子类,您将免费获得视图控制器和其他 UIKit 对象上的 bag

extension NSObject {
  public var bag: DisposeBag { get }
}

如果您像我一样,不想担心处理问题,请查看 绑定

线程

默认情况下,观察者在事件发送的线程或队列上接收事件。

例如,如果我们有一个像这样创建的信号

let someImage = Signal<UIImage, Never> { observer in
  ...
  DispatchQueue.global(qos: .background).async {
    observer.receive(someImage)
  }
  ...
}

如果我们用它来更新图像视图

someImage
  .observeNext { image in
    imageView.image = image // called on background queue
  }
  .dispose(in: bag)

我们最终会得到奇怪的行为。我们将在后台队列上设置图像,而 UIImageView 实例不是线程安全的,就像 UIKit 的其他部分一样。

我们可以在另一个 async dispatch 到主队列中设置图像,但还有更好的方法。只需使用您希望观察者在该队列上调用的操作符 receive(on:)

someImage
  .receive(on: ExecutionContext.main)
  .observeNext { image in
    imageView.image = image // called on main queue
  }
  .dispose(in: bag)

还有另一个方面。您可能有一个在它被观察的任何线程或队列上执行一些慢速同步工作的信号。

let someData = Signal<Data, Never> { observer in
  ...
  let data = // synchronously load large file
  observer.receive(data)
  ...
}

然而,我们不想让观察这个信号阻塞 UI。

someData
  .observeNext { data in // blocks current thread
    display(data)
  }
  .dispose(in: bag)

我们希望在其他队列上执行加载操作。我们可以在加载上 async dispatch,但如果我们不能更改信号生产器闭包,因为它位于框架中或由于其他原因我们无法更改它,那么操作符 subscribe(on:) 就派上用场了。

someData
  .subscribe(on: ExecutionContext.global(qos: .background))
  .receive(on: ExecutionContext.main)
  .observeNext { data in // does not block current thread
    display(data)
  }
  .dispose(in: bag)

通过使用 subscribe(on:),我们定义信号生产者在哪里执行。我们通常将其与 receive(on:) 结合使用,以定义观察者接收事件的位置。

请注意,这些运算符是与执行上下文一起工作的。执行上下文只是一个线程或队列的简单抽象。您可以在 这里 看到它的实现方式。

绑定

绑定是一种带有额外福利的观察。大多数时候,您应该能够用绑定替换一个观察。考虑以下示例。假设我们有一个用户信号

let presentUserProfile: Signal<User, Never> = ...

并希望在用户发送到信号时显示一个个人资料屏幕。通常我们会这样做

presentUserProfile.receive(on: ExecutionContext.main).observeNext { [weak self] user in
  let profileViewController = ProfileViewController(user: user)
  self?.present(profileViewController, animated: true)
}.dispose(in: bag)

但这很丑!我们必须将所有内容dispatch到主队列,小心翼翼地避免创建 retain 循环,并确保从观察中获得的 disposable 被处理。

幸运的是,有一种更好的方法。我们可以创建一个内联绑定而不是观察。只需要这样做

presentUserProfile.bind(to: self) { me, user in
  let profileViewController = ProfileViewController(user: user)
  me.present(profileViewController, animated: true)
}

然后不用担心线程、retain 循环和处置,因为绑定会自动处理这一切。只需要将信号绑定到负责执行副作用的目标(在我们的例子中,绑到负责呈现个人资料视图控制器的视图控制器)。您提供的闭包将在信号发射元素时被调用,并且目标以及发送的元素作为参数传递。

绑定目标

您可以绑定到同时符合 DeallocatableBindingExecutionContextProvider 协议的目标。

实际上,您还可以绑定到仅符合 Deallocatable 协议的目标,但此时必须通过调用 bind(to:context:setter) 将执行上下境传递到更新目标的上下文。

符合 Deallocatable 协议的对象提供一个可以告诉我们对象何时被释放的信号。

public protocol Deallocatable: class {

  /// A signal that fires `completed` event when the receiver is deallocated.
  var deallocated: Signal<Void, Never> { get }
}

ReactiveKit 默认为 NSObject 和其子类提供对 Deallocatable 协议的遵循。

如何符合 Deallocatable?最简单的方法是符合 DisposeBagProvider

/// A type that provides a dispose bag.
/// `DisposeBagProvider` conforms to `Deallocatable` out of the box.
public protocol DisposeBagProvider: Deallocatable {

  /// A `DisposeBag` that can be used to dispose observations and bindings.
  var bag: DisposeBag { get }
}

extension DisposeBagProvider {

  public var deallocated: Signal<Void, Never> {
    return bag.deallocated
  }
}

如您所见,DisposeBagProvider 继承自 Deallocatable 并通过从袋中获取释放信号来实现它。所以您需要做的是在您的类型上提供 bag 属性。

BindingExecutionContextProvider 协议提供对象应更新的执行上下文。执行上下文就是一个dispatch队列或线程的包装。您可以在 这里 看到它的实现方式。

public protocol BindingExecutionContextProvider {

  /// An execution context used to deliver binding events.
  var bindingExecutionContext: ExecutionContext { get }
}

Bond 框架为各种 UIKit 对象提供了 BindingExecutionContextProvider 遵从,以便在确保主线程的情况下平滑绑定。

您可以通过提供执行上下文来符合此协议。

extension MyViewModel: BindingExecutionContextProvider {

  public var bindingExecutionContext: ExecutionContext {
    return .immediateOnMain
  }
}

ExecutionContext.immediateOnMain 如果当前线程是主线程,则同步执行,否则,它会对主队列进行异步调度。如果您想绑定到后台队列,请返回 .global(qos: .background) 代替。

请注意,更新 UIKit 或 AppKit 对象始终必须在主线程或队列中发生。

现在我们可以探索绑定实现的细节。

extension SignalProtocol where Error == Never {

  @discardableResult
  public func bind<Target: Deallocatable>(to target: Target, setter: @escaping (Target, Element) -> Void) -> Disposable
  where Target: BindingExecutionContextProvider
  {
    return take(until: target.deallocated)
      .observeIn(target.bindingExecutionContext)
      .observeNext { [weak target] element in
        if let target = target {
          setter(target, element)
        }
      }
  }
}

首先,注意 @discardableResult 注解。因为它表示我们可以安全地忽略返回的可释放对象,所以它在那里。绑定将在目标被释放时自动释放。这是通过 take(until:) 运算符来确保的。它会将事件从 self 传播到给定的信号完成 - 在我们的示例中,即直到 target.deallocated 信号完成。我们只是在正确的上下文中观察,并在下一个元素上使用提供的 setter 闭包更新目标。

请注意,绑定仅在不可失败的信号上实现。

绑定到一个属性

给定一个字符串信号 name,我们知道我们可以通过以下方式将其绑定到一个标签上:

name.bind(to: label) { label, name in
  label.text = name
}

但这不是很好吗?我们可以通过 Swift 4 中的键路径来完成!只需做

name.bind(to: label, keyPath: \.text)

这里的 target 和前面的示例中的相同,而 keyPath 是用于更新信号的每个新元素应该更新的属性的键路径!

如果您选择加入 Bond 框架,事情将更加简单

name.bind(to: label.reactive.text)

Bond 提供了一个名为 Bond 的类型,它充当绑定目标,我们可以使用它为各种属性创建响应式扩展。查看其文档以获取更多信息。

共享事件序列

每次我们观察一个信号时,我们都会执行它的生成器。考虑以下信号

let user = Signal { observer in
  print("Fetching user...")
  ...
}

如果我们现在做

user.observe { ... } // prints: Fetching user...
user.observe { ... } // prints: Fetching user...

生成器将被调用两次,并且用户将被获取两次。相同的动作可能没有在代码中引起注意,如下所示:

user.map { $0.name }.observe { ... } // prints: Fetching user...
user.map { $0.email }.observe { ... } // prints: Fetching user...

您可以将每个信号观察视为一个独立的过程。我们经常需要这种行为,但在某些情况下,我们可以通过共享一个序列到多个观察者来优化我们的代码。为了实现这一点,我们只需应用运算符 shareReplay(limit:) 即可。

let user = user.shareReplay(limit: 1)

user.map { $0.name }.observe { ... } // prints: Fetching user...
user.map { $0.email }.observe { ... } // Does not print anything, but still gets the user :)

参数 limit 指定了应该重放给观察者的元素(.next 事件)的数量。始终重放终端事件。通常,一个元素就是我们需要的所有。运算符 shareReplay(limit:) 是两个运算符的组合。为了理解它,我们将引入两个有趣的概念:subject 和可连接的信号。

主题

在文档开始时,我们使用 SignalProtocol 协议定义了信号。然后我们通过为每个观察值执行生产者闭包,实现了一个具体遵循该协议的 Signal 类型。生产者会将事件发送给方法 observe(with:) 中提供的观察者。

我们能否用不同的方式实现信号?让我们尝试创建另一种类型的信号——也是一种观察者。我们将其称为 Subject。下面是 ReactiveKit 提供的 Subject 简化实现。

open class Subject<Element, Error: Swift.Error>: SignalProtocol, ObserverProtocol {

  private var observers: [Observer<Element, Error>] = []

  open func on(_ event: Signal<Element, Error>.Event) {
    observers.forEach { $0(event) }
  }

  open func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
    observers.append(observer)
    return /* a disposable that removes the observer from the array */
  }
}

我们这种新型信号,主题,它自身就是一个观察者,持有它所有观察者数组。当主题收到一个事件(当方法 on(_:) 被调用时),该事件会直接传播给所有已注册的观察者。观察这个主题意味着将给定的观察者添加到观察者数组中。

我们如何使用这种主题?

let name = Subject<String, Never>()

name.observeNext { name in print("Hi \(name)!") }

name.on(.next("Jim")) // prints: Hi Jim!

// ReactiveKit provides few extension toon the ObserverProtocol so we can also do:
name.send("Kathryn") // prints: Hi Kathryn!

name.send(completion: .finished)

注意:在使用 ReactiveKit 时,你应该实际使用 PassthroughSubject 而不是它。它具有与这里定义的 Subject 相同的行为和接口——只是一个不同的名称,以保持与 ReactiveX API 的一致性。

如你所见,我们没有生产者闭包,而是将事件发送给主题本身。然后主题将那些事件传递给它的观察者。

主题在需要将命令式世界的操作转换为反应式世界的信号时非常有用。例如,如果我们需要将视图控制器出现的作为信号。我们可以创建一个主题属性,然后从 viewDidAppear 覆盖函数中向它发送事件。这样的主题将代表视图控制器出现事件的信号。

class MyViewController: UIViewController {

  fileprivate let _viewDidAppear = PassthroughSubject<Void, Never>()

  override viewDidAppear(_ animated: Bool) {
    super.viewDidAppear(animated)
    _viewDidAppear.send()
  }
}

我们可以公开主题,但是这样任何人都可以向它发送事件。更好的方法是将其设置为 fileprivate ,就像我们做的那样,然后将其公开作为信号。建议将所有反应式扩展放在由 ReactiveKit 提供的 ReactiveExtensions 类型的扩展中。下面是如何操作:

extension ReactiveExtensions where Base: MyViewController {

  var viewDidAppear: Signal<Void, Never> {
    return base._viewDidAppear.toSignal() // convert Subject to Signal
  }
}

然后我们可以像这样使用我们的信号

myViewController.reactive.viewDidAppear.observeNext {
  print("view did appear")
}

主题代表一种被称为 热信号 的信号类型。它们被称为热信号,因为它们无论是否有已注册的观察者都会“发送”事件。相反,Signal 类型代表了一种称为 冷信号 的信号类型。这种类型的信号在我们给出将接收事件的观察者之前不会产生事件。

如实施例所示,观察主题仅给我们观察者注册后发送的事件。任何在观察者注册之前可能已经发送的事件都不会被观察者接收。有办法解决这个问题吗?是的,我们可以缓冲接收到的事件,然后将它们回放给新的观察者。让我们在一个子类中这样做。

public final class ReplaySubject<Element, Error: Swift.Error>: Subject<Element, Error> {

  private var buffer: [Signal<Element, Error>.Event] = []

  public override func on(_ event: Signal<Element, Error>.Event) {
    buffer.append(event)
    super.on(event)
  }

  public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
    buffer.forEach { observer($0) }
    return super.observe(with: observer)
  }
}

这又是 ReactiveKit 提供的 ReplaySubject 的简化版本,但它包含了阐明概念所需的一切。每当收到一个事件时,我们就将它放入缓冲区。当观察者注册时,我们然后 回放 我们在缓冲区中所有的事件。任何未来的事件将像在 Subject 中一样传播。

注意:ReplaySubject是ReactiveKit提供的一个支持限制缓冲区大小,使其不至于无限增长的类。通常,我们可以通过以ReplaySubject(bufferSize: 1)的方式实例化它,将其限制为只保存一个事件就足够了。缓冲区始终保留最新事件,并删除较旧的事件。

在这个过程中,你可能已经有个大概的概念,如何实现shareReplay运算符的行为。我们可以使用replay subject观察原始信号,然后再多次观察该subject。但为了将其作为一个运算符实现,并使其对用户透明,我们需要了解可连接的信号。

可连接信号

迄今为止,我们已看到两种类型的信号。一种是当观察者注册时才会产生事件的Signal,另一种是不论是否有观察者注册,都会产生事件的Subject。可连接信号将是我们将要实现的第三种信号。这种信号将在我们调用其上的connect()方法时开始产生事件。我们先定义一个协议。

/// Represents a signal that is started by calling `connect` on it.
public protocol ConnectableSignalProtocol: SignalProtocol {

  /// Start the signal.
  func connect() -> Disposable
}

我们将构建一个可作为其他任何类型信号包装的可连接信号。我们将利用subject来实现。

public final class ConnectableSignal<Source: SignalProtocol>: ConnectableSignalProtocol {

  private let source: Source
  private let subject: Subject<Source.Element, Source.Error>

  public init(source: Source, subject: Subject<Source.Element, Source.Error>) {
    self.source = source
    self.subject = subject
  }

  public func connect() -> Disposable {
    return source.observe(with: subject)
  }

  public func observe(with observer: @escaping Observer<Source.Element, Source.Error>) -> Disposable {
    return subject.observe(with: observer)
  }
}

这里我们需要两样东西:一个是我们将包装成可连接信号的原信号,另一个是将事件从源信号传播到可连接信号观察者的subject。我们将在初始化器中需要它们,并将它们作为属性保存。

观察可连接信号实际上就是观察底层subject。启动这个信号现在很简单 - 我们只需要使用subject开始观察源信号(记住 - subject本身也是一个观察者)。这将使事件从源流向subject注册的观察者。

我们现在拥有了实现shareReplay(limit:)的所有部件。让我们从replay(limit:)开始。

extension SignalProtocol {

  /// Ensure that all observers see the same sequence of elements. Connectable.
  public func replay(_ limit: Int = Int.max) -> ConnectableSignal<Self> {
    return ConnectableSignal(source: self, subject: ReplaySubject(bufferSize: limit))
  }
}

这是很容易的。创建一个带有ReplaySubjectConnectableSignal确保所有观察者都得到相同的事件序列,并且只观察原始信号一次。唯一的问题是,返回的信号是一个可连接信号,因此我们必须在它上面调用connect()来启动事件。

我们需要以某种方式将可连接信号转换成非连接信号。为此,我们需要在正确的时间调用connect,以及正确的时间调用dispose。正确的时间是什么时候?显然,正确连接的时间是在第一次观察时,正确dispose的时间是在最后一个观察者dispose时。

为了做到这一点,我们将保持一个引用计数。每次有新的观察者时,计数增加,而每次dispose时,计数减少。我们将在计数从0变为1时连接,在计数从1变为0时dispose。

public extension ConnectableSignalProtocol {

  /// Convert connectable signal into the ordinary signal by calling `connect`
  /// on the first observation and calling dispose when number of observers goes down to zero.
  public func refCount() -> Signal<Element, Error> {
    // check out: https://github.com/ReactiveKit/ReactiveKit/blob/e781e1d0ce398259ca38cc0d5d0ed6b56d8eab39/Sources/Connectable.swift#L68-L85
   }
}

实现 shareReplay 运算符

现在我们知道了about subjects 和 可连接信号,我们可以实现 shareReplay(limit:) 运算符。这是相当简单的

/// Ensure that all observers see the same sequence of elements.
public func shareReplay(limit: Int = Int.max) -> Signal<Element, Error> {
  return replay(limit).refCount()
}

处理信号错误

你可能选择忽略它们并延迟,但最终你将需要处理信号可能出现的错误。

如果信号有通过重试原始生产者恢复的潜力,你可以使用 retry 操作符。

let image /*: Signal<UIImage, NetworkError> */ = getImage().retry(3)

想象一下在命令式范式下,这需要多少行代码 :)

retry 操作符有时才会起作用,并且最终会失败。应用该操作符的结果仍然是一个可能失败的信号。

如何将可能失败的信号转换为非失败(安全)信号?我们必须以某种方式处理错误。一种方式是用默认元素恢复。

let image /*: Signal<UIImage, Never> */ = getImage().recover(with: .placeholder)

现在我们得到了安全的 Signal,因为转换后的信号永远不会失败。任何可能在原始信号上发生的 .failed 事件将仅用包含默认元素(例如,我们例子中的占位符图像)的 .next 事件替换。

获取安全信号的另一种方法是忽略(抑制)错误。你会这样做,如果你真的不关心错误,忽略它不会发生什么坏事情。

let image /*: Signal<UIImage, Never> */ = getImage().suppressError(logging: true)

始终记录错误是一个好主意。

如果需要执行错误的替代逻辑,你将将其扁平映射到另一个信号上。

let image = getImage().flatMapError { error in
  return getAlternativeImage()
}

属性

属性将可变状态包裹在一个对象中,该对象使观察该状态成为可能。只要状态发生变化,观察者就会被通知。就像 PassthroughSubject 一样,它代表了对命令式世界的桥梁。

要创建属性,只需用初始值初始化它。

let name = Property("Jim")

nil 是包装可选类型的属性的合法值。

属性与 Signal 类型的信号一样,是信号。它们可以转换为其他信号,并以与信号相同的方式观察和绑定。

例如,你可以使用 observeobserveNext 方法注册观察者。

name.observeNext { value in
  print("Hi \(value)!")
}

当你注册观察者时,它将被立即用属性的当前值调用,所以片段将打印 "Hi Jim!"。

之后更改属性的值,只需设置 value 属性。

name.value = "Jim Kirk" // Prints: Hi Jim Kirk!

加载信号

信号通常代表异步操作,例如网络调用。一个好的应用程序会在调用进行时向用户显示某种类型的加载指示器,并在调用失败时显示错误对话框,可能包含重试选项。为了简化这些用例,ReactiveKit 提供了 LoadingSignalLoadingProperty 类型。

一个操作或工作可以处于以下三种状态之一:正在加载、加载完成、加载失败。ReactiveKit 使用枚举 LoadingState 定义这些状态。

/// Represents loading state of an asynchronous action.
public enum LoadingState<LoadingValue, LoadingError: Error>: LoadingStateProtocol {

  /// Value is loading.
  case loading

  /// Value is loaded.
  case loaded(LoadingValue)

  /// Value loading failed with the given error.
  case failed(LoadingError)
}

具有 LoadingState 类型元素的信号被别名为 LoadingSignal

public typealias LoadingSignal<LoadingValue, LoadingError: Error> = Signal<LoadingState<LoadingValue, LoadingError>, Never>

请注意,加载信号是一个安全的信号。信号本身永远不会失败,但可以作为 .failed 加载状态发出错误。这意味着错误不会终止信号 - 错误发生后,可以接收到新的事件。

如何将常规信号转换为加载信号?只需应用 toLoadingSignal 操作符。假设我们有一个表示某些资源获取操作的信号

let fetchImage: Signal<UIImage, ApplicationError> = ...

然后,我们可以通过应用 toLoadingSignal 操作符将那个信号转换为加载信号。

fetchImage
    .toLoadingSignal()
    .observeNext { loadingState in
        switch loadingState {
        case .loading:
            // display loading indicator
        case .loaded(let image):
            // hide loading indicator
            // display image
        case .failed(let error):
            // hide loading indicator
            // display error message
        }
    }

观察下一个元素现在给出了信号的加载状态。当我们开始观察时,我们会收到 .loading 状态。当资源加载完成时,我们将收到资源在 .loaded 状态或错误在 .failed 状态。

消耗加载状态

加载信号看起来不错,但手动更新为加载数据视图的每个视图的加载状态并不有趣。幸运的是,有更好的方法——LoadingStateListener 协议。

/// A consumer of loading state.
public protocol LoadingStateListener: class {

    /// Consume observed loading state.
    func setLoadingState<LoadingValue, LoadingError>(_ state: ObservedLoadingState<LoadingValue, LoadingError>)
}

此协议可以由任何根据显示数据的加载状态更新其外观的东西实现。在 iOS 上,合适的选择可能是 UIViewController 或 UIView。例如

extension UIViewController: LoadingStateListener {

    public func setLoadingState<LoadingValue, LoadingError>(_ state: ObservedLoadingState<LoadingValue, LoadingError>) {
        switch state {
        case .loading:
            // display loading indicator
        case .reloading:
            // display reloading indicator
        case .loaded(let value):
            // hide loading indicator
            // display value
        case .failed(let error):
            // hide loading indicator
            // display error
        }
    }
}

请注意,LoadingStateListener 获得了 ObservedLoadingState 而不是 LoadingStateObservedLoadingStateLoadingState 的差异在于后者有一个额外状态:.reloading。ReactiveKit 会自动将后续的 .loading 状态转换为 .reloading 状态,这样您就可以在这两种情况下采取不同的行动。

现在,我们已经有了加载状态监听器,我们可以将任何加载信号转换为常规安全信号,通过监听器消耗其加载状态

fetchImage
    .toLoadingSignal()
    .consumeLoadingState(by: viewController)
    .bind(to: viewController.imageView) { imageView, image in
        imageView.image = image
    }

太棒了!consumeLoadingState 操作符接受加载状态监听器,并在加载信号生成状态时更新它。它返回一个加载值的神州信号,即它从 .loaded 状态解包底层值。在我们的例子中,这将是一个 Signal<UIImage, Never>,然后我们可以将其绑定到我们的图像视图并更新其内容。

转换加载信号

ReactiveKit 提供了针对加载数据信号的一批操作符,比如 valuemapValuemapLoadingErrordematerializeLoadingState 以及 flatMapValue。然而,您也可以将常规的操作符应用于加载数据的信号,这些操作符会作用于它们的值。要做到这一点,可以使用 liftValue 操作符。例如,跳过前三个值并且延迟一秒处理,您可以这样做

aLoadingSignal.liftValue {
    $0.skip(first: 3).delay(interval: 1)
}

liftValue 接受一个闭包,这个闭包接收一个常规信号,然后您可以使用常规信号操作符对其进行转换。

加载属性

我们通常需要一种方法来存储异步操作的结果以及一种刷新(重新加载)的方法。为此,我们可以使用 LoadingProperty 类型。它与常规的 Property 类型类似,但不同之处在于,我们不是用值来初始化它,而是用提供一个加载信号的闭包来初始化它——一个可以执行某些工作的闭包。LoadingProperty 可以像其他任何 LoadingSignal 一样使用。当它第一次被观察(或绑定)时,它将加载数值,即执行工作。它还提供了一个方法,通过再次执行工作来重新加载数值。

以下是如何使用 LoadingProperty 来实现简单的用户服务的一个示例

class UserService {

    let user: LoadingProperty<User, ApplicationError>

    init(_ api: API) {

        user = LoadingProperty {
            api.fetchUser()
        }
    }

    func refresh() -> LoadingSignal<User, ApplicationError> {
        return user.reload()
    }
}

其他常用模式

在 .next 事件上执行操作

假设您有一个按钮,在该按钮上(重新)加载您的应用中的图片。如何在响应式编程世界中实现这个功能呢?首先,我们需要一个表示按钮点击的信号。使用 Bond 框架,您可以像这样获取这个信号

let reload /*: Signal<Void, Never> */ = button.reactive.tap

每当按钮被点击时,该信号会发送一个 .next 事件。我们希望在每次这样的事件上加载图片。为了做到这一点,我们将重新加载信号平铺映射到图片请求。

let photo = reload.flatMapLatest { _ in
  return apiClient().loadPhoto() // returns Signal<UIImage, NetworkError>
}

photo 将具有内部信号的任何类型 - 在我们的情况下,Signal<UIImage, NetworkError>。然后我们可以将其绑定到图像视图

photo
  .suppressError(logging: true)  // we can bind only safe signals
  .bind(to: imageView.reactive.image) // using the Bond framework

这意味着每当按钮被点击时,都会发出新的图片请求,并且图像视图的图片将得到更新。

还有两个其他操作符平铺映射信号:flatMapConcatflatMapMerge。这三个之间在事件从内部信号传播时的不同之处在于它们处理多个活动内部信号的情况。例如,假设在先前的请求完成之前,用户点击了重新加载按钮。那么会发生什么?

  • 《flatMapLatest》会将之前的信号丢弃,并启动一个新的信号。
  • 《flatMapConcat》会启动一个新的信号,但在上一个信号完成之前,它不会传递其事件。
  • 《flatMapMerge》会启动一个新的信号,但它将传播来自所有信号的事件,无论哪个信号先出现。

组合多个信号

假设你有一个用户名和密码信号,并且你希望有一个信号告诉你它们是否都已输入,以便你可以启用登录按钮。你可以使用《combineLatest》操作符来实现这一点。

let username = usernameLabel.reactive.text
let password = passwordLabel.reactive.text

let canLogIn = combineLatest(username, password) { username, password in
  return !username.isEmpty && !password.isEmpty
}

canLogIn.bind(to: loginButton.reactive.isEnabled)

你需要提供给操作符的是信号以及一个闭包,该闭包将映射那些信号的最新元素到一个新的元素。

响应式扩展由Bond框架提供。

调试

时间线

ReactiveKit内置了对Timelane Xcode工具的支持。只需下载该工具,然后使用《lane》操作符将信号数据发送到Timelane工具。

mySignal
  .filter { ... }
  .lane("My Signal")
  .map { ... }
  .sink {
    ... 
  }

这只是一个单行代码!

请注意,“lane”仅在macOS 10.14、iOS 12、tvOS 12、watchOS 5或更高版本上可用。如果你正在编译旧系统版本,可以使用《laneIfAvailable》操作符进行便捷操作,但请注意,在旧系统版本上进行测试时事件日志将静默失败。

调试操作符

你可以通过应用《debug》操作符将信号事件打印到控制台。

mySignal
  .filter { ... }
  .debug("My Signal")
  .map { ... }
  .sink {
    ... 
  }

断点

ReactiveKit 还提供了一个 断点 操作符。它是基于 Combine 的 断点操作符 实现的。

要求

  • iOS 8.0+ / macOS 10.11+ / tvOS 9.0+ / watchOS 2.0+
  • Xcode 10.2

  • Linux + Swift 5.0

安装

Carthage

github "DeclarativeHub/ReactiveKit"

CocoaPods

pod 'ReactiveKit'

Swift Package Manager

// swift-tools-version:5.0

import PackageDescription

let package = Package(
  name: "MyApp",
  dependencies: [
    .package(url: "https://github.com/DeclarativeHub/ReactiveKit.git", from: "3.10.0")
  ],
  targets: [
    .target(name: "MyApp", dependencies: ["ReactiveKit"])
  ]
)

通信

附加文档

许可

MIT许可(MIT)

版权所有 (c) 2015-2020 Srdan Rasic (@srdanrasic)

在此特此允许任何获得此软件及其相关文档文件(以下简称“软件”)副本的人,可以不受限制地处理该软件,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售软件的副本,以及允许提供给软件的人按以下条件这样做

以上版权声明和本许可声明应包含在软件的所有副本或主要部分中。

软件按“原样”提供,不提供任何明示或暗示的保证,包括但不限于适销性、针对特定目的的适用性和非侵权性保证。在任何情况下,作者或版权所有者都不应对任何索赔、损害或其他责任(不论是否因合同、侵权或其他原因而产生),包括但又不仅限于因软件或其使用或操作而产生的任何索赔、损害或其他责任负责。