只因为在众多框架中多看了你一眼 RxJava (六) 背压策略

理解 RxJava2 中的被压策略

文中措辞 被观察者 <=> 上游; 观察者 <=> 下游 是等价的。

宏观上解决流速不匹配的问题

通过上一篇,我们知道,阻塞是因为上游发送事件的流速和下游接收处理事件的流速不匹配造成的(一般是 上游流速 > 下游流速),那么要解决这个问题,「要么减少上游发送事件的数量」,「要么减少上游发送事件的频率」,但是我们又不能手动实现这一点,因为我们不知道上游道理应该发送多少事件和发送事件的速度,并且也不知道下游处理消耗事件的能力,所以简单的通过控制上游发送事件的数量和速度是不可行的。

那么我们这里抛开代码实现,从宏观上谈一下如何解决这个问题。

要解决阻塞,控制流速,需要上下游协同合作。比如说:下游告诉上游,我可以处理 20 个事件(响应式拉取),那么上游就发送 20 个事件(反馈控制),当下游处理完了之后,下游又告诉上游,我还可以再处理 30 个事件,那么上游又发送 30 个事件过来,我们知道,异步订阅,上游发送的事件是放在一个 缓存池 中的, 而下游直接从缓存池中去取,那么如果上游发送的事件个数大于下游的处理个数,缓存池就会有溢出的风险,所以在这里也得有一个方法来解决。

这种方法就类似于 Universal Image Loader 中提供的各种缓存策略,内存缓存的大小有限,所以用内存做缓存时要处理好当接近内存大小临界值时,如何丢弃现有的缓存,存入新的缓存?是最近最少使用的丢弃?还是暂用空间最小的丢弃?还是最先缓存的丢弃?这里就涉及到各种策略。RxJava2.0 中也提供了好几种策略,后面详细说。

通过上面描述的方案,就可以有效的解决上下游流速不匹配导致的阻塞问题。

总的来说:

  1. 要尽量避免出现流速不匹配。
  2. 当已经出现流速不匹配了,应该采取策略。

下面谈谈 RxJava2 中的背压策略。

背压策略

定义

一种控制事件流速的方法。

作用

异步事件订阅 中,控制事件发送的速度和接收的速度。

解决的问题

解决了 异步订阅中 因被观察者发送事件速度 与 观察者接收事件速度不匹配(一般是前者 快于 后者),从而导致观察者无法及时响应 / 处理所有被观察者发送事件的问题

原理

示意图:

与 Rxjava1 中被观察者的旧实现 Observable 相比:

图中出现了一个新的类, Flowable ,它就是 RxJava2.0 中被观察者的一种新的实现,同时也是背压策略的承载者,下面就来看看如何使用它。

背压策略的实现:Flowable

Flowable 基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() { //1.ObservableOnSubscribe -> FlowableOnSubscribe
@Override //2.ObservableEmitter -> FlowableEmitter
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
LogUtil.d(TAG, "emitter 1");
e.onNext(1);
LogUtil.d(TAG, "emitter 2");
e.onNext(2);
LogUtil.d(TAG, "emitter 3");
e.onNext(3);
LogUtil.d(TAG, "emitter complete");
e.onComplete();
}
}, BackpressureStrategy.ERROR) //3.背压策略
.subscribe(new Subscriber<Integer>() {
@Override //4.onSubscribe(Disposable d) -> onSubscribe(Subscription s)
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);//5. 向上游请求的元素,响应式拉取
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});

日志:

1
2
3
4
5
6
7
8
9
10
02-25 10:09:11.591 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onSubscribe
02-25 10:09:11.591 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 1
02-25 10:09:11.591 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=1
02-25 10:09:11.591 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 2
02-25 10:09:11.591 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=2
02-25 10:09:11.591 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 3
02-25 10:09:11.592 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=3
02-25 10:09:11.592 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter complete
02-25 10:09:11.592 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onComplete

