Android RxJava 2 learning notes

____tz_zs

I. Hello world

Create the Observable (the observed source).
Create the Observer (the subscriber).
Connect them with subscribe.

    public void tzzs_rxjava2_demo() {
        /*
         * hello world
         */
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext("tzzs");
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println(" Received : " + o);
            }
        });
    }

II. The five observer patterns

  • Observable and Observer
    An Observable emits 0 to n items and then terminates with either a completion or an error event.
  • Flowable and Subscriber
    A Flowable emits 0 to n items and then terminates with either a completion or an error event. It supports backpressure, which lets the consumer control how fast the source emits.
  • Single and SingleObserver
    A Single emits exactly one item or an error event.
  • Completable and CompletableObserver
    A Completable never emits items; it only signals onComplete or onError. It can be seen as the Rx counterpart of Runnable.
  • Maybe and MaybeObserver
    A Maybe emits 0 or 1 item and ends in either success or failure. It is somewhat like Optional.
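
The five types above can be compared side by side. The following is a minimal sketch, assuming RxJava 2.x (the `io.reactivex` package) is on the classpath; the class and method names are made up for illustration. Blocking calls are used only to keep the example synchronous.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.List;

// Hypothetical demo class, not part of the original notes.
public class FiveTypesSketch {

    // Observable: 0..n items, then completion (or an error).
    static List<Integer> observableItems() {
        return Observable.just(1, 2, 3).toList().blockingGet();
    }

    // Flowable: like Observable, but with backpressure support.
    static List<Integer> flowableItems() {
        return Flowable.range(1, 3).toList().blockingGet();
    }

    // Single: exactly one item (or an error).
    static String singleItem() {
        return Single.just("one").blockingGet();
    }

    // Completable: no items, only completion (or an error).
    static boolean completableRan() {
        boolean[] ran = {false};
        Completable.fromAction(() -> ran[0] = true).blockingAwait();
        return ran[0];
    }

    // Maybe: zero or one item; this one is empty, so the fallback is returned.
    static String maybeItemOrFallback() {
        return Maybe.<String>empty().blockingGet("fallback");
    }

    public static void main(String[] args) {
        System.out.println(observableItems());     // [1, 2, 3]
        System.out.println(flowableItems());       // [1, 2, 3]
        System.out.println(singleItem());          // one
        System.out.println(completableRan());      // true
        System.out.println(maybeItemOrFallback()); // fallback
    }
}
```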

III. The do operators

The do operators attach callbacks to each stage of an Observable's lifecycle. When the Observable reaches a given stage, the corresponding callback is triggered.

  • doOnSubscribe
    Called as soon as an observer subscribes to the Observable.
  • doOnLifecycle
    Registers callbacks for both subscription and disposal; the subscription callback receives the Disposable, so it can decide whether to cancel the subscription.
  • doOnNext
    Called once for every item the Observable emits; its Consumer receives the emitted item. Typically used to process data before it reaches subscribe.
  • doOnEach
    Called once for every notification the Observable emits: not only onNext, but also onError and onComplete.
  • doAfterNext
    Runs after onNext has been delivered, whereas doOnNext() runs before onNext.
  • doOnComplete
    Called when the Observable terminates normally by calling onComplete.
  • doFinally
    Called after the Observable terminates, whether normally or with an error, and also when the downstream disposes. doFinally is said to be called before doAfterTerminate, though the relative order depends on where each operator sits in the chain.
  • doAfterTerminate
    Registers an Action that is triggered after the Observable calls onComplete or onError.
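
To see when these callbacks fire relative to the subscriber's own callbacks, the sketch below records each call in a list. It assumes RxJava 2.x on the classpath; the class name and log labels are hypothetical, and only a subset of the do operators is shown.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original notes.
public class DoOperatorOrderSketch {

    // Subscribes to a one-item synchronous source and returns the call log.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just("a")
                .doOnSubscribe(d -> log.add("doOnSubscribe"))
                .doOnNext(v -> log.add("doOnNext " + v))
                .doAfterNext(v -> log.add("doAfterNext " + v))
                .doOnComplete(() -> log.add("doOnComplete"))
                .doFinally(() -> log.add("doFinally"))
                .subscribe(
                        v -> log.add("onNext " + v),
                        e -> log.add("onError"),
                        () -> log.add("onComplete"));
        return log;
    }

    public static void main(String[] args) {
        for (String entry : run()) {
            System.out.println(entry);
        }
    }
}
```

With this particular chain the log should read doOnSubscribe, doOnNext a, onNext a, doAfterNext a, doOnComplete, onComplete, doFinally. Reordering the operators in the chain can change the relative order of the terminal callbacks.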

IV. Creation operators

1. create

create() creates an Observable from scratch using a function.

    public void tzzs_rxjava2_create() {
        /*
         * create()
         * Creates an Observable from scratch using a function.
         */
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                try {
                    // Check the emitter's isDisposed state; if no observer is subscribed, do not emit.
                    if (!emitter.isDisposed()) {
                        for (int i = 0; i < 10; i++) {
                            System.out.println("emitter.onNext:" + i);
                            emitter.onNext(i);
                        }
                        System.out.println("emitter.onComplete");
                        emitter.onComplete();
                    }
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(" Received : " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println(" Report errors : " + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println(" end ");
            }
        });

        /*
        emitter.onNext:0
         Received : 0
        emitter.onNext:1
         Received : 1
        emitter.onNext:2
         Received : 2
        emitter.onNext:3
         Received : 3
        emitter.onNext:4
         Received : 4
        emitter.onNext:5
         Received : 5
        emitter.onNext:6
         Received : 6
        emitter.onNext:7
         Received : 7
        emitter.onNext:8
         Received : 8
        emitter.onNext:9
         Received : 9
        emitter.onComplete
         end 
        * */
    }
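
The example above checks isDisposed() once, before the loop. A variant that checks it before every item lets the source stop as soon as the downstream loses interest. The sketch below assumes RxJava 2.x; the class name and the `attempted` list are hypothetical helpers used to observe what the source tried to emit.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original notes.
public class CreateDisposedCheckSketch {

    // Records every value the source attempted to emit.
    static final List<Integer> attempted = new ArrayList<>();

    // take(3) disposes the upstream after the third item.
    static List<Integer> firstThree() {
        attempted.clear();
        return Observable.<Integer>create(emitter -> {
            for (int i = 0; i < 10; i++) {
                if (emitter.isDisposed()) {
                    return; // downstream is gone; stop producing
                }
                attempted.add(i);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }).take(3).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstThree()); // [0, 1, 2]
        System.out.println(attempted);    // [0, 1, 2]
    }
}
```

Without the per-item check the loop would still run all ten iterations, although the emitter would silently drop items after disposal.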

2. just

just() converts one or more objects into an Observable that emits those objects.

    public void tzzs_rxjava2_just() {
        /*
         * just()
         * Converts one or more objects into an Observable that emits those objects.
         */
        Observable.just("just The string sent ")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Received :" + s);
                    }
                });
        // just() accepts one to ten arguments and returns an Observable that emits them in argument order.
        Observable.just("1", "2", "3", "4")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Received :" + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println(" Report errors : " + throwable.getMessage());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println(" end ");
                    }
                });
        /*
         Received :just The string sent 
         Received :1
         Received :2
         Received :3
         Received :4
         end 
        * */

        // In RxJava 2, just() does not accept null. This call throws
        // java.lang.NullPointerException: The sixth item is null
        Observable.just("1", "2", "3", "4", "", null)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Received :" + s);
                    }
                });
    }

3. from

from() converts an Iterable, a Future, or an array into an Observable. In RxJava 2 these are separate methods: fromIterable(), fromFuture(), and fromArray().

    public void tzzs_rxjava2_from() {
        /*
         * from()
         * Converts an Iterable, a Future, or an array into an Observable.
         */
        Observable.fromArray("1", "2")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Received :" + s);
                    }
                });
        /*
         Received :1
         Received :2
        * */

        ArrayList<Integer> items = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            items.add(i);
        }

        // Passing a List to fromArray() treats the whole list as a single item.
        Observable.fromArray(items)
                .subscribe(new Consumer<ArrayList<Integer>>() {
                    @Override
                    public void accept(ArrayList<Integer> integers) throws Exception {
                        System.out.println(" Received :" + integers);
                    }
                });
        /*
         Received :[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        * */

        // fromIterable() emits the list's elements one by one.
        Observable.fromIterable(items)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(" Received :" + integer);
                    }
                });
        /*
         Received :0
         Received :1
         Received :2
         Received :3
         Received :4
         Received :5
         Received :6
         Received :7
         Received :8
         Received :9
        * */
    }
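
The text mentions Future, but the method above only covers arrays and Iterables. The sketch below fills that gap with fromFuture() and also passes a real array to fromArray(). It assumes RxJava 2.x; the class and method names are hypothetical.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original notes.
public class FromSourcesSketch {

    // fromFuture() emits the Future's result as a single item.
    static String fromFutureValue() {
        Future<String> future = CompletableFuture.completedFuture("from a Future");
        return Observable.fromFuture(future).blockingFirst();
    }

    // fromArray() with an actual array emits each element separately.
    static List<Integer> fromArrayValues() {
        Integer[] boxed = {1, 2, 3};
        return Observable.fromArray(boxed).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fromFutureValue()); // from a Future
        System.out.println(fromArrayValues()); // [1, 2, 3]
    }
}
```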

4. timer

timer() creates an Observable that emits a single item after a given delay.

    public void tzzs_rxjava2_timer() {
        /*
         * timer()
         * Creates an Observable that emits a single item after a given delay.
         */
        Observable.timer(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(" Delay receiving message :" + aLong);
                    }
                });

        // Keep the calling thread alive long enough for the delayed item to arrive.
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
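
In a test or a command-line program, blocking on the result can replace the Thread.sleep() call. A minimal sketch, assuming RxJava 2.x; the class name is hypothetical and the 100 ms delay is an arbitrary choice.

```java
import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
public class TimerBlockingSketch {

    // Blocks the calling thread until timer() emits its single item, 0L.
    static long delayedValue() {
        return Observable.timer(100, TimeUnit.MILLISECONDS).blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(delayedValue()); // 0
    }
}
```

Blocking is generally unsuitable on the Android main thread; there, a plain subscribe as in the method above is the usual approach.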

V. Transformation and filtering operators

1. map

map() applies a function of your choice to each item emitted by the Observable and returns an Observable that emits the transformed items.

    public void tzzs_rxjava2_map() {
        /*
         * map()
         * Applies a function of your choice to each item emitted by the Observable
         * and returns an Observable that emits the transformed items.
         */
        Observable.just("just message")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        return s + "|map  operation |";
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        return s + "|map2  operation |";
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Received :" + s);
                    }
                });
        /*
         Received :just message|map  operation ||map2  operation |
        * */
    }

2. flatMap

flatMap() transforms each item emitted by an Observable into an Observable of its own, then merges the items emitted by those Observables into a single Observable.
Specifically, it applies a given function to each item emitted by the source Observable. That function returns an Observable that itself emits items; flatMap then merges the emissions of these Observables and emits the merged result as its own sequence.

    public void tzzs_rxjava2_flatmap() {
        /*
         * flatMap()
         * Transforms each item emitted by an Observable into an Observable of its own,
         * then merges the items emitted by those Observables into a single Observable.
         *
         * A given function is applied to each item emitted by the source Observable. The
         * function returns an Observable that itself emits items; flatMap merges those
         * emissions and emits the merged result as its own sequence.
         */
        Observable.just("1", "2", "3").delay(1, TimeUnit.SECONDS)
                .flatMap(new Function<String, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(@NonNull String s) throws Exception {
                        // Transform one item emitted by the source Observable.
                        List<Integer> list = new ArrayList<>();
                        for (int i = 5; i < 7; i++) {
                            list.add(Integer.parseInt(s) * 10 + i);
                        }
                        // Wrap the transformed data in a new Observable and return it.
                        return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                        return Observable.just("flatMap just1: " + integer, "flatMap just2: " + integer).delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Message received :" + s);
                    }
                });
        // Keep the calling thread alive so the delayed emissions can arrive.
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /*
         Message received :flatMap just1: 25
         Message received :flatMap just1: 26
         Message received :flatMap just2: 26
         Message received :flatMap just2: 25
         Message received :flatMap just1: 36
         Message received :flatMap just1: 15
         Message received :flatMap just2: 15
         Message received :flatMap just1: 35
         Message received :flatMap just2: 35
         Message received :flatMap just1: 16
         Message received :flatMap just2: 16
         Message received :flatMap just2: 36
        * */
    }

3. concatMap

flatMap does not guarantee emission order; concatMap emits the data in strict order.

    public void tzzs_rxjava2_concatmap() {
        Observable.just("1", "2", "3").delay(1, TimeUnit.SECONDS)
                .concatMap(new Function<String, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(@NonNull String s) throws Exception {
                        // Transform one item emitted by the source Observable.
                        List<Integer> list = new ArrayList<>();
                        for (int i = 5; i < 7; i++) {
                            list.add(Integer.parseInt(s) * 10 + i);
                        }
                        // Wrap the transformed data in a new Observable and return it.
                        return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
                    }
                })
                .concatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                        return Observable.just("concatMap just1: " + integer, "concatMap just2: " + integer).delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Message received :" + s);
                    }
                });
        // Keep the calling thread alive so the delayed emissions can arrive.
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /*
         Message received :concatMap just1: 15
         Message received :concatMap just2: 15
         Message received :concatMap just1: 16
         Message received :concatMap just2: 16
         Message received :concatMap just1: 25
         Message received :concatMap just2: 25
         Message received :concatMap just1: 26
         Message received :concatMap just2: 26
         Message received :concatMap just1: 35
         Message received :concatMap just2: 35
         Message received :concatMap just1: 36
         Message received :concatMap just2: 36
        * */
    }
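
The ordering difference between flatMap and concatMap can be checked without reading console output. The sketch below assumes RxJava 2.x; the class name, the `expand` helper, and the millisecond delays are hypothetical choices made so that earlier inputs finish later.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
public class FlatVsConcatSketch {

    // Maps n to two values; smaller n gets a longer delay, so its results arrive later.
    static Observable<Integer> expand(int n) {
        return Observable.just(n * 10 + 5, n * 10 + 6)
                .delay(40L - n * 10L, TimeUnit.MILLISECONDS);
    }

    // Inner Observables run concurrently; arrival order is not guaranteed.
    static List<Integer> withFlatMap() {
        return Observable.just(1, 2, 3)
                .flatMap(n -> expand(n))
                .toList()
                .blockingGet();
    }

    // Inner Observables run one after another, in source order.
    static List<Integer> withConcatMap() {
        return Observable.just(1, 2, 3)
                .concatMap(n -> expand(n))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + withFlatMap());
        System.out.println("concatMap: " + withConcatMap()); // [15, 16, 25, 26, 35, 36]
    }
}
```

Both calls yield the same six values; only concatMap is guaranteed to return them in source order.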

4. filter

filter() tests each item with a predicate function; only items that pass the test (the predicate returns true) are emitted.

    public void tzzs_rxjava2_filter() {
        /*
         * filter()
         * Tests each item with a predicate function; only items that pass the test
         * (the predicate returns true) are emitted.
         */
        Observable.just("1", "2", "3", "4", "5", "6").delay(1, TimeUnit.SECONDS)
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(@NonNull String s) throws Exception {
                        return Integer.parseInt(s) > 2;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(" Message received :" + s);
                    }
                });
        // Keep the calling thread alive so the delayed emissions can arrive.
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /*
         Message received :3
         Message received :4
         Message received :5
         Message received :6
        * */
    }
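
map() and filter() are often chained, and lambdas make the chain much shorter than anonymous classes. A minimal sketch, assuming RxJava 2.x and Java 8 lambdas; the class name is hypothetical.

```java
import io.reactivex.Observable;

import java.util.List;

// Hypothetical demo class, not part of the original notes.
public class MapFilterSketch {

    // Parses each string with map(), then keeps only values above 2 with filter().
    static List<Integer> greaterThanTwo() {
        return Observable.just("1", "2", "3", "4", "5", "6")
                .map(s -> Integer.parseInt(s))
                .filter(n -> n > 2)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(greaterThanTwo()); // [3, 4, 5, 6]
    }
}
```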

VI. Merging and concatenation operators

1. merge

merge() combines multiple Observables into one. merge runs its sources in parallel along the timeline, so items are emitted as they arrive. The fixed-argument overloads of merge accept at most 4 Observables; to merge more than 4, use mergeArray.

    public void tzzs_rxjava2_merge() {
        /*
         * merge()
         * Combines multiple Observables into one. merge runs its sources in parallel along
         * the timeline. The fixed-argument overloads accept at most 4 Observables; to merge
         * more than 4, use mergeArray.
         */
        Observable<String> observable_tzzs_1 = Observable.just("tzzs_1_1", "tzzs_1_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_2 = Observable.just("tzzs_2_1", "tzzs_2_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_3 = Observable.just("tzzs_3_1", "tzzs_3_2").delay(1, TimeUnit.SECONDS);

        Observable
                .merge(observable_tzzs_1, observable_tzzs_2, observable_tzzs_3)
                .subscribe(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        System.out.println(" Message received :" + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println(" Report errors : " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(" end ");
                    }
                });

        // Keep the calling thread alive so the delayed emissions can arrive.
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /*
         Message received :tzzs_3_1
         Message received :tzzs_3_2
         Message received :tzzs_1_1
         Message received :tzzs_1_2
         Message received :tzzs_2_1
         Message received :tzzs_2_2
         end 
        * */
    }
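
For more than four sources, mergeArray() takes a variable number of arguments. The sketch below merges five Observables; it assumes RxJava 2.x, and the class name and item values are hypothetical.

```java
import io.reactivex.Observable;

import java.util.List;

// Hypothetical demo class, not part of the original notes.
public class MergeArraySketch {

    // Merges five sources, one more than the largest fixed-argument merge overload.
    @SuppressWarnings("unchecked")
    static List<String> mergeFive() {
        return Observable.mergeArray(
                        Observable.just("a"),
                        Observable.just("b"),
                        Observable.just("c"),
                        Observable.just("d"),
                        Observable.just("e"))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(mergeFive());
    }
}
```

The result contains all five items. As with merge, the arrival order is only predictable when the sources emit synchronously.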

2. concat

concat() combines multiple Observables into one. Unlike merge, concat runs its sources serially, in the order they are passed in.

    public void tzzs_rxjava2_concat() {
        /*
         * concat()
         * Combines multiple Observables into one. Unlike merge, concat runs its sources
         * serially, in the order they are passed in.
         */
        Observable<String> observable_tzzs_1 = Observable.just("tzzs_1_1", "tzzs_1_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_2 = Observable.just("tzzs_2_1", "tzzs_2_2").delay(2, TimeUnit.SECONDS);
        Observable<String> observable_tzzs_3 = Observable.just("tzzs_3_1", "tzzs_3_2").delay(1, TimeUnit.SECONDS);

        Observable
                .concat(observable_tzzs_1, observable_tzzs_2, observable_tzzs_3)
                .subscribe(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        System.out.println(" Message received :" + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println(" Report errors : " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(" end ");
                    }
                });

        // Keep the calling thread alive so the delayed emissions can arrive.
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /*
         Message received :tzzs_1_1
         Message received :tzzs_1_2
         Message received :tzzs_2_1
         Message received :tzzs_2_2
         Message received :tzzs_3_1
         Message received :tzzs_3_2
         end 
        * */
    }
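
The serial behaviour of concat can be verified by giving the first source a longer delay than the second. A minimal sketch, assuming RxJava 2.x; the class name, item values, and millisecond delays are hypothetical.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
public class ConcatOrderSketch {

    static Observable<String> slow() {
        return Observable.just("slow_1", "slow_2").delay(60, TimeUnit.MILLISECONDS);
    }

    static Observable<String> fast() {
        return Observable.just("fast_1", "fast_2").delay(10, TimeUnit.MILLISECONDS);
    }

    // concat subscribes to fast() only after slow() has completed.
    static List<String> concatenated() {
        return Observable.concat(slow(), fast()).toList().blockingGet();
    }

    // merge subscribes to both at once, so the faster source usually arrives first.
    static List<String> merged() {
        return Observable.merge(slow(), fast()).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("concat: " + concatenated()); // [slow_1, slow_2, fast_1, fast_2]
        System.out.println("merge:  " + merged());
    }
}
```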

Copyright notice
This article was written by [osc_ uu6euvkf]. Please include a link to the original when reposting. Thank you.
https://cdmana.com/2020/12/20201225080834915Y.html
