编程知识 cdmana.com

Reactor responsive programming, you only need this!

Ha ha ha ha ha ha ha , The title is a bit rampant . But since you're here , Let's have a look , After all, responsive programming with high concurrency is hard on performance , More and more important .

oh , This is an article Java article .

I don't say much nonsense , Go straight to the point .

Responsive programming core components

Before we get to the point , I want you to tell the publisher / The subscriber model has some understanding .

Look at the picture directly :

Talk is cheap, show you the code!

public class Main {

    public static void main(String[] args) {
        Flux<Integer> flux = Flux.range(0, 10);
        flux.subscribe(i -> {
            System.out.println("run1: " + i);
        });
        flux.subscribe(i -> {
            System.out.println("run2: " + i);
        });
    }
}

Output :

run1: 0
run1: 1
run1: 2
run1: 3
run1: 4
run1: 5
run1: 6
run1: 7
run1: 8
run1: 9
run2: 0
run2: 1
run2: 2
run2: 3
run2: 4
run2: 5
run2: 6
run2: 7
run2: 8
run2: 9

Process finished with exit code 0

Flux

Flux It's a multi-element producer , Between the lines , It can produce multiple elements , The sequence of constituent elements , For subscriber use .

Mono

Mono and Flux The difference is that , It can only produce one element for the producer to subscribe to , That's the difference in quantity .

Mono One of the common applications of is Mono<ServerResponse\> As WebFlux The return value of . After all, there's only one request at a time Response object , therefore Mono It is just fine .

Quickly create one Flux/Mono And subscribe to it

Let's take a look at some of the official document demonstrations .

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Mono<String> noData = Mono.empty();

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

subscribe() Method (Lambda form )

  • subscribe() Method accepts a by default Lambda To use expressions as subscribers . It comes in four variants .
  • Let me explain it here subscribe() Fourth parameter , Points out that when the subscription signal arrives , Number of initial requests , If it is null Then all requests (Long.MAX_VALUE)
public class FluxIntegerWithSubscribe {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(i -> {
            System.out.println("run");
            System.out.println(i);
        }, error -> {
            System.out.println("error");
        }, () -> {
            System.out.println("done");
        }, p -> {
            p.request(2);
        });
    }
}

If you remove the initial request , Then the maximum value will be requested :

public class FluxIntegerWithSubscribe {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        //  Let me explain it here subscribe() Fourth parameter , Points out that when the subscription signal arrives , Number of initial requests , If it is null Then all requests (Long.MAX_VALUE)
        //  rest subscribe() See source code or documentation for details :https://projectreactor.io/docs/core/release/reference/#flux
        integerFlux.subscribe(i -> {
            System.out.println("run");
            System.out.println(i);
        }, error -> {
            System.out.println("error");
        }, () -> {
            System.out.println("done");
        });
    }
}

Output :

run
0
run
1
run
2
run
3
run
4
run
5
run
6
run
7
run
8
run
9
done

Process finished with exit code 0

Inherit BaseSubscriber( Not Lambda form )

  • This way is more like for Lambda An alternative expression of an expression .
  • For subscriptions based on this method , There are a few caveats , For example, when you first subscribe , Ask at least once . Otherwise, the program can't continue to get new elements .
public class FluxWithBaseSubscriber {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(new MySubscriber());
    }

    /**
     *  Generally speaking , By inheritance BaseSubscriber<T> To achieve , And it's custom-made hookOnSubscribe() and hookOnNext() Method 
     */
    private static class MySubscriber extends BaseSubscriber<Integer> {

        /**
         *  Called on initial subscription 
         */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println(" Here we go !");
            //  Remember to ask at least once , Otherwise, it will not execute hookOnNext() Method 
            request(1);
        }

        /**
         *  Every time a new value is read, call 
         */
        @Override
        protected void hookOnNext(Integer value) {
            System.out.println(" Start reading ...");
            System.out.println(value);
            //  Indicate how many to read next 
            request(2);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println(" The end! ");
        }
    }
}

Output :

 Here we go !
 Start reading ...
0
 Start reading ...
1
 Start reading ...
2
 Start reading ...
3
 Start reading ...
4
 Start reading ...
5
 Start reading ...
6
 Start reading ...
7
 Start reading ...