代码中已经注释出了 Flowable 相对于 Observable 使用的不同之处,这里总结一下

  1. ObservableOnSubscribe -> FlowableOnSubscribe
  2. ObservableEmitter -> FlowableEmitter
  3. 背压策略
  4. onSubscribe(Disposable d) -> onSubscribe(Subscription s)
  5. s.request(Long.MAX_VALUE);// 向被观察者请求的元素,响应式拉取

下面针对这几点不同做讲解。

Subscription.request 控制观察者的接收速度

我们从 request 方法开始看看 Flowable 的使用。

现在将19行的 s.request(Long.MAX_VALUE) 删掉,在看日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
02-25 10:03:53.949 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onSubscribe
02-25 10:03:53.949 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 1
02-25 10:03:53.950 22450-22450/com.rengwuxian.rxjavasamples W/===RxJavaSample==: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
at com.rengwuxian.rxjavasamples.Test$2$override.subscribe(Test.java:91)
at com.rengwuxian.rxjavasamples.Test$2$override.access$dispatch(Test.java)
at com.rengwuxian.rxjavasamples.Test$2.subscribe(Test.java:0)
at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
at io.reactivex.Flowable.subscribe(Flowable.java:12970)
at io.reactivex.Flowable.subscribe(Flowable.java:12920)
at com.rengwuxian.rxjavasamples.Test$override.onClick(Test.java:100)
at com.rengwuxian.rxjavasamples.Test$override.access$dispatch(Test.java)
at com.rengwuxian.rxjavasamples.Test.onClick(Test.java:0)
at com.rengwuxian.rxjavasamples.Test_ViewBinding$1.doClick(Test_ViewBinding.java:37)
at butterknife.internal.DebouncingOnClickListener.onClick(DebouncingOnClickListener.java:22)
at android.view.View.performClick(View.java:5619)
at android.view.View$PerformClick.run(View.java:22295)
at android.os.Handler.handleCallback(Handler.java:754)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:163)
at android.app.ActivityThread.main(ActivityThread.java:6342)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:880)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:770)
02-25 10:03:53.950 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 2
02-25 10:03:53.950 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 3
02-25 10:03:53.950 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter complete

在上游发送了第一个事件之后,下游就抛出了 MissingBackpressureException 异常,但是根据上一篇的内容,这里是同步订阅关系,上游要等下游处理完事件之后才回去发送下一个事件,所以应该不会阻塞,所以这里不应该会抛出异常啊!但事实还是抛出了异常,为什么呢?带着这个疑问,我们仍然将 19 行 s.request(Long.MAX_VALUE) 删掉并且将上下游放到不同的线程中再看看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
LogUtil.d(TAG, "emitter 1");
e.onNext(1);
LogUtil.d(TAG, "emitter 2");
e.onNext(2);
LogUtil.d(TAG, "emitter 3");
e.onNext(3);
LogUtil.d(TAG, "emitter complete");
e.onComplete();
}
}, BackpressureStrategy.ERROR) //策略
.subscribeOn(Schedulers.io())//上游io线程
.observeOn(AndroidSchedulers.mainThread())//下游主线程
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});

日志:

1
2
3
4
5
6
02-25 10:13:13.942 22450-22450/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onSubscribe
02-25 10:13:13.943 22450-29562/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 1
02-25 10:13:13.944 22450-29562/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 2
02-25 10:13:13.944 22450-29562/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter 3
02-25 10:13:13.944 22450-29562/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== emitter complete

只能看到上游发送的事件,但是下游并没有获取到事件,这又是为什么?

这是因为 Flowable 采用的是一种响应式拉取的方式,上游需要根据下游的能力去发送事件。
第 19 行的 s.request(Long.MAX_VALUE) 就相当于下游告知上游其本身的接收能力

