CombineExt 1.8.0

CombineExt 1.8.0

Shai Mishali 维护。



  • Combine 社区和 Shai Mishali

CombineExt



Build Status Code Coverage for CombineExt on codecov

CombineExt supports CocoaPods CombineExt supports Swift Package Manager (SPM) CombineExt supports Carthage

CombineExt 提供了一组由 Apple 提供的 Combine 操作符、发布者和工具,这些在其他响应式框架和标准中很常见。

许多这些添加的原始灵感来源于我多年来使用 RxSwift 和 ReactiveX 后探索 Combine 的经历。

所有操作符、工具和助手都尊重 Combine 的发布者合约,包括背压。

操作符

发布者

主体

提示:CombineExt仍然处于相对早期版本,还有很多需要改进的地方。我乐意接受PR、想法、意见或改进建议。谢谢!:)

安装

CocoaPods

将以下行添加到您的 Podfile

pod 'CombineExt'

Swift Package Manager

将以下依赖项添加到您的 Package.swift 文件中

.package(url: "https://github.com/CombineCommunity/CombineExt.git", from: "1.0.0")

Carthage

Carthage支持提供预构建的二进制文件。

将以下内容添加到您的 Cartfile

github "CombineCommunity/CombineExt"

运算符

本部分概述了CombineExt提供的一些自定义运算符。

withLatestFrom

通过将self的每个值与来自其他发布者的最新值(如果有)合并,将最多四个发布者合并成一个发布者。

let taps = PassthroughSubject<Void, Never>()
let values = CurrentValueSubject<String, Never>("Hello")

taps
  .withLatestFrom(values)
  .sink(receiveValue: { print("withLatestFrom: \($0)") })

taps.send()
taps.send()
values.send("World!")
taps.send()

输出

withLatestFrom: Hello
withLatestFrom: Hello
withLatestFrom: World!

flatMapLatest

将输出值转换为一个新的发布者,并将多个上游发布者的事件流合并成单个事件流。

将映射到新发布者将取消前一个订阅的订阅,只保持单一订阅活跃以及其事件发射。

注意flatMapLatestmapswitchToLatest的组合。

let trigger = PassthroughSubject<Void, Never>()
trigger
    .flatMapLatest { performNetworkRequest() }

trigger.send()
trigger.send() // cancels previous request
trigger.send() // cancels previous request

assign

CombineExt提供了对assign(to:on:)的定制重载,允许您同时将 发布者绑定到多个keypath目标。

var label1: UILabel
var label2: UILabel
var text: UITextField

["hey", "there", "friend"]
    .publisher
    .assign(to: \.text, on: label1,
            and: \.text, on: label2,
            and: \.text, on: text)

CombineExt还提供了一个额外的重载assign(to:on​:ownership),允许您指定您想要为您的assign操作指定的拥有权类型:strongweakunowned

// Retain `self` strongly
subscription = subject.assign(to: \.value, on: self)
subscription = subject.assign(to: \.value, on: self, ownership: .strong)

// Use a `weak` reference to `self`
subscription = subject.assign(to: \.value, on: self, ownership: .weak)

// Use an `unowned` reference to `self`
subscription = subject.assign(to: \.value, on: self, ownership: .unowned)

amb

Amb接受多个发布者,并将第一个发布者的事件镜像出来。您可以将它想象成一个发布者的竞赛,第一个发布者发射事件,而其他发布者则被忽略(还有一个Collection.amb方法来简化与多个发布者的协同工作)。

名字amb来自于Reactive Extensions运算符,在RxJS中也被称作race

let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()

subject1
  .amb(subject2)
  .sink(receiveCompletion: { print("amb: completed with \($0)") },
        receiveValue: { print("amb: \($0)") })

subject2.send(3) // Since this subject emit first, it becomes the active publisher
subject1.send(1)
subject2.send(6)
subject1.send(8)
subject1.send(7)

subject1.send(completion: .finished)
// Only when subject2 finishes, amb itself finishes as well, since it's the active publisher
subject2.send(completion: .finished)

输出

amb: 3
amb: 6
amb: completed with .finished

materialize

将任何发布者转换为事件发布者。给定一个Publisher,此操作符将返回一个Publisher, Never>,这意味着您的失败实际上是一个普通值,这使得在许多场景中错误处理变得更加简单。