8
 Start reading ...
9
 The end! 

Process finished with exit code 0

End subscription :Disposable

  • Disposable Is an interface returned when subscribing , There are many ways to operate a subscription .
  • For example, unsubscribe .

Here we use multithreading to simulate the production of the producer very quickly , And then immediately unsubscribe ( Although it was cancelled immediately, because the producers were so fast , So the subscriber still receives some elements ).

Other methods , such as Disposables.composite() You'll get one Disposable Set , Call it the dispose() The method takes all of the Disposable Of dispose() Methods call .

public class FluxWithDisposable {

    public static void main(String[] args) {
        Disposable disposable = getDis();
        //  The number of prints per time is generally different , Because of the call disposable Of dispose() Method has been cancelled , But if the producer is too fast , Then it may be too late to stop .
        disposable.dispose();
    }

    private static Disposable getDis() {
        class Add implements Runnable {

            private final FluxSink<Integer> fluxSink;

            public Add(FluxSink<Integer> fluxSink) {
                this.fluxSink = fluxSink;
            }

            @Override
            public synchronized void run() {
                fluxSink.next(new Random().nextInt());
            }
        }
        Flux<Integer> integerFlux = Flux.create(integerFluxSink -> {
            Add add = new Add(integerFluxSink);
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
        });
        return integerFlux.subscribe(System.out::println);
    }
}

Output :

 The output here may vary with each call , Because the subscription was cancelled , So how much you can print depends on that moment CPU The speed of .

Adjust the publishing rate of publishers

  • To ease the pressure on subscribers , Subscribers can reshape the publishing rate of publishers through negative pressure flow backtracking . The most typical usage is the following —— By inheritance BaseSubscriber To set your own request rate . But one thing must be clear , Namely hookOnSubscribe() Method must be requested at least once , Otherwise, your publisher may “ Get stuck ”.
public class FluxWithLimitRate1 {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 100);
        integerFlux.subscribe(new MySubscriber());
    }

    private static class MySubscriber extends BaseSubscriber<Integer> {

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println(" Here we go !");
            //  Remember to ask at least once , Otherwise, it will not execute hookOnNext() Method 
            request(1);
        }

        @Override
        protected void hookOnNext(Integer value) {
            System.out.println(" Start reading ...");
            System.out.println(value);
            //  Indicate how many to read next 
            request(2);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println(" The end! !");
        }
    }
}
  • Or use limitRate() Instance method to limit , It returns a rate limited Flux or Mono. Some upmarket operations can change the request rate of downstream subscribers , There are some operations, there is a prefetch As input , You can get more sequence elements than the number of downstream subscriber requests , This is done to deal with their own internal sequence . These prefetch operations generally default to prefetch 32 individual , But to optimize ; Each time the number of pre fetched has been obtained 75% When , You'll get it again 75%. This is called “ Add optimization ”.
public class FluxWithLimitRate2 {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 100);
        //  Last , Let's see Flux The provided pre acquisition method :
        //  Indicate the number of prefetches 
        integerFlux.limitRate(10);
        // lowTide Indicates the value of the supplementary optimization of the prefetch operation , I.e. modification 75% The default value of ;highTide Indicate the number of prefetches .
        integerFlux.limitRate(10, 15);
        //  Ah ~ The most typical is , There are countless requests :request(Long.MAX_VALUE) But I'll give you limitRate(2); You can only get two at a time. Ha ha ha !
        //  The other one is limitRequest(N), It limits dirty total requests to N. If the dirty request exceeds N, So just go back to N individual , Otherwise, return the actual quantity . And then think the request is complete , Send... Down the stream onComplete The signal .
        integerFlux.limitRequest(5).subscribe(new MySubscriber());
        //  The above one will only output 5 individual .
    }
}

Programmatically create a sequence

Static synchronization method :generate()

Now it's programmed generation Flux/Mono When . First introduced generate() Method , This is a synchronous method . The implication is , It's not thread safe , And its receiver can only accept input one at a time to generate Flux/Mono. in other words , It can only be called once at any time and only takes one input .

Or so to speak , The sequence of elements it generates , It depends on how the code is written .

public class FluxWithGenerate {