那么现在就可以解释上面代码的现象了。

  • 当同步的订阅时,上游发送出一个事件后就抛出 MissingBackpressureException 异常,这是因为下游没有调用 request 方法,也就是说下游没有告知上游其处理事件的能力,那么上游就认为下游没有处理事件的能力,又因为是同步的,上游是需要等待下游处理完事件之后才发送下一个事件,现在下游没有处理事件的能力,那上游不能一直等着吧?所以就抛出异常咯。

  • 当异步订阅的时候,会存在一个缓存池,上游发送的事件都放到这个缓存池里面去了,然后下游根据其能力 (request 方法的参数) 从缓存池里取出事件。然而此时并没有调用 request 方法,所以我们只看得到上游发送事件的日志,看不到下游响应事件的日志。但是一旦下游调用 request 时,就会从缓存池中去取出事件,是不是这样呢? 看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@OnClick({R.id.id_btn_request, R.id.id_btn_emitter})
void onClick(View view) {
if (view.getId() == R.id.id_btn_emitter) {
LogUtil.d(TAG,"emitter event");
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
LogUtil.d(TAG, "emitter 1");
e.onNext(1);
LogUtil.d(TAG, "emitter 2");
e.onNext(2);
LogUtil.d(TAG, "emitter 3");
e.onNext(3);
LogUtil.d(TAG, "emitter complete");
e.onComplete();
}
}, BackpressureStrategy.ERROR) //策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
mSubscription=s;
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});
} else if (view.getId() == R.id.id_btn_request) {
if (mSubscription != null) {
LogUtil.d(TAG,"request invoked");
mSubscription.request(1);
} else {
LogUtil.d(TAG, "mSubscription is null");
}
}
}

可以看到,这里我们设置了两个按钮,当点击 btnEmitter 时会发送事件,并且在 onSubscribe 回调中将 Subscription 保存起来,然后当点击 btnRequest 按钮时,会调用 mSubscription.request(1)。这里我们点击 btnEmitter 一下,然后点击 btnRequest 3 下,看看日志:

ok,验证了,确实是这样。

之前我们说到,上游会将事件发送到缓存池里,然后下游根据其能力从缓存池中取出事件,那么这个缓存池有多大呢? 128 !从上游发送 128 个事件和 129 个事件,分别观察日志即可证明,这里就不在写出代码。

到这里,下游已经可以通过 request 控制处理事件的能力了,但是上游还得获取这一信息呀,上游根据这一信息来控制其本身发送事件的速度,怎么获取呢?

FlowableEmitter.requested 控制被观察者发送事件的速度

FlowableEmitter 的 requested 方法的返回值就是当前线程中,下游 subscription 的 request(n) 的 n 值,那么很显然,这里就分为同步订阅关系和异步订阅关系。

同步订阅情况下

image

看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 调用emitter.requested()获取当前观察者需要接收的事件数量
long n = emitter.requested();
Log.d(TAG, "观察者可接收事件" + n);
// 根据emitter.requested()的值,即当前观察者需要接收的事件数量来发送事件
for (int i = 0; i < n; i++) {
Log.d(TAG, "发送了事件" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 设置观察者每次能接受10个事件
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
02-26 10:45:10.844 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: onSubscribe
02-26 10:45:10.845 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 观察者可接收事件10
02-26 10:45:10.845 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件0
02-26 10:45:10.845 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件0
02-26 10:45:10.845 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件1
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件1
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件2
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件2
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件3
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件3
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件4
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件4
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件5
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件5
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件6
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件6
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件7
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件7
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件8
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件8
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件9
02-26 10:45:10.846 23613-23613/com.rengwuxian.rxjavasamples D/==Test==: 接收到了事件9

并且在同步订阅当中使用 requested 时,有三个特性需要注意。

1. 可叠加性:下游可以连续调用 request,上游会进行叠加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
long requested = e.requested();
LogUtil.d(TAG, "subscribe requested=" + requested);
}
}, BackpressureStrategy.ERROR) //策略
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
s.request(10);
s.request(20);
s.request(30);
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});

