RxSwiftExt 6.2.1

RxSwiftExt 6.2.1

测试已测试
语言语言 SwiftSwift
许可 MIT
发布最后发布2023年9月
SPM支持 SPM

Florent PilletRxSwift Community 维护。



  • RxSwiftCommunity

CircleCI pod Carthage compatible

RxSwiftExt

如果您正在使用 RxSwift,您可能遇到过内置操作符无法提供您所需的确切功能的情况。RxSwift 核心被有意保持尽可能紧凑,以避免臃肿。此仓库的目的是提供额外的便利操作符和响应式扩展。

安装

RxSwiftExt 的此分支针对 Swift 5.x 和 RxSwift 5.0.0 或更高版本。

  • 如果您正在寻找 RxSwiftExt 的 Swift 4 版本,请使用框架版本 3.4.0

CocoaPods

添加到您的 Podfile

pod 'RxSwiftExt', '~> 5'

这将安装 RxSwiftRxCocoa 扩展。如果您只想安装 RxSwift 扩展,而不安装 RxCocoa 扩展,只需使用以下方式

pod 'RxSwiftExt/Core'

使用 Swift 4

pod 'RxSwiftExt', '~> 3'

Carthage

将此添加到您的 Cartfile

github "RxSwiftCommunity/RxSwiftExt"

运算子

RxSwiftExt 主要关于为RxSwift添加运算子和响应式扩展!

运算子

这些运算子与 RxSwift & RxCocoa 核心运算子非常相似,但为您的 Rx 工具库提供了额外的实用功能。

还有两个额外的运算子可供使用于 materialize() 序列

下文将详细介绍每个运算子的细节。

响应式扩展

RxSwift/RxCocoa 响应式扩展旨在通过响应式能力增强 Apple-生态系统的现有对象和类。


运算子详情

unwrap

取消可选类型的包装并过滤掉 nil 值。

  Observable.of(1,2,nil,Int?(4))
    .unwrap()
    .subscribe { print($0) }
next(1)
next(2)
next(4)

忽略

忽略特定元素。

  Observable.from(["One","Two","Three"])
    .ignore("Two")
    .subscribe { print($0) }
next(One)
next(Three)
completed

ignoreWhen

根据闭包忽略元素。

  Observable<Int>
    .of(1,2,3,4,5,6)
    .ignoreWhen { $0 > 2 && $0 < 6 }
    .subscribe { print($0) }
next(1)
next(2)
next(6)
completed

once

将下一个元素恰好发送给第一个接受者。后续的订阅者将收到一个空序列。

  let obs = Observable.once("Hello world")
  print("First")
  obs.subscribe { print($0) }
  print("Second")
  obs.subscribe { print($0) }
First
next(Hello world)
completed
Second
completed

distinct

仅在元素在序列中之前从未见到过时才传递元素。

Observable.of("a","b","a","c","b","a","d")
    .distinct()
    .subscribe { print($0) }
next(a)
next(b)
next(c)
next(d)
completed

mapTo

将每个元素替换为提供的值。

Observable.of(1,2,3)
    .mapTo("Nope.")
    .subscribe { print($0) }
next(Nope.)
next(Nope.)
next(Nope.)
completed

mapAt

将每个元素转换为提供的键路径处的值。

struct Person {
    let name: String
}

Observable
    .of(
        Person(name: "Bart"),
        Person(name: "Lisa"),
        Person(name: "Maggie")
    )
    .mapAt(\.name)
    .subscribe { print($0) }
next(Bart)
next(Lisa)
next(Maggie)
completed

not

否定布尔值。

Observable.just(false)
    .not()
    .subscribe { print($0) }
next(true)
completed

验证每个发出的值是否为true

Observable.of(true, true)
	.and()
	.subscribe { print($0) }

Observable.of(true, false)
	.and()
	.subscribe { print($0) }

Observable<Bool>.empty()
	.and()
	.subscribe { print($0) }

返回Maybe<Bool>

success(true)
success(false)
completed

级联

依次遍历观察列表,一旦列表中下方的观察者开始发出元素,即取消前面的订阅。

let a = PublishSubject<String>()
let b = PublishSubject<String>()
let c = PublishSubject<String>()
Observable.cascade([a,b,c])
    .subscribe { print($0) }
a.onNext("a:1")
a.onNext("a:2")
b.onNext("b:1")
a.onNext("a:3")
c.onNext("c:1")
a.onNext("a:4")
b.onNext("b:4")
c.onNext("c:2")
next(a:1)
next(a:2)
next(b:1)
next(c:1)
next(c:2)