    public static void main(String[] args) {
        //  Here's one of its variants : The first parameter is to provide the initial state , The second parameter is a generator that writes data to the receiver , The parameter for state( It's usually an integer , Used to record status ), And receiver .
        //  See the source code for other variants 
        Flux.generate(() -> 0, (state, sink) -> {
            sink.next(state+"asdf");
            //  Plus for sink.complete() Call to terminate the build ; Otherwise, it's an infinite sequence .
            return state+1;
        }).subscribe(System.out::println);
        // generate The third parameter of the method is called to end the build , Consume state.
        Flux.generate(AtomicInteger::new, (state, sink) -> {
            sink.next(state.getAndIncrement()+"qwer");
            return state;
        }).subscribe(System.out::println);
        // generate() The workflow for looks like :next()->next()->next()->...
    }
}
  • It's not hard to see through the above code , Each receiver receives a value from the return value of the last generation method , That is to say state= The return value of the last iteration ( In fact, it is accurate to call it the last stream , It's just for the sake of understanding ).
  • But this state Every time it's a brand new ( Every time +1 New, of course ), Is there any way to do the two iterations state Is the same reference and can also update the value ? The answer is atomic type . That's the second way above .

Static asynchronous multithreading method :create()

Now, synchronous generation , The next step is asynchronous generation , Multithreading or multithreading ! Let's invite :create() The debut !!!

  • create() The method exposed a FluxSink object , It allows us to access and generate the sequence we need . besides , It can also trigger multithreaded events in callbacks .
  • create Another feature is that it's easy to bridge other interfaces to responsive bridges . Be careful , It's asynchronous multithreading doesn't mean create You can parallelize your code or execute it asynchronously ; How do you understand that ? Namely ,create Method Lambda The expression code is still blocked by a single thread . If you block the code where the sequence was created , Then it may cause the subscriber to even request data , I can't get it , Because the sequence is blocked , There's no way to generate new .
  • In fact, we can guess from the above phenomenon that , By default, the thread used by the subscriber and create Using a thread , Of course blocking create It will make the subscriber unable to run !
  • The above problems can be solved by Scheduler solve , It will be mentioned later .
public class FluxWithCreate {

    public static void main(String[] args) throws InterruptedException {
        TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> stringTestListener) {
                this.testListener = stringTestListener;
            }

            @Override
            public TestListener<String> get() {
                return testListener;
            }
        };
        Flux<String> flux = Flux.create(stringFluxSink -> testProcessor.register(new TestListener<String>() {
            @Override
            public void onChunk(List<String> chunk) {
                for (String s : chunk) {
                    stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {
                stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        System.out.println(" Now it is 2020/10/22 22:58; I am sleepy ");
        TestListener<String> testListener = testProcessor.get();
        Runnable1<String> runnable1 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {
                    list.add(i+"-run1");
                }
                testListener.onChunk(list);
            }
        };
        Runnable1<String> runnable2 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {
                    list.add(i+"-run2");
                }
                testListener.onChunk(list);
            }
        };
        Runnable1<String> runnable3 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {
                    list.add(i+"-run3");
                }
                testListener.onChunk(list);
            }
        };
        runnable1.set(testListener);
        runnable2.set(testListener);
        runnable3.set(testListener);
        // create So-called " asynchronous "," Multithreading " Refers to calling in multithreading. sink.next() Method . This is in the following push In contrast, we can see that 
        new Thread(runnable1).start();
        new Thread(runnable2).start();
        new Thread(runnable3).start();
        Thread.sleep(1000);
        testListener.onComplete();
        //  On the other hand ,create Another variant of can set parameters to achieve negative pressure control , See the source code for details .
    }
    public interface TestListener<T> {

        void onChunk(List<T> chunk);

        void onComplete();
    }

    public interface TestProcessor<T> {

        void register(TestListener<T> tTestListener);

        TestListener<T> get();
    }

    public interface Runnable1<T> extends Runnable {
         void set(TestListener<T> testListener);
    }
}

Static asynchronous single threaded methods :push()

Now, asynchronous multithreading , Synchronous generation method , And then there's asynchronous single threading :push().