日志:

2. 实时更新性:每次发送事件后,emitter.requested() 会实时更新观察者能接受的事件

仅计算 onNext 事件,onComplete onError事件不算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
long requested = e.requested();
LogUtil.d(TAG, "没有发送事件前, requested=" + requested);
e.onNext(1);
requested = e.requested();
LogUtil.d(TAG, "发送第一个事件之后, requested=" + requested);
e.onNext(2);
requested = e.requested();
LogUtil.d(TAG, "发送第二个事件之后, requested=" + requested);
e.onNext(3);
requested = e.requested();
LogUtil.d(TAG, "发送第三个事件之后, requested=" + requested);
e.onNext(4);
requested = e.requested();
LogUtil.d(TAG, "发送第四个事件之后, requested=" + requested);
e.onNext(5);
requested = e.requested();
LogUtil.d(TAG, "发送第五个事件之后, requested=" + requested);
}
}, BackpressureStrategy.ERROR) //策略
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
s.request(10);
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});

日志:

3. 异常

当 FlowableEmitter.requested() 减到 0 时,则代表观察者已经不可接收事件,此时被观察者若继续发送事件,则会抛出MissingBackpressureException 异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
long requested = e.requested();
LogUtil.d(TAG, "没有发送事件前, requested=" + requested);
e.onNext(1);
requested = e.requested();
LogUtil.d(TAG, "发送第一个事件之后, requested=" + requested);
e.onNext(2);
requested = e.requested();
LogUtil.d(TAG, "发送第二个事件之后, requested=" + requested);
e.onNext(3);
requested = e.requested();
LogUtil.d(TAG, "发送第三个事件之后, requested=" + requested);
e.onNext(4);
requested = e.requested();
LogUtil.d(TAG, "发送第四个事件之后, requested=" + requested);
}
}, BackpressureStrategy.ERROR) //策略
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
s.request(3);
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});

这里下游只能处理 3 个事件,但是上游却发出了 4 个,那么看看发出第四个事件时,会发生什么情况:

抛出异常。

额外的一点,当下游没有调用 request 方法时,此时上游 requested 的返回值为 0 。

异步订阅情况下

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
long requested = e.requested();
LogUtil.d(TAG, "没有发送事件前, requested=" + requested);
}
}, BackpressureStrategy.ERROR) //策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
s.request(3);
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});

日志:

这里我们可以看到,下游调用 request 表明自己处理事件的能力为 3 ,但是上游 requested 方法返回的值仍然是 128;

所以,当上下游二者处于不同线程,上游无法通过 FlowableEmitter.requested() 知道下游自身接收事件能力,即 上游不能根据 下游自身接收事件的能力 控制发送事件的速度。

而在异步订阅关系中,反向控制的原理是:通过 RxJava 内部固定调用被观察者线程中的 request(n) 从而 反向控制被观察者的发送事件速度.

那么什么时候在 RxJava 内部调用 request(n),并且 n 等于多少呢?

看下图:

image

究竟是不是这样呢,还是要上代码来验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
Flowable //Observable --> Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
LogUtil.d(TAG, "观察者可接收事件数量 = " + e.requested());
boolean flag; //设置标记位控制
// 被观察者一共需要发送500个事件
for (int i = 1; i < 501; i++) {
flag = false;
// 若requested() == 0则不发送
while (e.requested() == 0) {
if (!flag) {
Log.d(TAG, "不再发送");
flag = true;
}
}
// requested() ≠ 0 才发送
e.onNext(i);
Log.d(TAG, "发送了事件" + i + ",观察者可接收事件数量 = " + e.requested());
}
}
}, BackpressureStrategy.ERROR) //策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
LogUtil.d(TAG, "onNext integer=" + integer);
}
@Override
public void onError(Throwable t) {
LogUtil.w(TAG, t);
}
@Override
public void onComplete() {
LogUtil.d(TAG, "onComplete");
}
});
@OnClick(R.id.id_btn_request)
void onClick(View view) {
mSubscription.request(48);
}
  • 程序启动:
