只因为在众多框架中多看了你一眼 RxJava (四) RxJava 调度器和操作符

摘要:之前的记录了一下 RxJava 最基本的使用方法,没有涉及到为什么这么多人使用它的具体原因,就是说没体现它的魅力所在嘛, 所以这一篇就记录一下我学习的RxJava 当中的变换操作符(Operator)和调度器(Scheduler),前者可以说是 RxJava 的核心功能之一,也是大多数人使用 RxJava 的主要原因,后者是可以让不同线程之间的代码在一条链路代码中书写,极大简化逻辑。

调度器(Scheduler)

开发过程中经常会碰到这样的需求:子线程中去请求服务器数据,拿到数据之后进行解析,然后回调给主线程中的接口去展示。

这种需求有很多种写法,比如用 AsyncTask ,在 doInBackground 中进行耗时操作,然后在 onPostExecute 当中接受结果,进行处理;或者是直接在主线程中切进行耗时操作,然后通过用 Looper.getMainLooper() 创建的 Handler 将结果发送至主线程去处理。

上面提到的两种方法,主要要解决的问题就是切换线程,因为 Android 中规定耗时操作不能在主线程当中进行,但是 UI 的更新操作又必须在主线程中进行,而 UI 的更新状态往往是需要耗时操作所得到的结果来做支撑的。所以为了解决这一矛盾,Google 官方给了 AsyncTask 和 Handler 两个工具。

RxJava 当然也可以解决上述问题,并且是在同一条链路中,不存在各种接口的回调,起到这个作用的就是 Scheduler,线程调度器。RxJava 通过它来指定每一行代码应该运行在什么样的线程环境,RxJava 当中已经内置了好几种 Scheduler:

Scheduler 的种类

1. Schedulers.newThread()

这个 Scheduler 会创建一个新的线程,并且用这个 Scheduler 指定的代码会在新创建的线程中去执行。

2. Scheduler.io()

这个 Scheduler 适用于一些执行阻塞式 IO 操作的,比如说:读写文件、读写数据库、访问网络等。它在内部是使用 CacheThreadPool 实现的。

1
2
3
4
5
6
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

这个线程池没有核心线程数,它会根据需要去创建线程,并且有 60秒 的超时机制。不要用这个 Scheduler 去指定计算操作的运行线程,这样可以避免创建多余的线程。

3. Schedulers.computation()

这个 Scheduler 适合执行 CPU 密集型的操作,比如事件循环,处理回调和其他计算工作。它内部使用的是固定线程数的线程池,大小等于 CPU 核数。不要用它去指定 IO 操作的代码运行线程环境,不然 IO 操作的等待会浪费 CPU。

4. AndroidScheculers.mainThread()

这个 Scheduler 是 Android 独有的, 用它指定的代码会运行在主线程当中。

Scheduler 的使用

有了上述 Scheduler 之后, 就可以使用 subscribeOn() 和 observerOn() 两个方法来指定代码的运行环境了。

这里的代码使用的是 RxJava2 的 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_test);
tv = findViewById(R.id.id_tv);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String result = getDataFromServer();
if (!TextUtils.isEmpty(result)) {
e.onNext(result);
}
}
})
.subscribeOn(Schedulers.io())//指定subscribe()发生在io线程
.observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 的回调发在主线程
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String result) throws Exception {
LogUtil.d(TAG, "[ accept ] " + "result=" + result);
LogUtil.d(TAG, "[ accept ] " + "current thread is " + Thread.currentThread().getName());
tv.setText(result);
}
});
}
public String getDataFromServer() {
LogUtil.d(TAG, "[ getDataFromServer ] " + "current thread is " + Thread.currentThread().getName());
LogUtil.d(TAG, "[ getDataFromServer ] " + "get data from server");
return "Hello RxJava";
}

看看 Log情况:

1
2
3
4
5
D/===RxJavaSample==: TestActivity [ getDataFromServer ] current thread is RxCachedThreadScheduler-1
D/===RxJavaSample==: TestActivity [ getDataFromServer ] get data from server
D/===RxJavaSample==: TestActivity [ accept ] result=Hello RxJava
D/===RxJavaSample==: TestActivity [ accept ] current thread is main

很明显了, Observable 是在子线程中发送事件, 而 Obserber 接收并处理事件是在 主线程中进行的。

总结一下,

  • subscribeOn() 用于指定 Observable 发送事件的线程。
  • obserberOn() 用于指定 Observer 接受并处理事件的线程。

关于两者多次使用的情况,做一下总结:

  • subscribeOn()多次调用,只有第一次有效。
  • observerOn()每调用一次,下面的代码就会切换一次。

举个例子(使用了 lambda 表达式):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_test);
tv = findViewById(R.id.id_tv);
Observable.create((ObservableOnSubscribe<String>) e -> {
LogUtil.d(TAG, "[ subscribe ] " + "current thread is " + Thread.currentThread().getName());
String token = getTokenFromServer();
LogUtil.d(TAG, "[ subscribe ] " + "token:" + token);
e.onNext(token);
})
.subscribeOn(Schedulers.io())//第一次指定subscribe()在io线程
.subscribeOn(AndroidSchedulers.mainThread())//第二次指定subscribe()的线程在主线程
.observeOn(Schedulers.io())//指定下面map操作发生在io线程
.map(s -> {
LogUtil.d(TAG, "[ apply ] " + "current thread is " + Thread.currentThread().getName());
String playUrl = getPlayUrl(s);
LogUtil.d(TAG, "[ apply ] " + "play url is " + playUrl);
return playUrl;
})
.observeOn(AndroidSchedulers.mainThread())//指定Observer接受事件是在主线程
.subscribe(s -> {
LogUtil.d(TAG, "[ accept ] " + "current thread is " + Thread.currentThread().getName());
LogUtil.d(TAG, "[ accept ] " + "s=" + s);
tv.setText(s);
});
}
public String getTokenFromServer() {
return "token-123456";
}
public String getPlayUrl(String token) {
return "api.xxxx.com?a=12&b=34&token=" + token;
}

输出结果:

1
2
3
4
5
6
7
01-07 18:45:51.191 12827-12853/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ subscribe ] current thread is RxCachedThreadScheduler-1
01-07 18:45:51.191 12827-12853/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ subscribe ] token:token-123456
01-07 18:45:51.191 12827-12854/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ apply ] current thread is RxCachedThreadScheduler-2
01-07 18:45:51.191 12827-12854/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ apply ] play url is api.xxxx.com?a=12&b=34&token=token-123456
01-07 18:45:51.351 12827-12827/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ accept ] current thread is main
01-07 18:45:51.351 12827-12827/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ accept ] s=api.xxxx.com?a=12&b=34&token=token-123456
  • 可以看到在第二次调用 subscribeOn(AndroidSchedulers.mainThread()) 并没有起作用,拿 token 的操作仍然是在 io 线程执行的。

  • 而第一次调用 observeOn(Schedulers.io()) 之后,后面的 map 操作用 token 去拿 url 地址这个过程是在 io 线程执行的。

  • 在第二次调用 observeOn(AndroidSchedulers.mainThread()) 之后,将 url 地址显示在 TextView 这个过程是在 主线程中执行的。

变换操作符

只因为在众多框架中多看了你一眼 RxJava (二) 从概念上理解 RxJava 这篇中举了一个过滤的例子,那么我理解的 RxJava 的变换操作,就类似于过滤过程,“过滤” 就是这个变换,但是变换不仅仅包含过滤。下面记录一些常用的操作符。

map

map 就是对 Observable 发出的每一个事件,都施加一个函数,使得每一个事件都按照指定的函数去变化,Observer 接收到的事件就是经过函数变化后的。

官方图:

用代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.just(1,2,3)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer*10;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer s) throws Exception {
LogUtil.d(TAG,"[accept] " + "after map s:"+s);
}
});

从上面的代码和示意图能看出来, map 操作符是一种一对一的关系