let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
  case ohNo
}

values
  .materialize()
  .sink(receiveCompletion: { print("materialized: completed with \($0)") },
        receiveValue: { print("materialized: \($0)") })

values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))

输出

materialize: .value("Hello")
materialize: .value("World")
materialize: .value("What's up?")
materialize: .failure(.ohNo)
materialize: completed with .finished

给定一个已转换的发布者,仅发布发出的上游值,省略失败。给定一个Publisher, Never>,此操作符将返回一个Publisher

注意:此操作符仅适用于使用materialize()操作符材料化的发布者。

let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
  case ohNo
}

values
  .materialize()
  .values()
  .sink(receiveValue: { print("values: \($0)") })

values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))

输出

values: "Hello"
values: "World"
values: "What's up?"

失败

给定一个已转换的发布者,仅发布发出的上游失败,省略值。给定一个Publisher, Never>,此操作符将返回一个Publisher

注意:此操作符仅适用于使用materialize()操作符材料化的发布者。

let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
  case ohNo
}

values
  .materialize()
  .failures()
  .sink(receiveValue: { print("failures: \($0)") })

values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))

输出

failure: MyError.ohNo

去物质化

将之前物质化的发布者转换回其原始形式。给定一个Publisher<Event<String, MyError>, Never>,此操作符将返回一个Publisher<String, MyError>

注意:此操作符仅适用于使用materialize()操作符材料化的发布者。


分区

将发布者的值分成两个独立的发布者,一个与提供的谓词匹配,另一个不匹配。

let source = PassthroughSubject<Int, Never>()

let (even, odd) = source.partition { $0 % 2 == 0 }

even.sink(receiveValue: { print("even: \($0)") })
odd.sink(receiveValue: { print("odd: \($0)") })

source.send(1)
source.send(2)
source.send(3)
source.send(4)
source.send(5)

输出

odd: 1
even: 2
odd: 3
even: 4
odd: 5

ZipMany

此存储库包括 Combine 的 Publisher.zip 方法(按照编写的时间,仅支持到阶数为三)的两种重载。

这允许您任意地将许多发布者组合在一起,并接收一个包含内部发布者输出的数组。

let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()
let fourth = PassthroughSubject<Int, Never>()

subscription = first
  .zip(with: second, third, fourth)
  .map { $0.reduce(0, +) }
  .sink(receiveValue: { print("zipped: \($0)") })

first.send(1)
second.send(2)
third.send(3)
fourth.send(4)

您还可以直接在具有相同输出和失败类型的发布者集合上使用 .zip(),例如。

[first, second, third, fourth]
  .zip()
  .map { $0.reduce(0, +) }
  .sink(receiveValue: { print("zipped: \($0)") })

输出

zipped: 10

MergeMany

该仓库包含一个针对集合的扩展,允许您直接在具有相同输出和失败类型的发布者集合上调用.merge()

这使得您可以将许多发布者任意组合,并从单个发布者接收内部发布者输出。

let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()
let fourth = PassthroughSubject<Int, Never>()

subscription = [first, second, third, fourth]
  .merge()
  .sink(receiveValue: { print("output: \($0)") })

first.send(1)
second.send(2)
third.send(3)
fourth.send(4)

输出

output: 1
output: 2
output: 3
output: 4

CombineLatestMany

该仓库包括Combine的Publisher.combineLatest方法(到写作时为止,此方法的参数数量最多为三个)的两种重载以及一个限制的Collection.combineLatest扩展。

这使您能够任意组合许多发布者,并接收一个包含内部发布者输出的数组。

let first = PassthroughSubject<Bool, Never>()
let second = PassthroughSubject<Bool, Never>()
let third = PassthroughSubject<Bool, Never>()
let fourth = PassthroughSubject<Bool, Never>()

subscription = [first, second, third, fourth]
  .combineLatest()
  .sink(receiveValue: { print("combineLatest: \($0)") })

first.send(true)
second.send(true)
third.send(true)
fourth.send(true)

first.send(false)

输出

combineLatest: [true, true, true, true]
combineLatest: [false, true, true, true]

FilterMany

将发布者集合中的元素过滤到一个新的发布者集合中。

let intArrayPublisher = PassthroughSubject<[Int], Never>()