Actually speaking of push and create Comparison of , My personal understanding is as follows :

  • reate Allow multi thread environment to call .next() Method , Just generate elements , The sequence of elements depends on ... Forget it , Random , After all, multithreading ;
  • however push Only one thread is allowed to produce elements , So it's orderly , As for asynchrony, it means that you can also , It doesn't have to be in the current thread .
  • By the way ,push and create All support onCancel() and onDispose() operation . Generally speaking ,onCancel Only in response to cancel operation , and onDispose In response to error,cancel,complete Wait for the operation .
public class FluxWithPush {

    public static void main(String[] args) throws InterruptedException {
        TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public TestListener<String> get() {
                return this.testListener;
            }
        };
        Flux<String> flux = Flux.push(stringFluxSink -> testProcessor.register(new TestListener<>() {
            @Override
            public void onChunk(List<String> list) {
                for (String s : list) {
                    stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {
                stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        Runnable1<String> runnable = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++i) {
                    list.add(UUID.randomUUID().toString());
                }
                testListener.onChunk(list);
            }
        };
        TestListener<String> testListener = testProcessor.get();
        runnable.set(testListener);
        new Thread(runnable).start();
        Thread.sleep(15);
        testListener.onComplete();
    }

    public interface TestListener<T> {
        void onChunk(List<T> list);
        void onComplete();
    }

    public interface TestProcessor<T> {
        void register(TestListener<T> testListener);
        TestListener<T> get();
    }

    public interface Runnable1<T> extends Runnable {
        void set(TestListener<T> testListener);
    }
}

Same as create equally ,push It also supports negative pressure regulation . But I didn't write it out , I tried Demo It's all direct requests Long.MAX_VALUE, It's really just through sink.onRequest(LongConsumer) Method to achieve negative pressure control . Here's the principle , If you want to know more, please explore by yourself , I am not talented , The whole afternoon didn't come true .

Example method :handle()

stay Flux In the example method of ,handle similar filter and map The operation of .

public class FluxWithHandle {

    public static void main(String[] args) {
        Flux<String> stringFlux = Flux.push(stringFluxSink -> {
            for (int i = 0; i < 10; ++ i) {
                stringFluxSink.next(UUID.randomUUID().toString().substring(0, 5));
            }
        });
        //  Get all that contains 'a' String of 
        Flux<String> flux = stringFlux.handle((str, sink) -> {
            String s = f(str);
            if (s != null) {
                sink.next(s);
            }
        });
        flux.subscribe(System.out::println);
    }

    private static String f(String str) {
        return str.contains("a") ? str : null;
    }
}

Threads and scheduling

Schedulers Those static methods of

Generally speaking , None of the responsive frameworks support concurrency ,P.s. create That's producer concurrency , It's not concurrent in itself . So there is no concurrency library available , Need to be implemented by developers themselves .

meanwhile , Each operation is usually run in the thread where the previous operation is located , They don't have their own threads , And the top operation is with subscribe() In the same thread . such as Flux.create(...).handle(...).subscribe(...) It's all running in the main thread .

In a responsive framework ,Scheduler Determines the thread in which the operation is executed and how , Its function is similar to ExecutorService. But a little bit more . If you want to implement some concurrent operations , Then consider using Schedulers Static methods provided , Let's see what's available :

Schedulers.immediate(): Submit directly in the current thread Runnable Mission , And execute immediately .

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        // Schedulers.immediate():  Submit directly in the current thread Runnable Mission , And execute immediately .
        System.out.println(" Current thread :" + Thread.currentThread().getName());
        System.out.println("zxcv");
        Schedulers.immediate().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("qwer");
        });
        System.out.println("asdf");
        //  Make sure that the asynchronous task can be printed 
        Thread.sleep(1000);
    }
}

It can be seen from the above that ,immediate() In fact, it is to insert what needs to be executed in the execution position Runnable To achieve . It's no different from writing the code here .

Schedulers.newSingle(): Ensure that each operation is performed using a new thread .

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        //  If you want every call to be a new thread , have access to Schedulers.newSingle(), It ensures that each operation is performed using a new thread .
        Schedulers.single().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("bnmp");
        });
        Schedulers.single().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("ghjk");
        });
        Schedulers.newSingle(" Threads 1").schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("1234");
        });
        Schedulers.newSingle(" Threads 1").schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("5678");
        });
        Schedulers.newSingle(" Threads 2").schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("0100");
        });
        Thread.sleep(1000);
    }
}