flatMap

这个操作符通过例子来理解:

需求:打印出每一个学生所选修的课程,一个学生不止选修一门课程。

学生和课程的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author: fanyuzeng on 2018/1/8 10:22
*/
public class Student {
private String mName;
private List<Course>mCourses;
//getter setter
}
/**
* @author: fanyuzeng on 2018/1/8 10:35
*/
public class Course {
private String courseName;
//getter setter
}

常规写法:

1
2
3
4
5
6
7
8
9
10
Observable.fromIterable(mStudentList)
.subscribe(new Consumer<Student>() {
@Override
public void accept(@NonNull Student student) throws Exception {
for (Course course : student.getCourses()) {
LogUtil.d(TAG, "[accept] " + course.toString());
}
}
});

mStudentList 中,xiaoming1 选修 yuwen1、yuwen2;xiaoming2 选修 yuwen3、yuwen4;xiaoming3 选修 yuwen5、yuwen6;

上述代码没报名,可以打印出每一个学生的选修课程,但是这对代码的复用性不高,我们希望的是在 Consumer 的 accept() 当中传入的参数就是一个一个的 Course 对象。

这个时候就要靠 flatMap 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.fromIterable(mStudentList)
.flatMap(new Function<Student, ObservableSource<Course>>() {
@Override
public ObservableSource<Course> apply(@NonNull Student student) throws Exception {
return Observable.fromIterable(student.getCourses());
}
})
.subscribe(new Consumer<Course>() {
@Override
public void accept(@NonNull Course course) throws Exception {
LogUtil.d(TAG, "[accept] " + course);
}
});

flatMap 将原始 Observable 发送的事件构造成多个 Observable 对象,然后将他们发送的事件整合到另外的一个单独的 Observable 中。

官方解释图:

扔物线的解释图:

还有一种图,很形象:

flatMap 的原理:

  1. 使用传入的事件对象创建一个 Observable 对象;
  2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象「铺平」之后通过统一路径分发了下去。而这个「铺平」就是 flatMap() 所谓的 flat。

可以看得出来上面的示意图能看出来,flatMap 并不能保证事件的顺序,要想保证顺序,则用 ConcatMap

从上面的代码和示意图能看出来, flatMap 操作符是一种一对多的关系

flatMap 拓展

传统的嵌套请求是需要嵌套回调机制来实现的,但是使用 flatMap 就可以把请求写在一条链中,使逻辑清晰,这也可以体现它的优势吧:随着程序逻辑越来越复杂,它仍然能保持简洁。

假设存在两个 Retrofit 接口, 一个用于注册,一个用于登录:

1
2
3
4
5
6
7
8
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}

(这里涉及到Rxjava + Retrofit 的使用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
api.register(new RegisterRequest()) //发起注册请求
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求注册结果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
//先根据注册的响应结果去做一些操作
}
})
.observeOn(Schedulers.io()) //回到IO线程去发起登录请求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求登录的结果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
}
});

concat

concat 操作符用于链接多个事件,只有当前面一个事件结束之后,才会发送后面的事件。

官方图:

我们获取网络数据的时候,经常会在本地缓存一份,然后在下一次获取数据的时候,会先从缓存中去拿数据,如果拿到了数据就直接返回,如果没有拿到数据,再去请求网络获取数据。

这样的需求就十分适合使用 concat 操作符来完成。

  • 从缓存中拿数据的 Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//从缓存获取数据
Observable<FidResponseBean> getCacheDataObservable = Observable.create(new ObservableOnSubscribe<FidResponseBean>() {
@Override
public void subscribe(ObservableEmitter<FidResponseBean> e) throws Exception {
LogUtil.d(TAG, "[ subscribe ] " + "current thread:" + Thread.currentThread().getName());
//尝试从缓存弄中拿数据
FidResponseBean cacheData = CacheManager.getInstance().getCache(Network.fid);
if (cacheData != null) {
isFromNet = false;
LogUtil.d(TAG, "[ subscribe ] " + "data from cache");
runOnUiThread(() -> tv.setText("data from: cahce"));
//调用 onNext 方法不会触发下一个 Observable
e.onNext(cacheData);
} else {
isFromNet = true;
LogUtil.d(TAG, "[ subscribe ] " + "data from net");
runOnUiThread(() -> tv.setText("data from: net"));
//只有调用 onComplete 之后,才会执行下一个 Observable
e.onComplete();
}
}
});
  • 从网络拿数据的 Observable
