编程知识 cdmana.com

Rxjava2 - analyse du code source pour Android, optimisation du paquet d'installation flutter

public static boolean isDisposed(Disposable d) {
//JugementDisposableLa référence de la variable de type estDISPOSED
//Pour déterminer si ce connecteur est déconnecté
return d == DISPOSED;
}

public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
//Prends ça.fieldSet toDISPOSED
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}

}

Je vois.DisposableHelperEst une classe d'énumération,Et il n'y a qu'une seule valeurDISPOSED.disposeLa méthode consiste à citer un atomefieldSet toDISPOSED,C'est l'état d'interruption.EtisDisposed()C'est sur la base de ce signe que l'interruption est jugée.

Regarde en arrière.CreateEmiiterClasseonNextCes méthodes

@Override
public void onNext(T t) {
//Omettre les codes non pertinents

if (!isDisposed()) {
//Si ce n'est pas le casdispose(),Pour appeleronNext()
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
//Sidispose()C'est,Sera appelé ici,Qui finira par s'effondrer
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {
//Omettre les codes non pertinents
if (!isDisposed()) {
try {
//Si ce n'est pas le casdispose(),Pour appeleronError()
observer.onError(t);
} finally {
//onError()Après çadispose()
dispose();
}
//Si ce n'est pas le casdispose(),Retourtrue
return true;
}
//Sidispose()C'est,Retourfalse
return false;
}

@Override
public void onComplete() {
if (!isDisposed()) {
try {
//Si ce n'est pas le casdispose(),Pour appeleronComplete()
observer.onComplete();
} finally {
//onComplete()Après çadispose()
dispose();
}
}
}

C'est facile à dire,

  • Si ce n'est pas le casdispose,observerDeonNextSera appelé
  • onErrorAveconCompleteLes méthodes s'excluent mutuellement,Un seul d'entre eux peut être appelé à,Parce que l'un d'eux est appelé,Ça coupera la connexion,dispose
  • D'abord.onErrorAprèsonComplete- Oui.onCompleteNe sera pas appelé,Dans l'autre sens,Ça va s'effondrer.,Parce queonErrorException lancée dans,En fait,,disposeAppelé aprèsonErrorÇa va s'effondrer.

Regardez encore l'opérateurMap

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, “mapper is null”);
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != NONE) {
downstream.onNext(null);
return;
}

U v;

try {
v = ObjectHelper.requireNonNull(mapper.apply(t), “The mapper function returned a null value.”);
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}


}
}

Je vois.,L'opérateur est en fait le même que dans le cas particulier analysé ci - dessus,L'analyse est omise ici.
AndroidDeRxjava2-Analyse du code source,flutterOptimisation des paquets d'installation_Android

##Trois.RxjavaChangement de fil
C'est comme ça qu'on utilise

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer+1;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {

});

AdoptionsubscribeOnPour changer les fils en amont,observeOnPour changer les fils en aval.

Dans le code source,Qu'est - ce que?

subscribeOnAnalyse des sources
Schedulers.io()

subscribeOnIl existe plusieurs types de,C'est tout ce qu'il fautSchedulers.io()Pour analyser,Tout le reste est en fait similaire,Il suffit d'en analyser un.

@NonNull
public static Scheduler io() {
//Encorehook,C'est l'équivalentIO
return RxJavaPlugins.onIoScheduler(IO);
}

public final class Schedulers {

@NonNull
static final Scheduler IO;

static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}

static {
//Encorehook,C'est l'équivalentnew IOTask
IO = RxJavaPlugins.initIoScheduler(new IOTask());

}


static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
}

Je vois.,C'est l'équivalent denew IoScheduler,Sans regarder sa mise en œuvre concrète.

subscribeOn

On continue à regarder.subscribeOnSource de

public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, “scheduler is null”);
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

Comme avant,C'est un retour en arrière.new ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

observer.onSubscribe(parent);

 //L'extérieurparent.setDisposableC'est pour créer un connecteur,Pour couper les commandes et ainsi de suite plus tard,On peut juste regarder à l'intérieur
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

}

Il est construit poursourceEtschedulerGardez les deux,À utiliser plus tard.

Passons maintenant au processus d'abonnement,Bien qu'il s'agisse d'un changement de fil,Mais ce n'est qu'un opérateur,Comme nous l'avons analysé précédemment,Le processus d'abonnement est le même que ci - dessus,Pour qu'on sache,Après l'abonnement,ObservableSubscribeOnDesubscribeActualLa méthode est appelée.

La même chose.,subscribeActualDans la méthode,Il met notre observateur en aval personnaliséobserverL'emballage est devenuSubscribeOnObserverObjet,Puis appelezobserverDeonSubscribeMéthodes,Je vois.,Jusqu'à présent,Il ne s'est rien passé de Thread - related,Alors...observerDeonSubscribe()La méthode fonctionne dans le thread courant,Alors concentrons - nous surparent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));Méthodes.