配对

将Observable发出的元素组合成数组,每个数组包含的最后两个连续项;类似于滑动窗口。

Observable.from([1, 2, 3, 4, 5, 6])
    .pairwise()
    .subscribe { print($0) }
next((1, 2))
next((2, 3))
next((3, 4))
next((4, 5))
next((5, 6))
completed

nwise

将Observable发出的元素组合成数组,每个数组包含的最后N个连续项;类似于滑动窗口。

Observable.from([1, 2, 3, 4, 5, 6])
    .nwise(3)
    .subscribe { print($0) }
next([1, 2, 3])
next([2, 3, 4])
next([3, 4, 5])
next([4, 5, 6])
completed

重试

在出错或成功终止之前,重复源可观察对象序列使用给定的行为。有四种行为,包括各种谓词和延迟选项:立即延迟指数延迟自定义计时器延迟

// in case of an error initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = sampleObservable.retry(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.0), scheduler: delayScheduler)
    .subscribe(onNext: { event in
        print("Receive event: \(event)")
    }, onError: { error in
        print("Receive error: \(error)")
    })
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive error: fatalError

带行为重复

在序列完成后,使用给定的行为重复源可观察对象序列。此运算符接受与重试运算符相同的参数。有四种行为,包括各种谓词和延迟选项:立即延迟指数延迟自定义计时器延迟

// when the sequence completes initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = completingObservable.repeatWithBehavior(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.2), scheduler: delayScheduler)
    .subscribe(onNext: { event in
        print("Receive event: \(event)")
})
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second

catchErrorJustComplete

当发生错误时完成一个序列,并撤销错误状态

let _ = sampleObservable
    .do(onError: { print("Source observable emitted error \($0), ignoring it") })
    .catchErrorJustComplete()
    .subscribe {
        print ("\($0)")
}
next(First)
next(Second)
Source observable emitted error fatalError, ignoring it
completed

pausable

除非第二个可观察序列的最新元素为 true,否则暂停源可观察序列的元素。

let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

let trueAtThreeSeconds = Observable<Int>.timer(3, scheduler: MainScheduler.instance).map { _ in true }
let falseAtFiveSeconds = Observable<Int>.timer(5, scheduler: MainScheduler.instance).map { _ in false }
let pauser = Observable.of(trueAtThreeSeconds, falseAtFiveSeconds).merge()

let pausedObservable = observable.pausable(pauser)

let _ = pausedObservable
    .subscribe { print($0) }
next(2)
next(3)

更多示例可在项目的 Playground 中找到。

pausableBuffered

除非第二个可观察序列的最新元素为 true,否则暂停源可观察序列的元素。源可观察序列发出的元素被缓冲(可配置限制)并在可观察序列恢复时“刷新”(重新发出)。

有关示例,请参阅项目中的 Playground。

apply

apply提供了一种统一机制来应用于可观察序列的转换,而无需扩展ObservableType或重复您的转换。有关更多信息,请参阅github上的讨论