Schedulers.single(), Its function is to open up a new thread for the current operation , But remember , All operations using this method share a single thread ;

Schedulers.elastic(): A flexible unbounded thread pool .

Unbounded generally means unmanageable , Because it can cause negative pressure problems and too many threads to be created . So we're going to talk about its alternatives .

Schedulers.bounededElastic(): Bounded reusable thread pool

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("1478");
        });
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("2589");
        });
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("0363");
        });
        Thread.sleep(1000);
    }
}

Schedulers.boundedElastic() It's a better choice , Because it can create a worker pool when needed , And reuse the free pool ; meanwhile , Some pools will be discarded if they have more than a certain amount of idle time .

meanwhile , It also has a capacity limit , commonly 10 Twice as much as CPU The core number , This is the maximum capacity of its backup thread pool . Submit at most 10 Ten thousand tasks , It's then loaded into the task queue , Wait until it's available , If it's delay scheduling , Then the delay start time is calculated when the thread is available .

thus it can be seen Schedulers.boundedElastic() For blocked I/O Operation is a good choice , Because it allows each operation to have its own thread . But remember , Too many threads can put pressure on the system .

Schedulers.parallel(): Provides system level parallelism

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        Schedulers.parallel().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("6541");
        });
        Schedulers.parallel().schedule(() -> {
            System.out.println(" The current thread is :" + Thread.currentThread().getName());
            System.out.println("9874");
        });
        Thread.sleep(1000);
    }
}

Last ,Schedulers.parallel() Provides parallel capabilities , It creates a number equal to CPU Core number of threads to achieve this function .

Other thread operations

By the way , You can also use ExecutorService Create a new Scheduler. Of course ,Schedulers A bunch of newXXX The method can be .

One thing is very important , Namely boundedElastic() Method can be applied to traditional blocking code , however single() and parallel() Neither. , If you have to, you're going to throw the exception . Customize Schedulers Can be set by ThreadFactory Property to set whether the receiving thread is NonBlocking Interface modification Thread example .

Flux Some of the methods of will use the default Scheduler, such as Flux.interval() Methods are used by default Schedulers.parallel() Method , Of course, by setting Scheduler To change this default .

In a responsive chain , There are two ways to switch the execution context , Namely publishOn() and subscribeOn() Method , The position of the former in the flow chain is very important . stay Reactor in , You can add any number of subscribers in any form to meet your needs , however , After the subscription method is set, only after , In order to activate all objects in this subscription chain . That's the only way , The request will be traced back to the publisher , And then the source sequence .

Switch in the context of the subscription chain

publishOn()

publishOn() Just like normal operation , Add in the middle of the chain of operations , It affects the execution context of all operations below it . Look at an example :

public class FluxWithPublishOnSubscribeOn {

    public static void main(String[] args) throws InterruptedException {
        //  Create a parallel thread 
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                // map It must be running in T Upper .
                .map(i -> 10 + i)
                //  The execution context is switched to the parallel thread 
                .publishOn(s)
                //  This map Or running on parallel threads , because publishOn() The following operations are switched to another execution context .
                .map(i -> "value " + i);
        //  Suppose this new The thread that comes out is called T
        new Thread(() -> flux.subscribe(System.out::println));
        Thread.sleep(1000);
    }
}

subscribeOn()

public class FluxWithPublishOnSubscribeOn {

    public static void main(String[] args) throws InterruptedException {
        //  It's still creating a parallel thread 
        Scheduler ss = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> fluxflux = Flux
                .range(1, 2)
                //  But the map It's already in ss It's running out of here 
                .map(i -> 10 + i)
                //  Switch here , But it's the whole chain that switches 
                .subscribeOn(s)
                //  there map It also runs in ss On 
                .map(i -> "value " + i);
        //  This is an anonymous thread TT
        new Thread(() -> fluxflux.subscribe(System.out::println));
        Thread.sleep(1000);
    }
}

subscribeOn() Method will switch the entire subscription chain after the subscription to the new execution context . No matter in subscribeOn() Where? , You can switch the subscription sequence after the first subscription , Yes, of course , If there's any more publishOn(),publishOn() A new switch will be made .

版权声明
本文为[SuanCaiYu]所创,转载请带上原文链接,感谢

Scroll to Top