RxSwiftExt
如果您正在使用 RxSwift,您可能遇到过内置操作符无法提供您所需的确切功能的情况。RxSwift 核心被有意保持尽可能紧凑,以避免臃肿。此仓库的目的是提供额外的便利操作符和响应式扩展。
安装
RxSwiftExt 的此分支针对 Swift 5.x 和 RxSwift 5.0.0 或更高版本。
- 如果您正在寻找 RxSwiftExt 的 Swift 4 版本,请使用框架版本
3.4.0
。
CocoaPods
添加到您的 Podfile
pod 'RxSwiftExt', '~> 5'
这将安装 RxSwift
和 RxCocoa
扩展。如果您只想安装 RxSwift
扩展,而不安装 RxCocoa
扩展,只需使用以下方式
pod 'RxSwiftExt/Core'
使用 Swift 4
pod 'RxSwiftExt', '~> 3'
Carthage
将此添加到您的 Cartfile
github "RxSwiftCommunity/RxSwiftExt"
运算子
RxSwiftExt 主要关于为RxSwift添加运算子和响应式扩展!
运算子
这些运算子与 RxSwift & RxCocoa 核心运算子非常相似,但为您的 Rx 工具库提供了额外的实用功能。
- unwrap
- ignore
- ignoreWhen
- Observable.once
- distinct
- map
- not
- and
- Observable.cascade
- pairwise
- nwise
- retry
- repeatWithBehavior
- catchErrorJustComplete
- pausalable
- pausalableBuffered
- apply
- filterMap
- Observable.fromAsync
- Observable.zip(with:)
- Observable.merge(with:)
- count
- partition
- bufferWithTrigger
还有两个额外的运算子可供使用于 materialize()
序列
下文将详细介绍每个运算子的细节。
响应式扩展
RxSwift/RxCocoa 响应式扩展旨在通过响应式能力增强 Apple-生态系统的现有对象和类。
运算子详情
unwrap
取消可选类型的包装并过滤掉 nil 值。
Observable.of(1,2,nil,Int?(4))
.unwrap()
.subscribe { print($0) }
next(1)
next(2)
next(4)
忽略
忽略特定元素。
Observable.from(["One","Two","Three"])
.ignore("Two")
.subscribe { print($0) }
next(One)
next(Three)
completed
ignoreWhen
根据闭包忽略元素。
Observable<Int>
.of(1,2,3,4,5,6)
.ignoreWhen { $0 > 2 && $0 < 6 }
.subscribe { print($0) }
next(1)
next(2)
next(6)
completed
once
将下一个元素恰好发送给第一个接受者。后续的订阅者将收到一个空序列。
let obs = Observable.once("Hello world")
print("First")
obs.subscribe { print($0) }
print("Second")
obs.subscribe { print($0) }
First
next(Hello world)
completed
Second
completed
distinct
仅在元素在序列中之前从未见到过时才传递元素。
Observable.of("a","b","a","c","b","a","d")
.distinct()
.subscribe { print($0) }
next(a)
next(b)
next(c)
next(d)
completed
mapTo
将每个元素替换为提供的值。
Observable.of(1,2,3)
.mapTo("Nope.")
.subscribe { print($0) }
next(Nope.)
next(Nope.)
next(Nope.)
completed
mapAt
将每个元素转换为提供的键路径处的值。
struct Person {
let name: String
}
Observable
.of(
Person(name: "Bart"),
Person(name: "Lisa"),
Person(name: "Maggie")
)
.mapAt(\.name)
.subscribe { print($0) }
next(Bart)
next(Lisa)
next(Maggie)
completed
not
否定布尔值。
Observable.just(false)
.not()
.subscribe { print($0) }
next(true)
completed
和
验证每个发出的值是否为true
Observable.of(true, true)
.and()
.subscribe { print($0) }
Observable.of(true, false)
.and()
.subscribe { print($0) }
Observable<Bool>.empty()
.and()
.subscribe { print($0) }
返回Maybe<Bool>
success(true)
success(false)
completed
级联
依次遍历观察列表,一旦列表中下方的观察者开始发出元素,即取消前面的订阅。
let a = PublishSubject<String>()
let b = PublishSubject<String>()
let c = PublishSubject<String>()
Observable.cascade([a,b,c])
.subscribe { print($0) }
a.onNext("a:1")
a.onNext("a:2")
b.onNext("b:1")
a.onNext("a:3")
c.onNext("c:1")
a.onNext("a:4")
b.onNext("b:4")
c.onNext("c:2")
next(a:1)
next(a:2)
next(b:1)
next(c:1)
next(c:2)
配对
将Observable发出的元素组合成数组,每个数组包含的最后两个连续项;类似于滑动窗口。
Observable.from([1, 2, 3, 4, 5, 6])
.pairwise()
.subscribe { print($0) }
next((1, 2))
next((2, 3))
next((3, 4))
next((4, 5))
next((5, 6))
completed
nwise
将Observable发出的元素组合成数组,每个数组包含的最后N个连续项;类似于滑动窗口。
Observable.from([1, 2, 3, 4, 5, 6])
.nwise(3)
.subscribe { print($0) }
next([1, 2, 3])
next([2, 3, 4])
next([3, 4, 5])
next([4, 5, 6])
completed
重试
在出错或成功终止之前,重复源可观察对象序列使用给定的行为。有四种行为,包括各种谓词和延迟选项:立即
、延迟
、指数延迟
和自定义计时器延迟
。
// in case of an error initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = sampleObservable.retry(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.0), scheduler: delayScheduler)
.subscribe(onNext: { event in
print("Receive event: \(event)")
}, onError: { error in
print("Receive error: \(error)")
})
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive error: fatalError
带行为重复
在序列完成后,使用给定的行为重复源可观察对象序列。此运算符接受与重试运算符相同的参数。有四种行为,包括各种谓词和延迟选项:立即
、延迟
、指数延迟
和自定义计时器延迟
。
// when the sequence completes initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = completingObservable.repeatWithBehavior(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.2), scheduler: delayScheduler)
.subscribe(onNext: { event in
print("Receive event: \(event)")
})
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
catchErrorJustComplete
当发生错误时完成一个序列,并撤销错误状态
let _ = sampleObservable
.do(onError: { print("Source observable emitted error \($0), ignoring it") })
.catchErrorJustComplete()
.subscribe {
print ("\($0)")
}
next(First)
next(Second)
Source observable emitted error fatalError, ignoring it
completed
pausable
除非第二个可观察序列的最新元素为 true
,否则暂停源可观察序列的元素。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let trueAtThreeSeconds = Observable<Int>.timer(3, scheduler: MainScheduler.instance).map { _ in true }
let falseAtFiveSeconds = Observable<Int>.timer(5, scheduler: MainScheduler.instance).map { _ in false }
let pauser = Observable.of(trueAtThreeSeconds, falseAtFiveSeconds).merge()
let pausedObservable = observable.pausable(pauser)
let _ = pausedObservable
.subscribe { print($0) }
next(2)
next(3)
更多示例可在项目的 Playground 中找到。
pausableBuffered
除非第二个可观察序列的最新元素为 true
,否则暂停源可观察序列的元素。源可观察序列发出的元素被缓冲(可配置限制)并在可观察序列恢复时“刷新”(重新发出)。
有关示例,请参阅项目中的 Playground。
apply
apply提供了一种统一机制来应用于可观察序列的转换,而无需扩展ObservableType或重复您的转换。有关更多信息,请参阅github上的讨论
// An ordinary function that applies some operators to its argument, and returns the resulting Observable
func requestPolicy(_ request: Observable<Void>) -> Observable<Response> {
return request.retry(maxAttempts)
.do(onNext: sideEffect)
.map { Response.success }
.catchError { error in Observable.just(parseRequestError(error: error)) }
// We can apply the function in the apply operator, which preserves the chaining style of invoking Rx operators
let resilientRequest = request.apply(requestPolicy)
filterMap
Rx中一个常用的模式是过滤掉一些值,然后将剩下的映射到其他地方。filterMap
允许您一步完成此操作
// keep only even numbers and double them
Observable.of(1,2,3,4,5,6)
.filterMap { number in
(number % 2 == 0) ? .ignore : .map(number * 2)
}
上面的序列保留了偶数 2、4、6 并产生序列 4、8、12。
errors,elements
这些运算符仅适用于使用 materialize()
运算符(来自 RxSwift 核心库)实现了物化的可观察序列。`errors` 返回一个经过筛选的错误事件序列,忽略元素。`elements` 返回一个经过筛选的元素事件序列,忽略错误。
let imageResult = _chooseImageButtonPressed.asObservable()
.flatMap { imageReceiver.image.materialize() }
.share()
let image = imageResult
.elements()
.asDriver(onErrorDriveWith: .never())
let errorMessage = imageResult
.errors()
.map(mapErrorMessages)
.unwrap()
.asDriver(onErrorDriveWith: .never())
fromAsync
将简单的异步完成处理程序转换为可观察序列。适用于仅使用一个参数调用完成处理程序的现有异步服务。发出完成处理程序产生的结果然后完成。
func someAsynchronousService(arg1: String, arg2: Int, completionHandler:(String) -> Void) {
// a service that asynchronously calls
// the given completionHandler
}
let observableService = Observable
.fromAsync(someAsynchronousService)
observableService("Foo", 0)
.subscribe(onNext: { (result) in
print(result)
})
.disposed(by: disposeBag)
zip(with:)
是 Observable.zip(_:)
的便捷版本。通过选择函数将指定的可观察序列合并到一个可观察序列中,当所有可观察序列在相应索引处产生元素时执行。
let first = Observable.from(numbers)
let second = Observable.from(strings)
first.zip(with: second) { i, s in
s + String(i)
}.subscribe(onNext: { (result) in
print(result)
})
next("a1")
next("b2")
next("c3")
merge(with:)
是 Observable.merge(_:)
的便捷版本。将可观察序列的元素合并到不同可观察序列的元素中,形成一个单一的观察序列。
let oddStream = Observable.of(1, 3, 5)
let evenStream = Observable.of(2, 4, 6)
let otherStream = Observable.of(1, 5, 6)
oddStream.merge(with: evenStream, otherStream)
.subscribe(onNext: { result in
print(result)
})
1 2 1 3 4 5 5 6 6
ofType
当提供的类型是该类型的实例时,ofType运算符过滤可观察序列的元素。
Observable.of(NSNumber(value: 1),
NSDecimalNumber(string: "2"),
NSNumber(value: 3),
NSNumber(value: 4),
NSDecimalNumber(string: "5"),
NSNumber(value: 6))
.ofType(NSDecimalNumber.self)
.subscribe { print($0) }
next(2)
next(5)
completed
此示例发出 2, 5(NSDecimalNumber 类型)。
count
Observable 在无错误终止时,返回发出的项的数量。如果提供了谓词,则仅计数匹配谓词的元素。
Observable.from([1, 2, 3, 4, 5, 6])
.count { $0 % 2 == 0 }
.subscribe()
next(3)
completed
partition
将流分割成两个单独的流,一个匹配提供的谓词,另一个不匹配。
let numbers = Observable
.of(1, 2, 3, 4, 5, 6)
let (evens, odds) = numbers.partition { $0 % 2 == 0 }
_ = evens.debug("even").subscribe() // emits 2, 4, 6
_ = odds.debug("odds").subscribe() // emits 1, 3, 5
bufferWithTrigger
收集源 Observable 的元素,在触发器发出时将它们作为数组发出。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let signalAtThreeSeconds = Observable<Int>.timer(3, scheduler: MainScheduler.instance).map { _ in () }
let signalAtFiveSeconds = Observable<Int>.timer(5, scheduler: MainScheduler.instance).map { _ in () }
let trigger = Observable.of(signalAtThreeSeconds, signalAtFiveSeconds).merge()
let buffered = observable.bufferWithTrigger(trigger)
buffered.subscribe { print($0) }
// prints next([0, 1, 2]) @ 3, next([3, 4]) @ 5
在 PlayGround 中提供实时演示。
响应式扩展详情
UIViewPropertyAnimator.animate
animate(afterDelay:)
操作符提供一个 Completable,在订阅时触发动画,并在动画结束时完成。
button.rx.tap
.flatMap {
animator1.rx.animate()
.andThen(animator2.rx.animate(afterDelay: 0.15))
.andThen(animator3.rx.animate(afterDelay: 0.1))
}
UIViewPropertyAnimator.fractionComplete
fractionComplete
绑定器提供了一种响应式方式来绑定到 UIViewPropertyAnimator.fractionComplete
。
slider.rx.value.map(CGFloat.init)
.bind(to: animator.rx.fractionComplete)
UIScrollView.reachedBottom
reachedBottom
会提供一个序列,每次当 UIScrollView
滚动到页面底部时都会触发,可选项是偏移量。
tableView.rx.reachedBottom(offset: 40)
.subscribe { print("Reached bottom") }
许可协议
这个库属于 RxSwift Community。
RxSwiftExt 可在 MIT 许可下使用。欲了解更多信息,请参阅 LICENSE 文件。