// An ordinary function that applies some operators to its argument, and returns the resulting Observable
func requestPolicy(_ request: Observable<Void>) -> Observable<Response> {
    return request.retry(maxAttempts)
        .do(onNext: sideEffect)
        .map { Response.success }
        .catchError { error in Observable.just(parseRequestError(error: error)) }

// We can apply the function in the apply operator, which preserves the chaining style of invoking Rx operators
let resilientRequest = request.apply(requestPolicy)

filterMap

Rx中一个常用的模式是过滤掉一些值,然后将剩下的映射到其他地方。filterMap 允许您一步完成此操作

// keep only even numbers and double them
Observable.of(1,2,3,4,5,6)
	.filterMap { number in
		(number % 2 == 0) ? .ignore : .map(number * 2)
	}

上面的序列保留了偶数 2、4、6 并产生序列 4、8、12。

errors,elements

这些运算符仅适用于使用 materialize() 运算符(来自 RxSwift 核心库)实现了物化的可观察序列。`errors` 返回一个经过筛选的错误事件序列,忽略元素。`elements` 返回一个经过筛选的元素事件序列,忽略错误。

let imageResult = _chooseImageButtonPressed.asObservable()
    .flatMap { imageReceiver.image.materialize() }
    .share()

let image = imageResult
    .elements()
    .asDriver(onErrorDriveWith: .never())

let errorMessage = imageResult
    .errors()
    .map(mapErrorMessages)
    .unwrap()
    .asDriver(onErrorDriveWith: .never())

fromAsync

将简单的异步完成处理程序转换为可观察序列。适用于仅使用一个参数调用完成处理程序的现有异步服务。发出完成处理程序产生的结果然后完成。

func someAsynchronousService(arg1: String, arg2: Int, completionHandler:(String) -> Void) {
    // a service that asynchronously calls
	// the given completionHandler
}

let observableService = Observable
    .fromAsync(someAsynchronousService)

observableService("Foo", 0)
    .subscribe(onNext: { (result) in
        print(result)
    })
    .disposed(by: disposeBag)

zip(with:)

Observable.zip(_:) 的便捷版本。通过选择函数将指定的可观察序列合并到一个可观察序列中,当所有可观察序列在相应索引处产生元素时执行。

let first = Observable.from(numbers)
let second = Observable.from(strings)

first.zip(with: second) { i, s in
        s + String(i)
    }.subscribe(onNext: { (result) in
        print(result)
    })
next("a1")
next("b2")
next("c3")

merge(with:)

Observable.merge(_:) 的便捷版本。将可观察序列的元素合并到不同可观察序列的元素中,形成一个单一的观察序列。

let oddStream = Observable.of(1, 3, 5)
let evenStream = Observable.of(2, 4, 6)
let otherStream = Observable.of(1, 5, 6)

oddStream.merge(with: evenStream, otherStream)
    .subscribe(onNext: { result in
        print(result)
    })
1 2 1 3 4 5 5 6 6

ofType

当提供的类型是该类型的实例时,ofType运算符过滤可观察序列的元素。

Observable.of(NSNumber(value: 1),
                  NSDecimalNumber(string: "2"),
                  NSNumber(value: 3),
                  NSNumber(value: 4),
                  NSDecimalNumber(string: "5"),
                  NSNumber(value: 6))
        .ofType(NSDecimalNumber.self)
        .subscribe { print($0) }
next(2)
next(5)
completed

此示例发出 2, 5(NSDecimalNumber 类型)。

count

Observable 在无错误终止时,返回发出的项的数量。如果提供了谓词,则仅计数匹配谓词的元素。

Observable.from([1, 2, 3, 4, 5, 6])
    .count { $0 % 2 == 0 }
    .subscribe()
next(3)
completed

partition

将流分割成两个单独的流,一个匹配提供的谓词,另一个不匹配。

let numbers = Observable
        .of(1, 2, 3, 4, 5, 6)

    let (evens, odds) = numbers.partition { $0 % 2 == 0 }

    _ = evens.debug("even").subscribe() // emits 2, 4, 6
    _ = odds.debug("odds").subscribe() // emits 1, 3, 5

bufferWithTrigger

收集源 Observable 的元素,在触发器发出时将它们作为数组发出。

let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let signalAtThreeSeconds = Observable<Int>.timer(3, scheduler: MainScheduler.instance).map { _ in () }
let signalAtFiveSeconds = Observable<Int>.timer(5, scheduler: MainScheduler.instance).map { _ in () }
let trigger = Observable.of(signalAtThreeSeconds, signalAtFiveSeconds).merge()
let buffered = observable.bufferWithTrigger(trigger)
buffered.subscribe { print($0) }
// prints next([0, 1, 2]) @ 3, next([3, 4]) @ 5

在 PlayGround 中提供实时演示。

响应式扩展详情

UIViewPropertyAnimator.animate

animate(afterDelay:) 操作符提供一个 Completable,在订阅时触发动画,并在动画结束时完成。

button.rx.tap
    .flatMap {
        animator1.rx.animate()
            .andThen(animator2.rx.animate(afterDelay: 0.15))
            .andThen(animator3.rx.animate(afterDelay: 0.1))
    }

UIViewPropertyAnimator.fractionComplete

fractionComplete 绑定器提供了一种响应式方式来绑定到 UIViewPropertyAnimator.fractionComplete

slider.rx.value.map(CGFloat.init)
    .bind(to: animator.rx.fractionComplete)

UIScrollView.reachedBottom

reachedBottom 会提供一个序列,每次当 UIScrollView 滚动到页面底部时都会触发,可选项是偏移量。

tableView.rx.reachedBottom(offset: 40)
            .subscribe { print("Reached bottom") }

许可协议

这个库属于 RxSwift Community

RxSwiftExt 可在 MIT 许可下使用。欲了解更多信息,请参阅 LICENSE 文件。