1
2
3
4
5
6
//从网络获取数据
Observable<FidResponseBean> getNetDataObservable = Network.getFidApi().getFidInfo(
Network.name,
Network.length,
Network.ppfeature);

其中 Network 是封装好的 Retrofit 工具。

API:

1
2
3
4
5
6
7
public interface FidApi {
@GET("uploadtest/uptoken")
Observable<FidResponseBean> getFidInfo(@Query("name") String name,
@Query("length") String length,
@Query("ppfeature")String ppfeature);
}
  • 使用 concat 操作符。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.concat(getCacheDataObservable, getNetDataObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FidResponseBean>() {
@Override
public void accept(@NonNull FidResponseBean fidResponseBean) throws Exception {
LogUtil.d(TAG, "[ accept ] " + "subscribe success thread:" + Thread.currentThread().getName());
show.setText(String.format("data: %s", fidResponseBean.toString()));
if (isFromNet) {
//缓存
CacheManager.getInstance().putCache(fidResponseBean);
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
LogUtil.d(TAG, "[ accept ] " + "subscribe fail thread:" + Thread.currentThread().getName() + "\n" +
"e:" + throwable.toString());
}
});

输出结果:

1
2
3
4
5
6
7
8
9
10
//没有缓存的时候
01-20 13:58:56.774 6750-7543/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ subscribe ] current thread:RxCachedThreadScheduler-1
01-20 13:58:56.775 6750-7543/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ subscribe ] data from net
01-20 13:58:57.491 6750-6750/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ accept ] subscribe success thread:main
//有缓存的时候
01-20 14:06:32.625 6750-14144/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ subscribe ] current thread:RxCachedThreadScheduler-2
01-20 14:06:32.625 6750-14144/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ subscribe ] data from cache
01-20 14:06:32.627 6750-6750/com.rengwuxian.rxjavasamples D/===RxJavaSample==: TestActivity [ accept ] subscribe success thread:main

zip

关于 zip 操作符,官方的解释图:

另一种解释图:

详细版:

后两张水管图引用自: https://www.jianshu.com/p/bb58571cdb64

有了上面三张图, zip 操作符的作用就很好理解了,就是通过一个函数将多个 Observable 发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个 Observable 一样多的数据。

当然了,还有一些细节上的问题需要注意的,闲先总结:

  • zip 可以将多个 Observable 发送的事件组合在一起,并且基于这个函数的结果,将每一个组合后的事件重新发送出去,并且一个事件只能被组合一次,组合的顺序是严格按照事件发送的顺序来进行的。
  • 最终 Observer 接收到的事件的数量,是 Observable 发送事件最少的哪一个的事件数量。

举个栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
LogUtil.d(TAG, "[ subscribe ] " + "emitter 1");
e.onNext(1);
LogUtil.d(TAG, "[ subscribe ] " + "emitter 2");
e.onNext(2);
LogUtil.d(TAG, "[ subscribe ] " + "emitter 3");
e.onNext(3);
LogUtil.d(TAG, "[ subscribe ] " + "emitter 4");
e.onNext(4);
LogUtil.d(TAG, "[ subscribe ] " + "complete 1");
e.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
LogUtil.d(TAG, "[ subscribe ] " + "emitter a");
e.onNext("a");
LogUtil.d(TAG, "[ subscribe ] " + "emitter b");
e.onNext("b");
LogUtil.d(TAG, "[ subscribe ] " + "emitter c");
e.onNext("c");
LogUtil.d(TAG, "[ subscribe ] " + "complete 2");
e.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer+s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogUtil.d(TAG, "[ onSubscribe ] ");
}
@Override
public void onNext(String s) {
LogUtil.d(TAG, "[ onNext ] " + "s:" + s);
}
@Override
public void onError(Throwable e) {
LogUtil.d(TAG, "[ onError ] " + e.toString());
}
@Override
public void onComplete() {
LogUtil.d(TAG, "[ onComplete ] ");
}
});