1
2
3
4
5
6
7
8
9
10
11
12
02-25 16:38:53.177 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onSubscribe
02-25 16:38:53.179 16487-16512/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== 观察者可接收事件数量 = 128
02-25 16:38:53.180 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件1,观察者可接收事件数量 = 127
02-25 16:38:53.180 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件2,观察者可接收事件数量 = 126
02-25 16:38:53.180 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件3,观察者可接收事件数量 = 125
02-25 16:38:53.180 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件4,观察者可接收事件数量 = 124
....
02-25 16:38:53.186 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件126,观察者可接收事件数量 = 2
02-25 16:38:53.186 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件127,观察者可接收事件数量 = 1
02-25 16:38:53.186 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件128,观察者可接收事件数量 = 0
02-25 16:38:53.186 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 不再发送

上游发送事件 1~128,下游没有接收任何事件

  • 第一次点击按钮调用 request(48):
1
2
3
4
5
6
7
8
9
10
2-25 16:39:04.089 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=1
02-25 16:39:04.089 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=2
02-25 16:39:04.089 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=3
02-25 16:39:04.089 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=4
02-25 16:39:04.089 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=5
...
02-25 16:39:04.091 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=46
02-25 16:39:04.091 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=47
02-25 16:39:04.091 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=48

从缓存池中取出 48 个事件(1~48),且上游没有重新发送事件,此时下游观察者接收事件 n=48

  • 第二次点击按钮调用 request(48):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
02-25 16:39:25.661 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=49
02-25 16:39:25.661 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=50
02-25 16:39:25.661 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=51
02-25 16:39:25.661 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=52
...
02-25 16:39:25.663 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=94
02-25 16:39:25.663 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=95
02-25 16:39:25.663 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=96
02-25 16:39:25.663 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件129,观察者可接收事件数量 = 95
02-25 16:39:25.663 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件130,观察者可接收事件数量 = 94
02-25 16:39:25.663 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件131,观察者可接收事件数量 = 93
02-25 16:39:25.663 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件132,观察者可接收事件数量 = 92
...
02-25 16:39:25.667 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件222,观察者可接收事件数量 = 2
02-25 16:39:25.667 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件223,观察者可接收事件数量 = 1
02-25 16:39:25.667 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件224,观察者可接收事件数量 = 0
02-25 16:39:25.667 16487-16512/com.rengwuxian.rxjavasamples D/==Test==: 不再发送

从缓存池中取出 48 个事件(49~96),此时下游观察者接收事件 n=96,满足条件了,所以上游接着发送 96 个事件(129~224)

  • 第三次点击按钮调用 request(48):
1
2
3
4
5
6
7
8
02-25 16:39:55.555 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=97
02-25 16:39:55.555 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=98
02-25 16:39:55.555 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=99
...
02-25 16:39:55.557 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=142
02-25 16:39:55.557 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=143
02-25 16:39:55.557 16487-16487/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=144

从缓存池中取出 48 个事件(97~144),此时下游观察者接收事件 n=48.

  • 第四次点击按钮调用 request(48):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
