编程知识 cdmana.com

Android rxjava2 学习笔记

____tz_zs

一、hello world

创建 Observable 被观察者
创建 Observer 观察者
使用 subscribe 订阅

public void tzzs_rxjava2_demo() {
   
   
        /*
         * hello world
         * */
        Observable.create(new ObservableOnSubscribe<Object>() {
   
   
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Exception {
   
   
                emitter.onNext("tzzs");
            }
        }).subscribe(new Consumer<Object>() {
   
   
            @Override
            public void accept(Object o) throws Exception {
   
   
                System.out.println("接收到: " + o);
            }
        });
    }

二、五种观察者模式

  • Observable 和 Observer
    被观察者 Observable 能够发射 0 或 n 个数据,并以成功或错误事件终止
  • Flowable 和 Subscriber
    被观察者 Flowable 能够发射 0 或n个数据,并以成功或错误事件终止。支持背压,可以控制数据源发射的速度
  • Single 和 SingleObserver
    被观察者 Single 只发射单个数据或错误事件
  • Completable 和 CompletableObserver
    被观察者 Completable 从来不发射数据,只处理 onComplete 和 onError 事件。可以看成 Rx 的 Runnable
  • Maybe 和 MaybeObserver
    被观察者 Maybe 能够发射 0 或者 1 个数据,要么成功,要么失败。有点类似于 Optional

三、do 操作符

do 操作符可以给 Observable 的生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段时,这些回调就会被触发。

  • doOnSubscribe
    一旦观察者订阅了 Observable,它就会被调用
  • doOnLifecycle
    可以在观察者订阅之后,设置是否取消订阅
  • doOnNext
    它产生的 Observable 每发射一项数据就会调用它一次,它的 Consumer 接受发射的数据项。一般用于在 subscribe 之前对数据进行处理
  • doOnEach
    它产生的 Observable 每发射一项数据就会调用它一次,不仅包括 onNext ,还包括 onError 和 onCompleted
  • doAfterNext
    在 onNext 之后执行,而 doOnNext() 是在 onNext 之前执行
  • doOnComplete
    当它产生的 Observable 在正常终止调用 onComplete 时会被调用
  • doFinally
    在当它产生的 Observable 终止之后会被调用,无论是正常终止还是异常终止。doFinally 优先于 doAfterTerminate 的调用
  • doAfterTerminate
    注册一个 Action,当 Observable 调用 onComplete 或 onError 时触发

四、创建操作符

1. create

create() 使用一个函数从头创建一个 Observable。

public void tzzs_rxjava2_create() {
   
   
        /*
         * create()
         * 使用一个函数从头创建一个 Observable。
         * */
        Observable.create(new ObservableOnSubscribe<Integer>() {
   
   
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
   
   
                try {
   
   
                    if (!emitter.isDisposed()) {
   
    // 检查观察者的 isDisposed 状态,没有观察者时,停止发送数据。
                        for (int i = 0; i < 10; i++) {
   
   
                            System.out.println("emitter.onNext:" + i);
                            emitter.onNext(i);
                        }
                        System.out.println("emitter.onComplete");
                        emitter.onComplete();
                    }
                } catch (Exception e) {
   
   
                    emitter.onError(e);
                }
            }
        }).subscribe(new Consumer<Integer>() {
   
   
            @Override
            public void accept(Integer integer) throws Exception {
   
   
                System.out.println("接收到: " + integer);
            }
        }, new Consumer<Throwable>() {
   
   
            @Override
            public void accept(Throwable throwable) throws Exception {
   
   
                System.out.println("报错: " + throwable.getMessage());
            }
        }, new Action() {
   
   
            @Override
            public void run() throws Exception {
   
   
                System.out.println("结束");
            }
        });

        /*
        emitter.onNext:0
        接收到: 0
        emitter.onNext:1
        接收到: 1
        emitter.onNext:2
        接收到: 2
        emitter.onNext:3
        接收到: 3
        emitter.onNext:4
        接收到: 4
        emitter.onNext:5
        接收到: 5
        emitter.onNext:6
        接收到: 6
        emitter.onNext:7
        接收到: 7
        emitter.onNext:8
        接收到: 8
        emitter.onNext:9
        接收到: 9
        emitter.onComplete
        结束
        * */
    }