代码很简单,一个 Observable 发送 1、2、3、4、onCompele,另外一个 Observab 发送 a、b、c、onComplete。

打印出的结果:

照这个顺序看,似乎是先发送 Observable1 的事件,然后再发送 Observable2 的事件,并且每发送一个 Observable2 的事件就会触发 Observer 做出反应。

那么是不是这样呢?

我们将每个发送事件的代码都延时 1s ,这里就不贴代码了。

似乎是这样哈? 但是按照上面的解释图来说,不应该是这样呀,应该是 Observable1 发送一个事件,然后 Observable2 发送一个事件,然后组合了之后,Observer 接收一个事件 才对。

分析一下原因:

目前的代码,Observable1 和 Observable2 发送事件都在同一个线程当中,在同一个线程里代码的执行时有先后顺序的,当然是先执行完 Observable1 发送的所有事件之后,再执行 Observer2 发送的事件,然后组合,然后 Observer 接收。

如果将 Observa1 和 Observa2 发送事件分别放在两个线程当中呢?

修改一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
LogUtil.d(TAG, "[ subscribe ] " + "emitter 1");
e.onNext(1);
Thread.sleep(1000);
LogUtil.d(TAG, "[ subscribe ] " + "emitter 2");
e.onNext(2);
Thread.sleep(1000);
LogUtil.d(TAG, "[ subscribe ] " + "emitter 3");
e.onNext(3);
Thread.sleep(1000);
LogUtil.d(TAG, "[ subscribe ] " + "emitter 4");
e.onNext(4);
//注意这里没有睡 1s
LogUtil.d(TAG, "[ subscribe ] " + "complete 1");
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
LogUtil.d(TAG, "[ subscribe ] " + "emitter a");
e.onNext("a");
Thread.sleep(1000);
LogUtil.d(TAG, "[ subscribe ] " + "emitter b");
e.onNext("b");
Thread.sleep(1000);
LogUtil.d(TAG, "[ subscribe ] " + "emitter c");
e.onNext("c");
Thread.sleep(1000);
LogUtil.d(TAG, "[ subscribe ] " + "complete 2");
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogUtil.d(TAG, "[ onSubscribe ] ");
}
@Override
public void onNext(String s) {
LogUtil.d(TAG, "[ onNext ] " + "s:" + s);
}
@Override
public void onError(Throwable e) {
LogUtil.d(TAG, "[ onError ] " + e.toString());
}
@Override
public void onComplete() {
LogUtil.d(TAG, "[ onComplete ] ");
}
});

跟之前的代码比较,就是将两个 Observab 放在子线程中去执行了。

看看 Log

动态图:

这次的 Log 是不是就比较合理了!两个 Observable 同时开始发送,每发送一个 zip 就会组合一个,然后发送给 Observer,然后当事件少的哪一个 Observab2 发送了 onComplete 之后,虽然 Observable1 还是会继续发送,但是 Obserber 不会接收了。

还有一点要注意的:上面图片中的 红色的框框,会发现并不一定就是 observable1 发一个,然后 Observable2 发送一个,两者可能是交错进行的,这个涉及到某一时刻是哪个线程获取到 cpu 的执行权,所以上面代码多运行几次,log 并不相同,比如还有这种情况:

并且把延迟的代码去掉之后,再看看 log

对的,毫无规律可言,因为当前是哪个线程拿到 cpu 的控制权是不确定的,并且在每一轮当中的时间片长短也有差别,所以日志中会出现这种情况。

这个部分就总结到这了,后面看看背压相关的。

共82.3k字
0%
.gt-container a{border-bottom: none;}