02-25 17:01:33.844 25959-25959/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=145
02-25 17:01:33.845 25959-25959/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=146
02-25 17:01:33.845 25959-25959/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=147
...
02-25 17:01:33.846 25959-25959/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=190
02-25 17:01:33.846 25959-25959/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=191
02-25 17:01:33.846 25959-25959/com.rengwuxian.rxjavasamples D/===RxJavaSample==: ==Test== onNext integer=192
02-25 17:01:33.846 25959-26028/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件225,观察者可接收事件数量 = 95
02-25 17:01:33.846 25959-26028/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件226,观察者可接收事件数量 = 94
02-25 17:01:33.846 25959-26028/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件227,观察者可接收事件数量 = 93
...
02-25 17:01:33.850 25959-26028/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件318,观察者可接收事件数量 = 2
02-25 17:01:33.850 25959-26028/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件319,观察者可接收事件数量 = 1
02-25 17:01:33.850 25959-26028/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件320,观察者可接收事件数量 = 0
02-25 17:01:33.850 25959-26028/com.rengwuxian.rxjavasamples D/==Test==: 不再发送

从缓存池中取出 48 个事件(145~192),此时下游观察者接收事件 n=96,满足条件了,所以上游接着发送 96 个事件(225~320)

这个情况和第二次点击按钮情况一直。

到这里就可以证明上面图片的正确性了。

到这里,避免上下游流速不匹配的方法就讲完了,下游响应式拉取,上游根据下游的能力控制发送速度。但是万一流速已经不匹配了呢?接下来就是解决这个问题的。

BackpressureStrategy 解决流速已经不匹配的问题

看看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Represents the options for applying backpressure to a source sequence.
*/
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}

根据注释,应该就可以很好的理解这几种策略了:

  • MISSING:不丢弃不缓存的策略,给出友好提示 queue is full ?

  • ERROR:直接抛出 MissingBackpressureException 异常。

  • BUFFER:缓冲所有的 onNext 事件,直到下游消费它。就是相当于无限大的缓冲池。

  • DROP:当下游消费不了事件时,将事件直接丢弃

  • LATEST:当下游消费不了事件时,只向下游发送最近的事件

一个一个的展示。

BUFFER

策略:使用大小不受限制的缓存池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 500; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER) //注意这里用的策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

日志:

1
2
3
4
5
6
7
8
9
02-25 17:40:47.655 18477-18477/com.rengwuxian.rxjavasamples D/==Test==: onSubscribe
02-25 17:40:47.658 18477-18501/com.rengwuxian.rxjavasamples D/==Test==: emit 0
02-25 17:40:47.658 18477-18501/com.rengwuxian.rxjavasamples D/==Test==: emit 1
02-25 17:40:47.658 18477-18501/com.rengwuxian.rxjavasamples D/==Test==: emit 2
...
02-25 17:40:47.685 18477-18501/com.rengwuxian.rxjavasamples D/==Test==: emit 497
02-25 17:40:47.685 18477-18501/com.rengwuxian.rxjavasamples D/==Test==: emit 498
02-25 17:40:47.685 18477-18501/com.rengwuxian.rxjavasamples D/==Test==: emit 499

使用无限大缓存池的 Flowable 表现出来的效果好像和 Observable 是一样的。但是单纯这种使用 Flowable 也需要注意 OOM 的情况,比如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

日志和内存情况:

这里可以看到,即使使用容量无限大的缓存池,对内存的压力也是特别大的,只是和使用 Observable 比较起来,这里的增长速度是比较慢的。这也看出 FLowable 相比 Observable, 在性能方面有些不足, 毕竟FLowable内部为了实现响应式拉取做了更多的操作, 性能有所丢失也是在所难免, 因此单单只是说因为 FLowable 是新兴产物就盲目的使用也是不对的, 也要具体分场景.

DROP