2. just

just() 将一个或多个对象转换为发射这个或这些对象的一个 Observable。

public void tzzs_rxjava2_just() {
   
   
        /*
         * just()
         * 将一个或多个对象转换为发射这个或这些对象的一个 Observable。
         */
        Observable.just("just发送的字符串")
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到:" + s);
                    }
                });
        Observable.just("1", "2", "3", "4")// 可以接受一到十个参数,返回一个按照参数列表顺序发射这些数据的 Observable。
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到:" + s);
                    }
                }, new Consumer<Throwable>() {
   
   
                    @Override
                    public void accept(Throwable throwable) throws Exception {
   
   
                        System.out.println("报错: " + throwable.getMessage());
                    }
                }, new Action() {
   
   
                    @Override
                    public void run() throws Exception {
   
   
                        System.out.println("结束");
                    }
                });
        /*
        接收到:just发送的字符串
        接收到:1
        接收到:2
        接收到:3
        接收到:4
        结束
        * */

        Observable.just("1", "2", "3", "4", "", null)//rxjava2 中 just() 不能传入 null。 java.lang.NullPointerException: The sixth item is null
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到:" + s);

                    }
                });
    }

3. from

from() 将一个 Iterable、一个 Future 或者一个数组转换成一个 Observable。

public void tzzs_rxjava2_from() {
   
   
        /*
         * from()
         * 将一个 Iterable、一个 Future 或者一个数组转换成一个 Observable。
         * */
        Observable.fromArray("1", "2")
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到:" + s);
                    }
                });
        /*
        接收到:1
        接收到:2
        * */

        ArrayList<Integer> items = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
   
   
            items.add(i);
        }

        Observable.fromArray(items)
                .subscribe(new Consumer<ArrayList<Integer>>() {
   
   
                    @Override
                    public void accept(ArrayList<Integer> integers) throws Exception {
   
   
                        System.out.println("接收到:" + integers);
                    }
                });
        /*
        接收到:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        * */

        Observable.fromIterable(items)
                .subscribe(new Consumer<Integer>() {
   
   
                    @Override
                    public void accept(Integer integer) throws Exception {
   
   
                        System.out.println("接收到:" + integer);
                    }
                });
        /*
        接收到:0
        接收到:1
        接收到:2
        接收到:3
        接收到:4
        接收到:5
        接收到:6
        接收到:7
        接收到:8
        接收到:9
        * */
    }

4. timer

timer() 创建一个在给定的延时之后发射单个数据的 Observable。

public void tzzs_rxjava2_timer() {
   
   
        /*
         * timer()
         * 创建一个在给定的延时之后发射单个数据的 Observable。
         * */
        Observable.timer(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
   
   
                    @Override
                    public void accept(Long aLong) throws Exception {
   
   
                        System.out.println("延迟接收到消息:" + aLong);
                    }
                });

        try {
   
   
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }
    }

五、变换操作符和过滤操作符

1. map

map() 对 Observable 发射的每一项数据都应用一个你选择的函数,执行变换操作,然后返回一个发射这些结果的 Observable。

public void tzzs_rxjava2_map() {
   
   
        /*
         * map()
         * 对 Observable 发射的每一项数据都应用一个你选择的函数,执行变换操作,然后返回一个发射这些结果的 Observable。。
         * */
        Observable.just("just message")
                .map(new Function<String, String>() {
   
   
                    @Override
                    public String apply(@NonNull String s) throws Exception {
   
   
                        return s + "|map 操作|";
                    }
                })
                .map(new Function<String, String>() {
   
   
                    @Override
                    public String apply(@NonNull String s) throws Exception {
   
   
                        return s + "|map2 操作|";
                    }
                })
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到:" + s);
                    }
                });
        /*
        接收到:just message|map 操作||map2 操作|
        * */
	}

