
Threads and Schedulers in Reactor

Introduction

Today we look at the multithreading model and the scheduler (timer) model in Reactor. We have already introduced Reactor itself; it is essentially an extension of the observer pattern.

So Reactor is not inherently about multithreading. You can use it with or without multiple threads.

This article shows how to use multithreading and schedulers in Reactor.

Threads

Let's start with an example that creates a Flux:

        Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });

        flux.subscribe(System.out::println);

Here the Flux generator and the subscriber both run on the same thread, the one that calls subscribe.
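To check this, here is a minimal sketch that records the thread name seen inside the generator and inside the subscriber. It assumes reactor-core is on the classpath; the class name SameThreadDemo and the helper threadNames are made up for illustration.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class SameThreadDemo {

    // Records which thread runs the generator and which runs the subscriber.
    static List<String> threadNames() {
        List<String> names = new ArrayList<>();
        Flux<Integer> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    names.add("generate:" + Thread.currentThread().getName());
                    sink.next(state);
                    if (state == 2) sink.complete();
                    return state + 1;
                });
        // No scheduler is involved, so everything runs on the calling thread.
        flux.subscribe(v -> names.add("subscribe:" + Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        threadNames().forEach(System.out::println);
    }
}
```

Every recorded entry should end with the name of the thread that called threadNames, because subscribe drives the generator synchronously.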

If we want the subscription to happen on a new thread, we have to start that thread ourselves and call subscribe inside it.

        Mono<String> mono = Mono.just("hello ");

        Thread t = new Thread(() -> mono
                .map(msg -> msg + "thread ")
                .subscribe(v ->
                        System.out.println(v + Thread.currentThread().getName())
                )
        );
        t.start();
        t.join();

In the example above, the Mono is created on the main thread, while subscribe is called on the newly started thread.

Scheduler

In many cases a publisher needs to call some method periodically to produce elements. Reactor provides the Scheduler abstraction to create and manage such scheduled tasks.

Scheduler is an interface:

public interface Scheduler extends Disposable 

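It defines the methods used to submit tasks. Only the first one is abstract; the two timed variants are default methods that an implementation overrides when it supports timing.

Immediate execution: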

Disposable schedule(Runnable task);

Delayed execution :

default Disposable schedule(Runnable task, long delay, TimeUnit unit)

Periodic execution:

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
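The three methods can be tried out with a small sketch. It assumes reactor-core is on the classpath and uses Schedulers.newSingle because its worker thread accepts delayed and periodic tasks; the class name SchedulerMethodsDemo, the helper runAll and the delays are arbitrary choices for illustration.

```java
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SchedulerMethodsDemo {

    // Submits one immediate, one delayed and one periodic task.
    // Returns true if each of them ran within the timeout.
    static boolean runAll() {
        Scheduler scheduler = Schedulers.newSingle("demo");
        CountDownLatch immediate = new CountDownLatch(1);
        CountDownLatch delayed = new CountDownLatch(1);
        CountDownLatch ticks = new CountDownLatch(2); // wait for two periodic runs
        try {
            scheduler.schedule(immediate::countDown);
            scheduler.schedule(delayed::countDown, 50, TimeUnit.MILLISECONDS);
            Disposable periodic = scheduler.schedulePeriodically(
                    ticks::countDown, 0, 20, TimeUnit.MILLISECONDS);

            boolean ok = immediate.await(2, TimeUnit.SECONDS)
                    && delayed.await(2, TimeUnit.SECONDS)
                    && ticks.await(2, TimeUnit.SECONDS);
            periodic.dispose(); // the returned Disposable cancels the periodic task
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.dispose(); // release the worker thread
        }
    }

    public static void main(String[] args) {
        System.out.println("all tasks ran: " + runAll());
    }
}
```

Each call returns a Disposable, so a task can be cancelled individually, and disposing the Scheduler itself stops everything it still holds.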

Scheduler has a companion utility class, Schedulers, which provides several factory methods for creating Scheduler instances. In essence these wrap an ExecutorService or ScheduledExecutorService, which is passed in as a Supplier to create the Scheduler.

In short, you can think of a Scheduler as a wrapper around an ExecutorService.

The Schedulers utility class

The Schedulers utility class provides many useful factory methods. Let's go through them:

Schedulers.immediate():

The submitted Runnable is executed immediately on the current thread.

Schedulers.single():

Uses one shared thread to run all tasks.

Schedulers.boundedElastic():

Creates a pool of reusable threads. A thread that stays idle for a long time (60 seconds by default) is reclaimed. boundedElastic caps the number of threads, by default at CPU cores x 10. If no worker thread is available, submitted tasks wait in a queue.

Schedulers.parallel():

Creates a fixed number of worker threads; by default the number equals the number of CPU cores.

Schedulers.fromExecutorService(ExecutorService):

Creates a Scheduler from an existing thread pool.

Schedulers.newXXX:

Schedulers also provides a family of methods whose names start with new (such as newSingle and newParallel) for creating dedicated Scheduler instances.

Here is a concrete use of Schedulers. We can specify which Scheduler produces the elements:
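To see where each kind of Scheduler actually runs a task, here is a sketch that submits one Runnable and reports the executing thread's name. It assumes reactor-core is on the classpath; the class name SchedulersTourDemo, the helpers, and the thread name my-pool are hypothetical.

```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SchedulersTourDemo {

    // Runs one task on the given scheduler and returns the name of the thread that ran it.
    static String threadNameOn(Scheduler scheduler) {
        CompletableFuture<String> name = new CompletableFuture<>();
        scheduler.schedule(() -> name.complete(Thread.currentThread().getName()));
        try {
            return name.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not run in time", e);
        }
    }

    // Wraps a JDK pool whose single thread is named "my-pool" (a made-up name).
    static String threadNameOnCustomPool() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "my-pool");
            t.setDaemon(true);
            return t;
        });
        Scheduler scheduler = Schedulers.fromExecutorService(pool);
        try {
            return threadNameOn(scheduler);
        } finally {
            scheduler.dispose();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("immediate:      " + threadNameOn(Schedulers.immediate()));
        System.out.println("single:         " + threadNameOn(Schedulers.single()));
        System.out.println("parallel:       " + threadNameOn(Schedulers.parallel()));
        System.out.println("boundedElastic: " + threadNameOn(Schedulers.boundedElastic()));
        System.out.println("custom pool:    " + threadNameOnCustomPool());
    }
}
```

With Schedulers.immediate() the reported name is the caller's own thread; the other schedulers report one of their worker threads.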

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
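The one-liner above can be expanded into a runnable sketch that takes the first three ticks and records the thread each one was emitted on. It assumes reactor-core is on the classpath; the class name IntervalDemo, the 100 ms period and the 5 second timeout are arbitrary choices for illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

public class IntervalDemo {

    // Collects the first three ticks together with the emitting thread's name.
    static List<String> firstTicks() {
        Scheduler scheduler = Schedulers.newSingle("test");
        try {
            return Flux.interval(Duration.ofMillis(100), scheduler)
                    .take(3)
                    .map(tick -> tick + " on " + Thread.currentThread().getName())
                    .collectList()
                    .block(Duration.ofSeconds(5));
        } finally {
            scheduler.dispose(); // a scheduler created with newSingle must be released by us
        }
    }

    public static void main(String[] args) {
        firstTicks().forEach(System.out::println);
    }
}
```

The ticks are produced on the thread owned by the scheduler passed to interval, not on the thread that calls block.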

publishOn and subscribeOn

publishOn and subscribeOn are used to switch the Scheduler execution context.

A conclusion first: publishOn switches the Scheduler at the point where it appears in the chain, whereas the position of subscribeOn in the chain makes no difference.

This is because the actual publish-subscribe relationship is established only when the subscriber calls subscribe.
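That nothing is wired up until subscribe is called can be observed with a small sketch. It assumes reactor-core is on the classpath; the class name LazySubscriptionDemo and the event labels are made up for illustration.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class LazySubscriptionDemo {

    // Records the order in which assembly, subscription and emission happen.
    static List<String> events() {
        List<String> events = new ArrayList<>();
        Flux<Integer> flux = Flux.range(1, 2)
                .doOnSubscribe(s -> events.add("subscribed"))
                .doOnNext(i -> events.add("next " + i));
        // The chain is only described at this point; no element has flowed yet.
        events.add("assembled");
        flux.subscribe();
        return events;
    }

    public static void main(String[] args) {
        events().forEach(System.out::println);
    }
}
```

The recorded order is assembled, subscribed, next 1, next 2: building the chain does nothing by itself, and the source starts emitting only once subscribe is called.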

Let's look at how the two methods are used:

publishOn

publishOn switches the thread on which elements are published, partway through a chain:

    @Test
    public void usePublishOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":"+ Thread.currentThread())
                .publishOn(s)
                .map(i -> "value " + i+":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }

Above we created a scheduler named parallel-scheduler.

Then we created a Flux. The Flux first applies a map, then switches the execution context to parallel-scheduler, and finally applies another map.

Finally, we subscribe from a new thread and print the output.

Here is the output:

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]

As you can see, the main thread is named main and the subscriber thread is named ThreadA.

So before publishOn, map runs on ThreadA. After publishOn, map runs on a thread from the parallel-scheduler pool.

subscribeOn

subscribeOn switches the execution context in which the subscription happens. No matter where subscribeOn appears in the call chain, it applies to the whole chain from the source onward, until a publishOn switches the context again.

Let's look at an example :

    @Test
    public void useSubscribeOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .subscribeOn(s)
                .map(i -> "value " + i + ":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }

As before, the example uses two map calls, with a subscribeOn between them to switch the execution context of the subscription.

Here is the output:

value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]

As you can see, both map calls run on the parallel-scheduler thread.
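The two operators can also be combined in one chain. The sketch below assumes reactor-core is on the classpath; the class name CombinedDemo and the scheduler names source and downstream are made up for illustration. subscribeOn is deliberately placed last to show that its position does not matter.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

public class CombinedDemo {

    // Tags each element with the thread seen before and after publishOn.
    static List<String> run() {
        Scheduler source = Schedulers.newSingle("source");
        Scheduler downstream = Schedulers.newSingle("downstream");
        try {
            return Flux.range(1, 2)
                    // runs on the subscribeOn scheduler
                    .map(i -> i + " upstream=" + Thread.currentThread().getName())
                    .publishOn(downstream)
                    // runs on the publishOn scheduler
                    .map(s -> s + " downstream=" + Thread.currentThread().getName())
                    .subscribeOn(source)
                    .collectList()
                    .block(Duration.ofSeconds(5));
        } finally {
            source.dispose();
            downstream.dispose();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first map should report a thread from the source scheduler and the second map a thread from the downstream scheduler, which matches the behaviour of the two separate examples above.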

The examples for this article are in learn-reactive.

Author of this article: flydean, Program those things

Link to this article: http://www.flydean.com/reactor-thread-scheduler/

Source of this article: flydean's blog


Copyright notice
This article was written by [floated]. Please include a link to the original when reposting. Thank you.
