只因为在众多框架中多看了你一眼 RxJava (五) 背压问题引入

背压问题的引入

问题引入

在上一篇文章中谈到了 zip 操作符,其中 Observer 接收到的事件的数量是发送事件数量虽少的 Observable 发送的个数。那么现在有一种情况,如果有一个 Observable1 一直在发送事件,而 Observable2 只发送一个事件,并且两个 Observable 都不调用 onComplete 方法,那么这时 Observer 接收事件的顺序是怎么样的的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
e.onNext(i++);
}
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("A");
}
}).subscribeOn(Schedulers.io());
Observable
.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtil.d(TAG, s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtil.e(TAG, throwable.getMessage());
}
});

可以看到 Observable1 以机器指令的执行速度循环发送 onNext 事件,Observable2 只发送一个 onNext 事件,并且 Observable1 Observable2 Obserber 分别在各自的线程中执行。

看看内存占用情况。

是的,OOM 了,占用内存飙升到 500+M,报错信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
W/art: Throwing OutOfMemoryError "Failed to allocate a 28 byte allocation with 360 free bytes and 360B until OOM; failed due to fragmentation (required continguous free 4096 bytes for a new buffer where largest contiguous free 0 bytes)" (recursive case)
W/art: "RxCachedThreadScheduler-1" daemon prio=5 tid=17 Runnable
W/art: | group="main" sCount=0 dsCount=0 obj=0x32c06940 self=0x7fa349b600
W/art: | sysTid=28826 nice=0 cgrp=default sched=0/0 handle=0x7f94aba450
W/art: | state=R schedstat=( 20565055506 41440899 661 ) utm=1941 stm=115 core=4 HZ=100
W/art: | stack=0x7f949b8000-0x7f949ba000 stackSize=1037KB
W/art: | held mutexes= "mutator lock"(shared held)
W/art: at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
W/art: at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:62)
W/art: at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
W/art: at java.util.concurrent.FutureTask.run(FutureTask.java:237)
W/art: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
W/art: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
W/art: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
W/art: at java.lang.Thread.run(Thread.java:760)

为什么会这样呢? 这里先不解释。

现在讲问题简化一下,只使用一个 Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
e.onNext(i++);
}
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
LogUtil.d(TAG, integer + "");
}
});

发送端循环发送事件,接收端接收处理事件前延迟 2 秒,并且两段工作在同一个线程中。

似乎很平静,和想象当中的不一样。

但是如果将发送端和接收端放置在不同的线程中会是什么情况呢?

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
e.onNext(i++);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
LogUtil.d(TAG, integer + "");
}
});

发送端在 io 线程,接收端在主线程。

又像脱缰的野马一样,直接 OOM了。

为什么同一线程和不同线程区别这么大呢?这里就涉及到同步和异步的知识。

  • 当接收端和发送端在同一个线程中时,这时是一种同步订阅关系,既然是同步的,那么发送端就必须等接收端处理完一个事件之后才可以去发送下一个事件。

  • 当接收端和发送端在不同线程中时,这时是一种异步订阅关系,此时两个线程不能直接进行通讯,所以有一个异步的「缓存池」用于缓存接收端来不及处理的发送端发送的数据,因此当发送端发送速度太快,接收端取出事件处理速度太慢,缓存池就会阻塞溢出,最后导致 OOM。

同步订阅和异步订阅的区别在于是否有「缓存池」,那么到这里问题的源头也知道了,只要有缓存池就会出现发送端和接收端速度不平衡的情况。那么如何解决这种问题呢?

总结一下两种订阅关系:

到这里对「背压」就有了一个直观的了解了,可以这么理解:

被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,从而操作消息的阻塞现象。

如何解决阻塞

首先,我们分析阻塞形成的原因,无非是因为下面的原因啊:

  1. 上游的水流过快(发送端发送事件过快)
  2. 上游的水流过大(接收端发送事件过多)

总结来说就是短时间发送的事件过多,接收端忙不过来!

那么先使用第一种方法,让事件发送的顺序慢一点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
Thread.sleep(2000);
e.onNext(i++);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtil.d(TAG, integer + "");
}
});

还是看看日志和内存

稳稳地,没毛病。

再试试第二种方法,下游少接收一点事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
e.onNext(i++);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 100 == 0;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtil.d(TAG, integer + "");
}
});

这里只接收 100 的整数倍事件,看看日志:

这种情况,内存还是会爆掉,但是速度会相对来说慢一点。

那么在这里,对文章开头的 zip 操作符的例子,就可以使用上述两个方法去解决,这里就不写出代码了。

上面唠唠叨叨说了那么多,基本上也给是阐明了阻塞形成的原因和解决阻塞的方法,基本策略就是减少发送事件的频率和减少发送事件的数量。

But……

我们手动让上游发送事件的速度满下来貌似是不可取的,你想让上游的速度是多快呢?上游需要等多久呢?

还有……

我们依旧无法知道下游处理事件的能力,无法很好地处理阻塞的事件。

所以这个时候,RxJava2 很好的支持了背压,对阻塞进行了比较好的处理。

下一篇详细讲解 RxJava2.0 中的背压策略。

共82.3k字
0%
.gt-container a{border-bottom: none;}