Regardons d'abord.SubscribeTaskCatégorie

//- Oui.ObservableSubscribeOnClasse interne,RéalisationrunnableInterface,Regarde ça.,On sent le fil
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
//C'estsourceC'est ce que nous personnalisons.ObservableObjet,C'estObservableCreate
source.subscribe(parent);
}
}

Je vois.,Cette classe est très simple,C'est fait.RunnableInterface,InrunAppelé dans la méthodesource.subscribe(parent);,C'est un appel en chaîne,Il sera appelé couche par couche.

Regarde encore.scheduler.scheduleDirect

C'est la partie centrale du changement de fil,Regardez attentivement

public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

//runC'estSubscribeTask
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorkerInScheduleC'est une méthode abstraite dans la classe,Donc l'implémentation est dans la Sous - classe
//Donc cette méthode est dansIOScheduleRéalisé en
//workerPeut être exécutérunnabale
final Worker w = createWorker();

//En fait,decoratedRunEncore un.runObjet,C'est - à - direSubscribeTask
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

//runnableEtworkerEmballé dans unDisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);

//WorkerExécuter ceciTask
w.schedule(task, delay, unit);

return task;
}

Les notes de code ci - dessus sont très détaillées,scheduleDirectLa méthode est:,newUnworker,Et Utilisez ceciworkerPour mettre en œuvretaskThread.

Regarde encore.IoIoSchedulerMoyenne,createWorkerEtsheduleProcessus

public Worker createWorker() {
//newUnEventLoopWorkerEt passer unworkerDans le pool de cache de
return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;

final AtomicBoolean once = new AtomicBoolean();

EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
//Du cacheworkerPrends - en un dans la piscineworkerSors de là.
this.threadWorker = pool.get();
}

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don’t schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}

//RunnableÀ vous.threadWorkerPour mettre en œuvre
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

Notez que,DifférentSchedulerLes classes seront différentesWorkerRéalisation,Parce queSchedulerLa classe finit par être livrée àworkerPour effectuer l'expédition,Mais l'analyse n'est pas très différente.

Ensuite, regardonsworkerOpération de pool de cache pour

static final class CachedWorkerPool implements Runnable {

ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
//Le pool tampon n'est pas vide,Juste un dans le pool de cachethreadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}

// No cached worker found, so create a new one.
//Vide en un et retourne
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}

}

Regarde encoreworkerCode d'exécution pourthreadWorker.scheduleActual

Suivi du Code,Vous trouverez l'implémentation dans sa classe mèreNewThreadWorkerMoyenne

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;

volatile boolean disposed;

public NewThreadWorker(ThreadFactory threadFactory) {
//Créer unScheduledExecutorServiceObjet
//Le pool de Threads peut être utilisé
executor = SchedulerPoolFactory.create(threadFactory);
}

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//C'estdecoratedRunC'est l'équivalentrun
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

//Oui.decoratedRunEmballé dans un nouvel objetScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
//Exécution immédiate dans le pool de ThreadsScheduledRunnable
f = executor.submit((Callable<Object>)sr);
} else {
//Exécution retardée dans le pool de ThreadsScheduledRunnable
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {

}

return sr;
}

}

Ici.executorEst d'utiliser un pool de Threads pour exécuter des tâches,FinalsubscribeTaskDerunLa méthode est exécutée dans le pool de Threads,C'est - à - dire en amontObservableDesubscribeLa méthode seraIOAppelé dans le thread.

Résumé

  • ObserverDeonSubscribeLa méthode fonctionne dans le thread courant,Parce qu'il n'y a pas de changement de fil dans le code source
  • Si définisubscribeOn(Spécifiez le thread),AlorsObservableDanssubscribeLa méthode fonctionnera dans le thread spécifié.
  • Quand plusieurssubscribeOnAu moment de l'appel,Parce que d'après le code source,Le changement de fil est de bas en haut,Enfin, c'est le premier processus de basculement d'un appel en chaîne,C'est un changement valide

observeOnAnalyse des sources

.observeOn(AndroidSchedulers.mainThread())

AndroidSchedulers.mainThread()
La même chose.,Regardons d'abordAndroidSchedulers.mainThread()Source de

public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

Ce code croit que si vous regardez l'analyse du code source ci - dessus,Ça se voit en un coup d'oeil.,C'est l'équivalent denew HandlerScheduler(new Handler(Looper.getMainLooper()), false);,Mettre un thread principalHandlerEmballé dansHandlerSchedulerMoyenne.

observeOn
Et on continue à regarderobserveOnSource de

public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, “scheduler is null”);
ObjectHelper.verifyPositive(bufferSize, “bufferSize”);
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