intArrayPublisher
  .filterMany { $0.isMultiple(of: 2) }
  .sink(receiveValue: { print($0) })

intArrayPublisher.send([10, 2, 4, 3, 8])

输出

none
[10, 2, 4, 8]

MapMany

将发布者集合的每个元素映射到新的发布者集合形式。

let intArrayPublisher = PassthroughSubject<[Int], Never>()
    
intArrayPublisher
  .mapMany(String.init)
  .sink(receiveValue: { print($0) })
    
intArrayPublisher.send([10, 2, 2, 4, 3, 8])

输出

["10", "2", "2", "4", "3", "8"]

setOutputType

Publisher.setOutputType(to:) 是当 Output 被限制为 Never 时与 .setFailureType(to:) 等价的操作。这在调用 .ignoreOutput() 后连用运算符时特别有用。


removeAllDuplicates

Publisher.removeAllDuplicates.removeAllDuplicates(by:) 是 Apple 的 Publisher.removeDuplicates.removeDuplicates(by:) 的更严格形式——这些运算符在所有之前的值事件上执行去重,而不是依次进行。

如果你的 Output 不符合 HashableEquatable,你可以使用基于比较器的该运算符版本来决定两个元素是否相等。

subscription = [1, 1, 2, 1, 3, 3, 4].publisher
  .removeAllDuplicates()
  .sink(receiveValue: { print("removeAllDuplicates: \($0)") })

输出

removeAllDuplicates: 1
removeAllDuplicates: 2
removeAllDuplicates: 3
removeAllDuplicates: 4

share(replay:)

Publisher.share 相似,.share(replay:) 可以用来创建具有引用语义的发布者实例,该实例将预定义数量的值事件回放给后续订阅者。

let subject = PassthroughSubject<Int, Never>()

let replayedPublisher = subject
  .share(replay: 3)

subscription1 = replayedPublisher
  .sink(receiveValue: { print("first subscriber: \($0)") })
  
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)

subscription2 = replayedPublisher
  .sink(receiveValue: { print("second subscriber: \($0)") })

输出

first subscriber: 1
first subscriber: 2
first subscriber: 3
first subscriber: 4
second subscriber: 2
second subscriber: 3
second subscriber: 4

前缀(duration:)

Publisher.prefix 上的重载函数,在指定的 duration(以秒为单位)期间重新发布值,然后结束。

let subject = PassthroughSubject<Int, Never>()

subscription = subject
  .prefix(duration: 0.5, on: DispatchQueue.main)
  .sink(receiveValue: { print($0) })
  
subject.send(1)
subject.send(2)
subject.send(3)

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
  subject.send(4)
}

输出

1
2
3

前缀(while:behavior:)

Publisher.prefix(while:) 上允许包含第一个不满足 while 断言的元素的重载函数。

let subject = PassthroughSubject<Int, Never>()

subscription = subject
  .prefix(
    while: { $0 % 2 == 0 },
    behavior: .inclusive
  )
  .sink(
    receivecompletion: { print($0) },
    receiveValue: { print($0) }
  )
  
subject.send(0)
subject.send(2)
subject.send(4)
subject.send(5)
0
2
4
5
finished

toggle()

切换一个发布者集合中每个布尔元素的值。

let subject = PassthroughSubject<Bool, Never>()

subscription = subject
  .toggle()
  .sink(receiveValue: { print($0) })
  
subject.send(true)
subject.send(false)
subject.send(true)

输出

false
true
false

nwise

将源发布者的元素分组为N个连续元素的数组。

let subject = PassthroughSubject<Int, Never>()

subscription = subject
  .nwise(3)
  .sink(receiveValue: { print($0) })
  
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject.send(5)

输出

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

成对

将源发布者的元素分组为前一个和当前元素的元组。

let subject = PassthroughSubject<Int, Never>()

subscription = subject
  .pairwise()
  .sink(receiveValue: { print("\($0.0) -> \($0.1)") })

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject.send(5)

输出

1 -> 2
2 -> 3
3 -> 4
4 -> 5

忽略输出(设置输出类型:)

同时忽略发布者的价值事件并重写其 输出 泛型。

let onlyAFour = ["1", "2", "3"].publisher
  .ignoreOutput(setOutputType: Int.self)
  .append(4)