策略:丢弃超过缓存区大小(128)的事件。
比如发送 150 个事件,仅保存 1-128 个事件,129-150 个事件丢弃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 发送150个事件
for (int i = 0;i< 150; i++) {
Log.d(TAG, "发送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.DROP) // 设置背压模式 = BackpressureStrategy.DROP
.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
// 通过按钮进行接收事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

看看日志:

下游只能从缓存池中取出前 128 个事件,后面再去取事件时,已经取不出来了,因为后面的时间已经被丢弃了,缓存池里没有其他的事件。

LATEST

策略:只保存最新(最后)的事件,其他的事件丢弃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@OnClick({R.id.id_btn_request, R.id.id_btn_emitter})
void onClick(View view) {
if (view.getId() == R.id.id_btn_request && mSubscription != null) {
LogUtils.d(TAG,"Request button clicked");
mSubscription.request(128);
}
if (view.getId() == R.id.id_btn_emitter) {
LogUtils.d(TAG,"Emitter button clicked");
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 发送150个事件
for (int i = 0; i < 150; i++) {
Log.d(TAG, "发送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.LATEST) // 设置背压模式 = BackpressureStrategy.DROP
.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
// 通过按钮进行接收事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

下游第一次接受到了前 128 个事件, 第二次接收到了 第 150 个事件,其他事件都没有收到。

这里就体现出 DROP 和 LATEST 的区别了。

MISSING

策略和 ERROR 的体现方式类似,只是给出友好提示

这里就不贴出代码只贴出日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
02-26 11:23:20.493 13037-13037/com.rengwuxian.rxjavasamples D/==Test==: onSubscribe
02-26 11:23:20.495 13037-13060/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件0
02-26 11:23:20.496 13037-13060/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件1
...
02-26 11:23:20.501 13037-13060/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件127
02-26 11:23:20.501 13037-13060/com.rengwuxian.rxjavasamples D/==Test==: 发送了事件128
02-26 11:23:20.521 13037-13037/com.rengwuxian.rxjavasamples W/==Test==: onError:
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:114)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onNext(FlowableSubscribeOn.java:97)
at io.reactivex.internal.operators.flowable.FlowableCreate$MissingEmitter.onNext(FlowableCreate.java:338)
at com.rengwuxian.rxjavasamples.Test$2.subscribe(Test.java:52)
at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
at io.reactivex.Flowable.subscribe(Flowable.java:12970)
at io.reactivex.Flowable.subscribe(Flowable.java:12917)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:760)

当 Flowable 不是我们自己创建时

上面例子中红的 Flowable 都是我们自己手动创建的,但是存在部分情况 Flowable 不是右我们自己创建的,比如使用 interval 操作符创建 Flowable , 那么这种情况下当流速不匹配时如何选择背压策略呢?

intetval 操作符:
Interval运算符返回一个 Observable / Flowable,它发出一个升序整数(Long 类型)的无限序列,并在发射之间选择一个固定的时间间隔,默认运行在一个新的线程上。

看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Flowable.interval(1, TimeUnit.MILLISECONDS) // 1秒发送1000个事件
.observeOn(Schedulers.newThread()) // 观察者同样工作在一个新开线程中
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE); //默认可以接收Long.MAX_VALUE个事件
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

日志:

1
2
3
4
5
6
7
8
9
10
11
12
02-26 11:59:29.497 3959-3959/com.rengwuxian.rxjavasamples D/==Test==: onSubscribe
02-26 11:59:29.501 3959-4126/com.rengwuxian.rxjavasamples D/==Test==: onNext: 0
02-26 11:59:30.508 3959-4126/com.rengwuxian.rxjavasamples W/==Test==: onError:
io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:760)

这是没有采取背压策略,缓冲区马上被充满,然后抛出 MissingBackpressureException 。

如何解决?

上面抛出的异常是因为 RxJava2.0 在内部已经封装好了策略,默认使用的是 ERROR 策略,还有其他的三种:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

使用 DROP 策略看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Flowable.interval(1, TimeUnit.MILLISECONDS) // 1秒发送1000个事件
.onBackpressureDrop()// 采用 DROP 策略
.observeOn(Schedulers.newThread()) // 观察者同样工作在一个新开线程中
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE); //默认可以接收Long.MAX_VALUE个事件
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

日志:

注意 127 之后的序列,符合 DROP 的策略模式。

其他两种就不做演示了。

总结

共82.3k字
0%
.gt-container a{border-bottom: none;}