2. flatMap

flatMap() 将一个发射数据的 Observable 变换为多个 Observables,然后将他们发射的数据合并后放进一个单独的 Observable。
具体的,使用一个指定的函数对原始 Observable 发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的 Observable,然后 flatMap 合并这些 Observables 发射的数据,最后将合并后的结果当作自己的数据序列发射。

public void tzzs_rxjava2_flatmap() {
   
   
        /*
         * flatMap()
         * 将一个发射数据的 Observable 变换为多个 Observables,然后将他们发射的数据合并后放进一个单独的 Observable。
         *
         * 使用一个指定的函数对原始 Observable 发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的 Observable,然后合并这些 Observables 发射的数据,最后将合并后的结果当作自己的数据序列发射。
         * */
        Observable.just("1", "2", "3").delay(1, TimeUnit.SECONDS)
                .flatMap(new Function<String, ObservableSource<Integer>>() {
   
   
                    @Override
                    public ObservableSource<Integer> apply(@NonNull String s) throws Exception {
   
   
                        // 对原始 Observable 发射的一项数据执行变换
                        List<Integer> list = new ArrayList<>();
                        for (int i = 5; i < 7; i++) {
   
   
                            list.add(Integer.parseInt(s) * 10 + i);
                        }
                        return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS); // 对这项变换后的数据,创建一个新 Observable 返回。
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
   
   
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
   
   
                        return Observable.just("flatMap just1: " + integer, "flatMap just2: " + integer).delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到消息:" + s);
                    }
                });
        try {
   
   
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }
        /*
        接收到消息:flatMap just1: 25
        接收到消息:flatMap just1: 26
        接收到消息:flatMap just2: 26
        接收到消息:flatMap just2: 25
        接收到消息:flatMap just1: 36
        接收到消息:flatMap just1: 15
        接收到消息:flatMap just2: 15
        接收到消息:flatMap just1: 35
        接收到消息:flatMap just2: 35
        接收到消息:flatMap just1: 16
        接收到消息:flatMap just2: 16
        接收到消息:flatMap just2: 36
        * */
	}

3. concatMap

flatMap 不能保证发射的顺序,使用 concatMap 则会严格按照顺序发射这些数据。

public void tzzs_rxjava2_concatmap() {
   
   
        Observable.just("1", "2", "3").delay(1, TimeUnit.SECONDS)
                .concatMap(new Function<String, ObservableSource<Integer>>() {
   
   
                    @Override
                    public ObservableSource<Integer> apply(@NonNull String s) throws Exception {
   
   
                        // 对原始 Observable 发射的一项数据执行变换
                        List<Integer> list = new ArrayList<>();
                        for (int i = 5; i < 7; i++) {
   
   
                            list.add(Integer.parseInt(s) * 10 + i);
                        }
                        return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS); // 对这项变换后的数据,创建一个新 Observable 返回。
                    }
                })
                .concatMap(new Function<Integer, ObservableSource<String>>() {
   
   
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
   
   
                        return Observable.just("concatMap just1: " + integer, "concatMap just2: " + integer).delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到消息:" + s);
                    }
                });
        try {
   
   
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }
        /*
        接收到消息:concatMap just1: 15
        接收到消息:concatMap just2: 15
        接收到消息:concatMap just1: 16
        接收到消息:concatMap just2: 16
        接收到消息:concatMap just1: 25
        接收到消息:concatMap just2: 25
        接收到消息:concatMap just1: 26
        接收到消息:concatMap just2: 26
        接收到消息:concatMap just1: 35
        接收到消息:concatMap just2: 35
        接收到消息:concatMap just1: 36
        接收到消息:concatMap just2: 36
        * */
    }