忽略失败

CombineExt提供的几个重载用于忽略错误,并可选择指定新的错误类型以及在这种情况下是否触发完成。

  • 忽略失败(立即完成:)
  • 忽略失败(设置失败类型:立即完成:)
enum AnError {
  case someError 
}

let subject = PassthroughSubject<Int, AnError>()

subscription = subject
  .ignoreFailure() // The `completeImmediately` parameter defaults to `true`.
  .sink(receiveValue: { print($0) }, receiveCompletion: { print($0) })

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .failure(.someError))

输出

1
2
3
.finished

mapToResult

将类型为 AnyPublisher<Output, Failure> 的发布者转换为 AnyPublisher<Result<Output, Failure>, Never>

enum AnError: Error {
    case someError
}

let subject = PassthroughSubject<Int, AnError>()

let subscription = subject
    .mapToResult()
    .sink(receiveCompletion: { print("completion: \($0)") },
          receiveValue: { print("value: \($0)") })

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .failure(.someError))

Output

value: success(1)
value: success(2)
value: success(3)
value: failure(AnError.someError)
completion: finished

flatMapBatches(of:)

`Collection.flatMapBatches(of:)` 以批量的形式订阅接收者的内容发布者,并按批量返回它们的输出(同时保持顺序)。只有当之前的批量成功完成后,才会订阅后续的发布者批量 —— 任何失败都会向下传递。

let ints = (1...6).map(Just.init)

subscription = ints
  .flatMapBatches(of: 2)
  .sink(receiveCompletion: { print($0) }, receiveValue: { print($0) })

Output

[1, 2]
[3, 4]
[5, 6]
.finished

Publishers

本节概述了CombineExt提供的一些自定义Combine发布者

AnyPublisher.create

一个发布者,它接受一个具有订阅者参数的闭包,您可以向其中动态发送值或完成事件。

这使您能够轻松创建自定义发布者来包装任何非发布者异步工作,同时仍然尊重下游消费者对背压的需求。

您应该从闭包返回一个符合 Cancellable 的对象,您可以在此对象中定义当发布者完成或取消对发布者的订阅时要执行的任何清理操作。

AnyPublisher<String, MyError>.create { subscriber in
  // Values
  subscriber.send("Hello")
  subscriber.send("World!")
  
  // Complete with error
  subscriber.send(completion: .failure(MyError.someError))
  
  // Or, complete successfully
  subscriber.send(completion: .finished)

  return AnyCancellable { 
    // Perform cleanup
  }
}

您还可以使用具有相同签名的 AnyPublisher 初始化器

AnyPublisher<String, MyError> { subscriber in 
    /// ...
    return AnyCancellable { }

当前值中继

当前值中继当前值主题相同,但有两个主要区别

  • 它只接受值,但不接受完成事件,这意味着它不会失败。
  • 它仅在分配时发布.finished事件。
let relay = CurrentValueRelay<String>("well...")

relay.sink(receiveValue: { print($0) }) // replays current value, e.g. "well..."

relay.accept("values")
relay.accept("only")
relay.accept("provide")
relay.accept("great")
relay.accept("guarantees")

输出

well...
values
only
provide
great
guarantees

透传中继

透传中继透传主题相同,但有二个主要区别

  • 它只接受值,但不接受完成事件,这意味着它不会失败。
  • 它仅在分配时发布.finished事件。
let relay = PassthroughRelay<String>()
relay.accept("well...")

relay.sink(receiveValue: { print($0) }) // does not replay past value(s)

relay.accept("values")
relay.accept("only")
relay.accept("provide")
relay.accept("great")
relay.accept("guarantees")

输出

values
only
provide
great
guarantees

主题

重放主题

Combine的Rx的重放主题类型。它与当前值主题相似,因为它会缓存值,但它更进一步,允许消费者指定要缓冲和回放给未来订阅者的值的数量。此外,当缓冲区清除时,它还将处理所有完成事件的前传。

let subject = ReplaySubject<Int, Never>(bufferSize: 3)

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)

subject
  .sink(receiveValue: { print($0) })

subject.send(5)

输出

2
3
4
5

许可协议

当然是MIT许可证;-) 请参阅LICENSE文件。

苹果标志和Combine框架是苹果公司的财产。