CombineExt
CombineExt 提供了一组由 Apple 提供的 Combine 操作符、发布者和工具,这些在其他响应式框架和标准中很常见。
许多这些添加的原始灵感来源于我多年来使用 RxSwift 和 ReactiveX 后探索 Combine 的经历。
所有操作符、工具和助手都尊重 Combine 的发布者合约,包括背压。
操作符
- withLatestFrom
- flatMapLatest
- assign
- amb 和 Collection.amb
- materialize
- values
- failures
- dematerialize
- partition
- zip(with:) 和 Collection.zip
- Collection.merge()
- combineLatest(with:) 和 Collection.combineLatest
- mapMany(_:)
- filterMany(_:)
- setOutputType(to:)
- removeAllDuplicates 和 removeAllDuplicates(by:)
- share(replay:)
- prefix(duration:tolerance:on:options:)
- prefix(while:behavior:)
- toggle()
- nwise(_:) 和 pairwise()
- ignoreOutput(setOutputType:)
- ignoreFailure
- mapToResult
- flatMapBatches(of:)
发布者
主体
提示:CombineExt仍然处于相对早期版本,还有很多需要改进的地方。我乐意接受PR、想法、意见或改进建议。谢谢!:)
安装
CocoaPods
将以下行添加到您的 Podfile
pod 'CombineExt'
Swift Package Manager
将以下依赖项添加到您的 Package.swift 文件中
.package(url: "https://github.com/CombineCommunity/CombineExt.git", from: "1.0.0")
Carthage
Carthage支持提供预构建的二进制文件。
将以下内容添加到您的 Cartfile
github "CombineCommunity/CombineExt"
运算符
本部分概述了CombineExt提供的一些自定义运算符。
withLatestFrom
通过将self
的每个值与来自其他发布者的最新值(如果有)合并,将最多四个发布者合并成一个发布者。
let taps = PassthroughSubject<Void, Never>()
let values = CurrentValueSubject<String, Never>("Hello")
taps
.withLatestFrom(values)
.sink(receiveValue: { print("withLatestFrom: \($0)") })
taps.send()
taps.send()
values.send("World!")
taps.send()
输出
withLatestFrom: Hello
withLatestFrom: Hello
withLatestFrom: World!
flatMapLatest
将输出值转换为一个新的发布者,并将多个上游发布者的事件流合并成单个事件流。
将映射到新发布者将取消前一个订阅的订阅,只保持单一订阅活跃以及其事件发射。
注意:flatMapLatest
是map
和switchToLatest
的组合。
let trigger = PassthroughSubject<Void, Never>()
trigger
.flatMapLatest { performNetworkRequest() }
trigger.send()
trigger.send() // cancels previous request
trigger.send() // cancels previous request
assign
CombineExt提供了对assign(to:on:)
的定制重载,允许您同时将 发布者绑定到多个keypath目标。
var label1: UILabel
var label2: UILabel
var text: UITextField
["hey", "there", "friend"]
.publisher
.assign(to: \.text, on: label1,
and: \.text, on: label2,
and: \.text, on: text)
CombineExt还提供了一个额外的重载assign(to:on:ownership)
,允许您指定您想要为您的assign操作指定的拥有权类型:strong
、weak
或unowned
。
// Retain `self` strongly
subscription = subject.assign(to: \.value, on: self)
subscription = subject.assign(to: \.value, on: self, ownership: .strong)
// Use a `weak` reference to `self`
subscription = subject.assign(to: \.value, on: self, ownership: .weak)
// Use an `unowned` reference to `self`
subscription = subject.assign(to: \.value, on: self, ownership: .unowned)
amb
Amb接受多个发布者,并将第一个发布者的事件镜像出来。您可以将它想象成一个发布者的竞赛,第一个发布者发射事件,而其他发布者则被忽略(还有一个Collection.amb
方法来简化与多个发布者的协同工作)。
名字amb
来自于Reactive Extensions运算符,在RxJS中也被称作race
。
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
subject1
.amb(subject2)
.sink(receiveCompletion: { print("amb: completed with \($0)") },
receiveValue: { print("amb: \($0)") })
subject2.send(3) // Since this subject emit first, it becomes the active publisher
subject1.send(1)
subject2.send(6)
subject1.send(8)
subject1.send(7)
subject1.send(completion: .finished)
// Only when subject2 finishes, amb itself finishes as well, since it's the active publisher
subject2.send(completion: .finished)
输出
amb: 3
amb: 6
amb: completed with .finished
materialize
将任何发布者转换为事件发布者。给定一个Publisher
,此操作符将返回一个Publisher
,这意味着您的失败实际上是一个普通值,这使得在许多场景中错误处理变得更加简单。
let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
case ohNo
}
values
.materialize()
.sink(receiveCompletion: { print("materialized: completed with \($0)") },
receiveValue: { print("materialized: \($0)") })
values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))
输出
materialize: .value("Hello")
materialize: .value("World")
materialize: .value("What's up?")
materialize: .failure(.ohNo)
materialize: completed with .finished
值
给定一个已转换的发布者,仅发布发出的上游值,省略失败。给定一个Publisher
,此操作符将返回一个Publisher
。
注意:此操作符仅适用于使用materialize()
操作符材料化的发布者。
let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
case ohNo
}
values
.materialize()
.values()
.sink(receiveValue: { print("values: \($0)") })
values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))
输出
values: "Hello"
values: "World"
values: "What's up?"
失败
给定一个已转换的发布者,仅发布发出的上游失败,省略值。给定一个Publisher
,此操作符将返回一个Publisher
。
注意:此操作符仅适用于使用materialize()
操作符材料化的发布者。
let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
case ohNo
}
values
.materialize()
.failures()
.sink(receiveValue: { print("failures: \($0)") })
values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))
输出
failure: MyError.ohNo
去物质化
将之前物质化的发布者转换回其原始形式。给定一个Publisher<Event<String, MyError>, Never>
,此操作符将返回一个Publisher<String, MyError>
注意:此操作符仅适用于使用materialize()
操作符材料化的发布者。
分区
将发布者的值分成两个独立的发布者,一个与提供的谓词匹配,另一个不匹配。
let source = PassthroughSubject<Int, Never>()
let (even, odd) = source.partition { $0 % 2 == 0 }
even.sink(receiveValue: { print("even: \($0)") })
odd.sink(receiveValue: { print("odd: \($0)") })
source.send(1)
source.send(2)
source.send(3)
source.send(4)
source.send(5)
输出
odd: 1
even: 2
odd: 3
even: 4
odd: 5
ZipMany
此存储库包括 Combine 的 Publisher.zip
方法(按照编写的时间,仅支持到阶数为三)的两种重载。
这允许您任意地将许多发布者组合在一起,并接收一个包含内部发布者输出的数组。
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()
let fourth = PassthroughSubject<Int, Never>()
subscription = first
.zip(with: second, third, fourth)
.map { $0.reduce(0, +) }
.sink(receiveValue: { print("zipped: \($0)") })
first.send(1)
second.send(2)
third.send(3)
fourth.send(4)
您还可以直接在具有相同输出和失败类型的发布者集合上使用 .zip()
,例如。
[first, second, third, fourth]
.zip()
.map { $0.reduce(0, +) }
.sink(receiveValue: { print("zipped: \($0)") })
输出
zipped: 10
MergeMany
该仓库包含一个针对集合的扩展,允许您直接在具有相同输出和失败类型的发布者集合上调用.merge()
。
这使得您可以将许多发布者任意组合,并从单个发布者接收内部发布者输出。
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()
let fourth = PassthroughSubject<Int, Never>()
subscription = [first, second, third, fourth]
.merge()
.sink(receiveValue: { print("output: \($0)") })
first.send(1)
second.send(2)
third.send(3)
fourth.send(4)
输出
output: 1
output: 2
output: 3
output: 4
CombineLatestMany
该仓库包括Combine的Publisher.combineLatest
方法(到写作时为止,此方法的参数数量最多为三个)的两种重载以及一个限制的Collection.combineLatest
扩展。
这使您能够任意组合许多发布者,并接收一个包含内部发布者输出的数组。
let first = PassthroughSubject<Bool, Never>()
let second = PassthroughSubject<Bool, Never>()
let third = PassthroughSubject<Bool, Never>()
let fourth = PassthroughSubject<Bool, Never>()
subscription = [first, second, third, fourth]
.combineLatest()
.sink(receiveValue: { print("combineLatest: \($0)") })
first.send(true)
second.send(true)
third.send(true)
fourth.send(true)
first.send(false)
输出
combineLatest: [true, true, true, true]
combineLatest: [false, true, true, true]
FilterMany
将发布者集合中的元素过滤到一个新的发布者集合中。
let intArrayPublisher = PassthroughSubject<[Int], Never>()
intArrayPublisher
.filterMany { $0.isMultiple(of: 2) }
.sink(receiveValue: { print($0) })
intArrayPublisher.send([10, 2, 4, 3, 8])
输出
none
[10, 2, 4, 8]
MapMany
将发布者集合的每个元素映射到新的发布者集合形式。
let intArrayPublisher = PassthroughSubject<[Int], Never>()
intArrayPublisher
.mapMany(String.init)
.sink(receiveValue: { print($0) })
intArrayPublisher.send([10, 2, 2, 4, 3, 8])
输出
["10", "2", "2", "4", "3", "8"]
setOutputType
Publisher.setOutputType(to:)
是当 Output
被限制为 Never
时与 .setFailureType(to:)
等价的操作。这在调用 .ignoreOutput()
后连用运算符时特别有用。
removeAllDuplicates
Publisher.removeAllDuplicates
和 .removeAllDuplicates(by:)
是 Apple 的 Publisher.removeDuplicates
和 .removeDuplicates(by:)
的更严格形式——这些运算符在所有之前的值事件上执行去重,而不是依次进行。
如果你的 Output
不符合 Hashable
或 Equatable
,你可以使用基于比较器的该运算符版本来决定两个元素是否相等。
subscription = [1, 1, 2, 1, 3, 3, 4].publisher
.removeAllDuplicates()
.sink(receiveValue: { print("removeAllDuplicates: \($0)") })
输出
removeAllDuplicates: 1
removeAllDuplicates: 2
removeAllDuplicates: 3
removeAllDuplicates: 4
share(replay:)
与 Publisher.share
相似,.share(replay:)
可以用来创建具有引用语义的发布者实例,该实例将预定义数量的值事件回放给后续订阅者。
let subject = PassthroughSubject<Int, Never>()
let replayedPublisher = subject
.share(replay: 3)
subscription1 = replayedPublisher
.sink(receiveValue: { print("first subscriber: \($0)") })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subscription2 = replayedPublisher
.sink(receiveValue: { print("second subscriber: \($0)") })
输出
first subscriber: 1
first subscriber: 2
first subscriber: 3
first subscriber: 4
second subscriber: 2
second subscriber: 3
second subscriber: 4
前缀(duration:)
Publisher.prefix
上的重载函数,在指定的 duration
(以秒为单位)期间重新发布值,然后结束。
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.prefix(duration: 0.5, on: DispatchQueue.main)
.sink(receiveValue: { print($0) })
subject.send(1)
subject.send(2)
subject.send(3)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
subject.send(4)
}
输出
1
2
3
前缀(while:behavior:)
Publisher.prefix(while:)
上允许包含第一个不满足 while
断言的元素的重载函数。
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.prefix(
while: { $0 % 2 == 0 },
behavior: .inclusive
)
.sink(
receivecompletion: { print($0) },
receiveValue: { print($0) }
)
subject.send(0)
subject.send(2)
subject.send(4)
subject.send(5)
0
2
4
5
finished
toggle()
切换一个发布者集合中每个布尔元素的值。
let subject = PassthroughSubject<Bool, Never>()
subscription = subject
.toggle()
.sink(receiveValue: { print($0) })
subject.send(true)
subject.send(false)
subject.send(true)
输出
false
true
false
nwise
将源发布者的元素分组为N个连续元素的数组。
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.nwise(3)
.sink(receiveValue: { print($0) })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject.send(5)
输出
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
成对
将源发布者的元素分组为前一个和当前元素的元组。
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.pairwise()
.sink(receiveValue: { print("\($0.0) -> \($0.1)") })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject.send(5)
输出
1 -> 2
2 -> 3
3 -> 4
4 -> 5
忽略输出(设置输出类型:)
同时忽略发布者的价值事件并重写其 输出
泛型。
let onlyAFour = ["1", "2", "3"].publisher
.ignoreOutput(setOutputType: Int.self)
.append(4)
忽略失败
CombineExt提供的几个重载用于忽略错误,并可选择指定新的错误类型以及在这种情况下是否触发完成。
忽略失败(立即完成:)
忽略失败(设置失败类型:立即完成:)
enum AnError {
case someError
}
let subject = PassthroughSubject<Int, AnError>()
subscription = subject
.ignoreFailure() // The `completeImmediately` parameter defaults to `true`.
.sink(receiveValue: { print($0) }, receiveCompletion: { print($0) })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .failure(.someError))
输出
1
2
3
.finished
mapToResult
将类型为 AnyPublisher<Output, Failure>
的发布者转换为 AnyPublisher<Result<Output, Failure>, Never>
enum AnError: Error {
case someError
}
let subject = PassthroughSubject<Int, AnError>()
let subscription = subject
.mapToResult()
.sink(receiveCompletion: { print("completion: \($0)") },
receiveValue: { print("value: \($0)") })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .failure(.someError))
Output
value: success(1)
value: success(2)
value: success(3)
value: failure(AnError.someError)
completion: finished
flatMapBatches(of:)
`Collection.flatMapBatches(of:)` 以批量的形式订阅接收者的内容发布者,并按批量返回它们的输出(同时保持顺序)。只有当之前的批量成功完成后,才会订阅后续的发布者批量 —— 任何失败都会向下传递。
let ints = (1...6).map(Just.init)
subscription = ints
.flatMapBatches(of: 2)
.sink(receiveCompletion: { print($0) }, receiveValue: { print($0) })
Output
[1, 2]
[3, 4]
[5, 6]
.finished
Publishers
本节概述了CombineExt提供的一些自定义Combine发布者
AnyPublisher.create
一个发布者,它接受一个具有订阅者参数的闭包,您可以向其中动态发送值或完成事件。
这使您能够轻松创建自定义发布者来包装任何非发布者异步工作,同时仍然尊重下游消费者对背压的需求。
您应该从闭包返回一个符合 Cancellable
的对象,您可以在此对象中定义当发布者完成或取消对发布者的订阅时要执行的任何清理操作。
AnyPublisher<String, MyError>.create { subscriber in
// Values
subscriber.send("Hello")
subscriber.send("World!")
// Complete with error
subscriber.send(completion: .failure(MyError.someError))
// Or, complete successfully
subscriber.send(completion: .finished)
return AnyCancellable {
// Perform cleanup
}
}
您还可以使用具有相同签名的 AnyPublisher
初始化器
AnyPublisher<String, MyError> { subscriber in
/// ...
return AnyCancellable { }
当前值中继
当前值中继
与当前值主题
相同,但有两个主要区别
- 它只接受值,但不接受完成事件,这意味着它不会失败。
- 它仅在分配时发布
.finished
事件。
let relay = CurrentValueRelay<String>("well...")
relay.sink(receiveValue: { print($0) }) // replays current value, e.g. "well..."
relay.accept("values")
relay.accept("only")
relay.accept("provide")
relay.accept("great")
relay.accept("guarantees")
输出
well...
values
only
provide
great
guarantees
透传中继
透传中继
与透传主题
相同,但有二个主要区别
- 它只接受值,但不接受完成事件,这意味着它不会失败。
- 它仅在分配时发布
.finished
事件。
let relay = PassthroughRelay<String>()
relay.accept("well...")
relay.sink(receiveValue: { print($0) }) // does not replay past value(s)
relay.accept("values")
relay.accept("only")
relay.accept("provide")
relay.accept("great")
relay.accept("guarantees")
输出
values
only
provide
great
guarantees
主题
重放主题
Combine的Rx的重放主题
类型。它与当前值主题
相似,因为它会缓存值,但它更进一步,允许消费者指定要缓冲和回放给未来订阅者的值的数量。此外,当缓冲区清除时,它还将处理所有完成事件的前传。
let subject = ReplaySubject<Int, Never>(bufferSize: 3)
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject
.sink(receiveValue: { print($0) })
subject.send(5)
输出
2
3
4
5
许可协议
当然是MIT许可证;-) 请参阅LICENSE文件。
苹果标志和Combine框架是苹果公司的财产。