4. filter

filter() 指定一个断言函数测试数据项,通过来测试的数据(返回为true)才会被发射。

public void tzzs_rxjava2_filter() {
   
   
        /*
         * filter()
         * 指定一个断言函数测试数据项,通过来测试的数据(返回为true)才会被发射。
         * */
        Observable.just("1", "2", "3", "4", "5", "6").delay(1, TimeUnit.SECONDS)
                .filter(new Predicate<String>() {
   
   
                    @Override
                    public boolean test(@NonNull String s) throws Exception {
   
   
                        return Integer.parseInt(s) > 2;
                    }
                })
                .subscribe(new Consumer<String>() {
   
   
                    @Override
                    public void accept(String s) throws Exception {
   
   
                        System.out.println("接收到消息:" + s);
                    }
                });
        try {
   
   
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }
        /*
        接收到消息:3
        接收到消息:4
        接收到消息:5
        接收到消息:6
        * */
    }

六、合并操作符和连接操作符

1. merge

merge() 将多个 Observable 合并为一个。merge 是按照时间线并行。merge 最多只能合并 4 个被观察者,多余 4 个则需使用 mergeArray。

public void tzzs_rxjava2_merge() {
   
   
        /*
         * merge()
         * 将多个 Observable 合并为一个。merge 是按照时间线并行。merge 最多只能合并 4 个被观察者,多余 4 个则需使用 mergeArray。
         * */
        Observable<String> observable_tzzs_1 = Observable.just("tzzs_1_1", "tzzs_1_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_2 = Observable.just("tzzs_2_1", "tzzs_2_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_3 = Observable.just("tzzs_3_1", "tzzs_3_2").delay(1, TimeUnit.SECONDS);

        Observable
                .merge(observable_tzzs_1, observable_tzzs_2, observable_tzzs_3)
                .subscribe(new DisposableObserver<String>() {
   
   
                    @Override
                    public void onNext(@NonNull String s) {
   
   
                        System.out.println("接收到消息:" + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
   
   
                        System.out.println("报错: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
   
   
                        System.out.println("结束");
                    }
                });

        try {
   
   
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }

        /*
        接收到消息:tzzs_3_1
        接收到消息:tzzs_3_2
        接收到消息:tzzs_1_1
        接收到消息:tzzs_1_2
        接收到消息:tzzs_2_1
        接收到消息:tzzs_2_2
        结束
        * */
    }

2. concat

concat() 将多个 Observable 合并为一个。与 merge 的不同在于,concat 是按发送顺序串行执行。

public void tzzs_rxjava2_concat() {
   
   
        /*
         * concat()
         * 将多个 Observable 合并为一个。与 merge 的不同在于,concat 是按发送顺序串行执行。
         * */
        Observable<String> observable_tzzs_1 = Observable.just("tzzs_1_1", "tzzs_1_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_2 = Observable.just("tzzs_2_1", "tzzs_2_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_3 = Observable.just("tzzs_3_1", "tzzs_3_2").delay(1, TimeUnit.SECONDS);

        Observable
                .concat(observable_tzzs_1, observable_tzzs_2, observable_tzzs_3)
                .subscribe(new DisposableObserver<String>() {
   
   
                    @Override
                    public void onNext(@NonNull String s) {
   
   
                        System.out.println("接收到消息:" + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
   
   
                        System.out.println("报错: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
   
   
                        System.out.println("结束");
                    }
                });

        try {
   
   
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
   
   
            e.printStackTrace();
        }

        /*
        接收到消息:tzzs_1_1
        接收到消息:tzzs_1_2
        接收到消息:tzzs_2_1
        接收到消息:tzzs_2_2
        接收到消息:tzzs_3_1
        接收到消息:tzzs_3_2
        结束
        * */
    }

版权声明
本文为[osc_uu6euvkf]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4278787/blog/4839613

Scroll to Top