Vous pouvez également le savoir à partir du code source,C'est l'équivalent direct denew ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
//Déterminer si c'est le fil courant
if (scheduler instanceof TrampolineScheduler) {
//Si c'est le thread actuel,Appelez directement en avalsubscribeMéthodes
//C'est - à - dire appeler le prochainObservableDesubscibeMéthodes
source.subscribe(observer);
} else {
//Créationworker
//Dans cet exempleschedulePourHandlerScheduler
Scheduler.Worker w = scheduler.createWorker();

//C'est un peu similaire à l'analyse ci - dessus,Oui.workerEmballé dansObserveOnObserverMoyenne
//Attention!:source.subscribeNon impliquéworker,Donc il est toujours exécuté dans le thread défini entre
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

}

Tout d'abord,,Décidez si vous êtes déjà sur le fil à basculer,Si c'est le cas,Alors appelez directement.Si ce n'est pas le cas,,Alors, utilisezHandlerSchedulerEmballez - le.worker,Et à traversworkerPour changer les événements en aval,Abonnement direct en amont,Pas de Threading.

Allons voir.ObserveOnObserverCode source de la classe

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
//Mettre l'information dans la file d'attente
queue.offer(t);
}
schedule();
}

void schedule() {
if (getAndIncrement() == 0) {
//Appelez ici
worker.schedule(this);
}
}

void drainNormal() {
int missed = 1;

//File d'attente pour stocker les messages
final SimpleQueue<T> q = queue;
//Ici.downstramEn fait, c'est en avalobserver
final Observer<? super T> a = downstream;

for (; {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (; {
boolean d = done;
T v;

try {
//Extraire le message de la file d'attente
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;

if (checkTerminated(d, empty, a)) {
return;
}

if (empty) {
break;
}

//Appelez en avalObserverDeonNext
a.onNext(v);
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

@Override
public void run() {
//outputFusedPar défautfalse
if (outputFused) {
drainFused();
} else {
// Donc l'appel par défaut drainNormal
drainNormal();
}
}

}

L'analyse de l'appel de chaîne ci - dessus peut être connue ,source.subscribe(observer)Lorsqu'il est appelé,Va l'appeler

CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}

Je sais., Les événements en amont seront envoyés en aval par l'émetteur ,Puis Abonnez - vous,Alors...ObserveOnObserverDans cette classe,onNext Cette méthode appelle ,Et ensuite exécuterschedule,Mise en œuvre finaleworker.schedule(this);,Parce que ce qui arriverunnable- Oui.this, Qui est appelé à cette classe après le dernier thread runMéthodes,Mise en œuvrerun,Exécution finaledrainNormal()Méthodes.
Alors regardons encore workerCommentcreate Et appelé .
On sait déjà ,C'estschedule- Oui.HandlerSchedule

final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;

HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}

@Override
public Worker createWorker() {
//Du fil principalhandler
return new HandlerWorker(handler, async);
}
}

Regarde.HandlerWorkerDescheduleMéthodes

private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;

private volatile boolean disposed;

HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}

@Override
@SuppressLint(“NewApi”) // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException(“run == null”);
if (unit == null) throw new NullPointerException(“unit == null”);

if (disposed) {
Enfin, pour vous aider à comprendreAndroidPrincipes des points de connaissance pertinents et connaissances pertinentes pour l'entrevue,C'est là que j'ai rassemblé les informations pertinentes.24EnsembleTencent、Byte Runout、Ali!、Baidu2020-2021Analyse des vrais problèmes d'entrevue,J'ai rassemblé les points techniquesVidéo etPDF(Il a fallu plus d'énergie que prévu),SacLe contexte de la connaissance + Beaucoup de détails.

Et?Technologie architecturale avancée cerveau、AndroidÉlaboration d'un profil d'entrevue?Aider tout le monde à apprendre à progresser,Vous économisez également du temps en recherchant des documents en ligne pour apprendre,Peut également partager avec les amis autour d'apprendre ensemble.

AndroidDeRxjava2-Analyse du code source,flutterOptimisation des paquets d'installation_Développement mobile_02

AndroidDeRxjava2-Analyse du code source,flutterOptimisation des paquets d'installation_Android_03

AndroidDeRxjava2-Analyse du code source,flutterOptimisation des paquets d'installation_Programmeur_04

Cliquez sur:

 AndroidVidéo de schéma+BATSujet de l'entrevuePDF+Notes d'étude》Disponible gratuitement~

Apprentissage en ligne AndroidUn tas d'informations.,Mais si les connaissances acquises ne sont pas structurées,Quand vous rencontrez des problèmes, vous n'avez qu'à essayer.,Plus d'étude approfondie,Il est donc difficile d'obtenir une véritable amélioration technologique.J'espère que ce système technologique systématique vous donnera une orientation.

版权声明
本文为[Mb61c1dbbb44788]所创,转载请带上原文链接,感谢
https://cdmana.com/2022/01/202201150207009053.html

Scroll to Top