《C'est démodé.》

Il n'y a rien de plus long que d'attendre

On ne peut pas attendre éternellement quelqu'un

Comme une demande.

Toujours attendre la réponse

Traitement des délais

java Écrire à partir de zéro RPC (01) Basé sur socket Réalisation

java Écrire à partir de zéro RPC (02)-netty4 Mise en œuvre du client et du serveur

java Écrire à partir de zéro RPC (03) Comment implémenter le serveur d'appel client?

java Écrire à partir de zéro RPC (04) Sérialisation

java Écrire à partir de zéro RPC (05) Réalisation universelle basée sur la réflexion

Nécessité

Nous avons mis en place rpc,Mais il y a un problème.,Il n'y a pas de délai pour synchroniser la réponse.

Si server C'est parti., Ou trop lentement. ,Le client ne peut pas non plus rester stupide et attendre.

Lorsque l'appel externe dépasse le temps spécifié, C'est une erreur. , Éviter la consommation inutile de ressources .

Idées

Au moment de l'appel, Conserver l'heure de début .

Détecter les temps d'arrêt lors de l'acquisition .

Créer un thread en même temps , Utilisé pour détecter les demandes de temporisation .

Réalisation

Idées

Au moment de l'appel, Conserver l'heure de début .

Détecter les temps d'arrêt lors de l'acquisition .

Créer un thread en même temps , Utilisé pour détecter les demandes de temporisation .

Fil de détection des temps d'arrêt

Afin de ne pas affecter le rendement des activités normales,Un autre thread vérifie si l'appel a expiré.

package com.github.houbb.rpc.client.invoke.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory;
import com.github.houbb.rpc.common.support.time.impl.Times; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; /**
* Fil de détection des temps d'arrêt
* @author binbin.hou
* @since 0.0.7
*/
public class TimeoutCheckThread implements Runnable{ /**
* Demande d'information
* @since 0.0.7
*/
private final ConcurrentHashMap<String, Long> requestMap; /**
* Demande d'information
* @since 0.0.7
*/
private final ConcurrentHashMap<String, RpcResponse> responseMap; /**
* Nouveau
* @param requestMap Demande Map
* @param responseMap Résultats map
* @since 0.0.7
*/
public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
ConcurrentHashMap<String, RpcResponse> responseMap) {
ArgUtil.notNull(requestMap, "requestMap");
this.requestMap = requestMap;
this.responseMap = responseMap;
} @Override
public void run() {
for(Map.Entry<String, Long> entry : requestMap.entrySet()) {
long expireTime = entry.getValue();
long currentTime = Times.time(); if(currentTime > expireTime) {
final String key = entry.getKey();
// Résultat réglé à Timeout , De la demande map Supprimer dans
responseMap.putIfAbsent(key, RpcResponseFactory.timeout());
requestMap.remove(key);
}
}
} }

Principales demandes de stockage ici ,Temps de réponse,Si le temps d'arrêt, Supprimer la demande correspondante .

Thread Start

In DefaultInvokeService Démarrer lors de l'initialisation :

final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
Executors.newScheduledThreadPool(1)
.scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS);

DefaultInvokeService

Résultats de configuration originaux , Il n'y a pas de temps pour obtenir les résultats , Ajoutez le jugement correspondant ici. .

Définir l'heure de la demande

  • Ajouter une demande addRequest

Met le temps écoulé directement dans map Moyenne.

Parce que le placement est une opération , La requête peut être multiple .

Le temps est donc calculé au moment de la mise en place.

@Override
public InvokeService addRequest(String seqId, long timeoutMills) {
LOG.info("[Client] start add request for seqId: {}, timeoutMills: {}", seqId,
timeoutMills);
final long expireTime = Times.time()+timeoutMills;
requestMap.putIfAbsent(seqId, expireTime);
return this;
}

Définir les résultats de la demande

  • Ajouter une réponse addResponse
  1. Si requestMap Cette demande d'information n'existe plus, Indique un délai possible , Ignorer directement les résultats du dépôt .

  2. Le temps d'arrêt est détecté pour le moment , Timeout renvoie directement l'information Timeout .

  3. Après avoir placé le message , Aviser tous les autres processus en attente .

@Override
public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
// 1. Pour juger de l'efficacité
Long expireTime = this.requestMap.get(seqId);
// Si vide,Peut - être que le résultat a expiré., Chronométré job Après enlèvement, Les résultats arrivent. .Ignorer directement
if(ObjectUtil.isNull(expireTime)) {
return this;
} //2. Déterminer s'il y a un délai
if(Times.time() > expireTime) {
LOG.info("[Client] seqId:{} Message expiré , Renvoie directement le résultat du délai .", seqId);
rpcResponse = RpcResponseFactory.timeout();
} // Mettez - le ici avant,Peut ajouter un jugement.
// Si seqId La collecte des demandes doit être traitée,Pour permettre l'insertion de.Ou simplement ignorer le rejet.
// Informer toutes les parties en attente
responseMap.putIfAbsent(seqId, rpcResponse);
LOG.info("[Client] Obtenir des informations sur les résultats,seqId: {}, rpcResponse: {}", seqId, rpcResponse);
LOG.info("[Client] seqId:{} L'information a été placée dans,Informer toutes les parties en attente", seqId);
// Supprimer la correspondance requestMap
requestMap.remove(seqId);
LOG.info("[Client] seqId:{} remove from request map", seqId);
synchronized (this) {
this.notifyAll();
}
return this;
}

Obtenir les résultats de la demande

  • Obtenir la correspondance getResponse
  1. Si le résultat existe , Renvoie directement les résultats de la réponse

  2. Sinon, entrez et attendez. .

  3. Obtenir les résultats après la fin de l'attente .

@Override
public RpcResponse getResponse(String seqId) {
try {
RpcResponse rpcResponse = this.responseMap.get(seqId);
if(ObjectUtil.isNotNull(rpcResponse)) {
LOG.info("[Client] seq {} Les résultats correspondants ont été obtenus: {}", seqId, rpcResponse);
return rpcResponse;
}
// Entrez et attendez
while (rpcResponse == null) {
LOG.info("[Client] seq {} Le résultat correspondant est vide,Entrez et attendez", seqId);
// Synchroniser les serrures d'attente
synchronized (this) {
this.wait();
}
rpcResponse = this.responseMap.get(seqId);
LOG.info("[Client] seq {} Les résultats correspondants ont été obtenus: {}", seqId, rpcResponse);
}
return rpcResponse;
} catch (InterruptedException e) {
throw new RpcRuntimeException(e);
}
}

Vous pouvez voir que la logique de la partie get n'a pas changé,Parce que timeout renvoie un objet timeout:RpcResponseFactory.timeout();

C'est une mise en œuvre très simple ,Comme suit:

package com.github.houbb.rpc.common.rpc.domain.impl;

import com.github.houbb.rpc.common.exception.RpcTimeoutException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse; /**
* Classe d'usine de réponse
* @author binbin.hou
* @since 0.0.7
*/
public final class RpcResponseFactory { private RpcResponseFactory(){} /**
* Informations sur l'exception de temporisation
* @since 0.0.7
*/
private static final DefaultRpcResponse TIMEOUT; static {
TIMEOUT = new DefaultRpcResponse();
TIMEOUT.error(new RpcTimeoutException());
} /**
* Obtenir les résultats de la réponse Timeout
* @return Résultats de la réponse
* @since 0.0.7
*/
public static RpcResponse timeout() {
return TIMEOUT;
} }

Le résultat de la réponse spécifie une exception de délai ,Cette exception sera lancée lorsque l'agent traitera les résultats:

RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
Throwable error = rpcResponse.error();
if(ObjectUtil.isNotNull(error)) {
throw error;
}
return rpcResponse.result();

Code d'essai

Serveur

Nous avons délibérément ajouté l'implémentation côté serveur à Sleepy, Les autres restent inchangés .

public class CalculatorServiceImpl implements CalculatorService {

    public CalculateResponse sum(CalculateRequest request) {
int sum = request.getOne()+request.getTwo(); // Somnolence délibérée 3s
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} return new CalculateResponse(true, sum);
} }

Client

Définir le délai correspondant à 1S,Autres inchangés:

public static void main(String[] args) {
// Informations sur la configuration du Service
ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();
config.serviceId(ServiceIdConst.CALC);
config.serviceInterface(CalculatorService.class);
config.addresses("localhost:9527");
// Définir le délai à1S
config.timeout(1000); CalculatorService calculatorService = config.reference();
CalculateRequest request = new CalculateRequest();
request.setOne(10);
request.setTwo(20); CalculateResponse response = calculatorService.sum(request);
System.out.println(response);
}

Les journaux sont les suivants:

.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 14:59:40.974] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC Démarrage du service client
...
[INFO] [2021-10-05 14:59:42.504] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC Client de démarrage du Service terminé,Adresse d'écoute localhost:9527
[INFO] [2021-10-05 14:59:42.533] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='62e126d9a0334399904509acf8dfe0bb', createTime=1633417182525, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2021-10-05 14:59:42.534] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 62e126d9a0334399904509acf8dfe0bb, timeoutMills: 1000
[INFO] [2021-10-05 14:59:42.535] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000004bc-00000000-1178e1265e903c4c-7975626f
...
Exception in thread "main" com.github.houbb.rpc.common.exception.RpcTimeoutException
at com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory.<clinit>(RpcResponseFactory.java:23)
at com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService.addResponse(DefaultInvokeService.java:72)
at com.github.houbb.rpc.client.handler.RpcClientHandler.channelRead0(RpcClientHandler.java:43)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
...
[INFO] [2021-10-05 14:59:45.615] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb Message expiré , Renvoie directement le résultat du délai .
[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] Obtenir des informations sur les résultats,seqId: 62e126d9a0334399904509acf8dfe0bb, rpcResponse: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb L'information a été placée dans,Informer toutes les parties en attente
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb remove from request map
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='62e126d9a0334399904509acf8dfe0bb', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:59:45.619] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 62e126d9a0334399904509acf8dfe0bb Les résultats correspondants ont été obtenus: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
...

On peut le découvrir.,Exception de délai.

Lacunes

Le traitement des temps d'arrêt peut être étendu dans les deux sens,Par exemple, le serveur peut également spécifier une limite de temps d'arrêt,Éviter le gaspillage des ressources.

Résumé

Pour que tout le monde puisse apprendre,Le code source ci - dessus est déjà ouvert:

https://github.com/houbb/rpc

J'espère que cet article vous aidera,Si ça vous plaît,Bienvenue à la collection d'accueil.

Je suis un vieux cheval.,J'ai hâte de te revoir..

java Écrire à partir de zéro RPC (07)-timeout Autre article Afghanistan

  1. ManuscritRPCUn autre ensemble de codes d'annotation attentifs est envoyé au nord du cadre.

    Angular8Officiellement publié,Java13Dans quelques mois.,Les itérations techniques sont si rapides,Le cadre devient de plus en plus complexe,Mais le principe est fondamentalement le même.Il est donc important de se concentrer sur la nature du Code,Cette fois, c'est écrit à la main.RPCCadre. Code complet et ...

  2. Tu écriras quand tu liras ça.RPCCadre

    Un..Que pouvez - vous apprendre de cet article?? RPCConcept et processus opérationnel RPCAccord etRPCLe concept de cadre NettyUtilisation de base JavaTechniques de sérialisation et de désrialisation ZookeeperUtilisation de base(Registre) Mise en œuvre de la logique opérationnelle spéciale par annotation personnalisée ...

  3. Écrire à partir de zéro dubbo rpc Cadre

    rpc rpc Est basé sur netty Réalisé java rpc Cadre,Similaire à dubbo. Principalement pour l'apprentissage personnel,De la profondeur à la profondeur,Compris. rpc Principes de mise en œuvre sous - jacents. Préface Travail à ce jour,Contact rpc Ça fait longtemps.. ...

  4. java - day015 - Liste bidirectionnelle manuscrite, Anomalie((suite)), IO(I / o)

    Allocation de mémoire pour la classe Charger dans la zone de méthode Objet en mémoire tas Variable locale en mémoire de pile Déterminer le type réel,Classes chargées dans la zone de méthode Objet.getClass(); Nom de la classe.class; Liste bidirectionnelle manuscrite package day1501_Liste bidirectionnelle manuscrite ...

  5. ManuscritRPC

    Code du serveur package com.peiyu.rpcs.bios; import java.io.IOException; public interface IRpcServers { void s ...

  6. ManuscritRPCCadre(netty+zookeeper)

    RPCQu'est - ce que c'est??Appel de procédure à distance,Le processus est le traitement des affaires.Tâches de calcul,Appeler une procédure distante comme appeler une méthode locale. RMIEtRPCQuelle est la différence entre?RMIEst un appel de méthode distant,- Oui.oopDans le domaineRPCUne réalisation de,Ce que nous connaissonsrestfull ...

  7. Écrire à partir de zéro spring ioc Cadre,En savoir plus spring Code source

    IoC Ioc C'est un spring ioc .Fonctionnalité de base version simplifiée de la mise en oeuvre,Faciliter l'apprentissage et la compréhension des principes. But de la création Utiliser spring Très longtemps.,Pour spring Très souvent utilisé,En fait, le code source n'a jamais été calme pour apprendre ...

  8. JavaCulture——Projet de serveur d'écriture

    Aperçu des travaux du projet: 1.DispatcherCatégorie(Une demande et une réponse sontDispatcher) package com.bjsxt.server; import java.io.IOException; i ...

  9. Basé surnettyManuscritRPCCadre

    Structure du Répertoire de codes rpc-commonStockage public rpc-interfacePourrpcInterface que l'appelant doit appeler rpc-registerEnregistrement et découverte des services fournis rpc-clientPourrpcImplémentation sous - jacente de l'appelant rpc- ...

  10. javaExamen écrit algorithmes manuscrits questions d'entrevue contenant des réponses

    1.Compter le nombre de mots d'un article en anglais.public class WordCounting {public static void main(String[] args) {try(FileReader fr ...

Recommandation aléatoire

  1. linuxCommandes courantes(2)pwdLes ordres

    pwd Les ordres1 Format de commande:pwd [Options]2 La fonction de commande affiche le chemin complet du Répertoire de travail actuel3 Les paramètres communs n'ont généralement aucun paramètre si le répertoire est lié:pwd -P Afficher le chemin réel , Au lieu d'utiliser le chemin du lien 4 Exemples courants:4.1 Avecpwd ...

  2. RGB Lorsque le paramètre dans la couleur est une variable , Pourquoi en ajouter deux? + C'est à gauche et à droite. ?

    <script> function draw(){ var c=document.getElementById("mycanvas"); var cxt=c.getCo ...

  3. À propos dewebpack Meilleure documentation

    Ces jours de recherche webpackOutils d'emballage, J'ai fait des recherches sur Internet. ,Mélange de poissons et de dragons. J'ai lu des dizaines de documents. ,Il n'y a toujours pas de description complète. Ça fait si longtemps. , J'ai abandonné le traitement. . Retour au site officiel , Lire mot pour mot , Dans une heure. . Apprendre nouveau ...

  4. titlebarEtactionbar Paramètres du bouton sur

    ---Début de la récupération du contenu--- Actionbar Ajouter un bouton : InresNouveau sous le dossiermenuDossiers(Si tu n'avais pas),Et ajouter unXMLDocumentation <?xml version="1.0" enc ...

  5. lvchangeDeavailableParamètres

    available Les paramètres sont man info help Aucun de ces paramètres , En fait, le paramètre est :activate Écris ça. . Ça vaut la peine d'être remarqué. . available Paramètre réel :  -a, --activate [a|e|l] ...

  6. SqlDataAdapter.Update Mise à jour des données par lots

    SqlDataAdapter.Update Mise à jour des données par lots UtiliserSqlDataAdapter.UpdateIl est facile d'accéder rapidement à la base de données. Mise à jour des données par lots .Notre méthode de mise à jour multi - données la plus courante est d'effectuer plusieurs fois avec des bouclesSQLDéclarations ...

  7. BZOJ 2134: Dislocation unidirectionnelle ( Attentes )

    Noi À remplir i+1 Le score attendu pour 1/max(a[i],a[i+1]). Selon la linéarité souhaitée , Il suffit d'additionner les attentes pour chaque option. ---------------------------------- ...

  8. HandlerMécanisme de messagerie——Handler Classe concise

    Handler La classe a deux rôles principaux : Envoyer un message dans un thread nouvellement lancé . Obtenir dans le fil principal .Traitement des messages. C'est très simple. , C'est en deux étapes. : Envoyer un message dans un thread nouvellement lancé : Puis obtenir sur le fil principal .Et traiter les messages. Mais ce processus implique ...

  9. Le vin vieilli - Winform ListView Contrôle double click Obtenir sélectionné dans l'événement rowAveccolumn

    Contexte Une demande de maintenance a été reçue récemment pour un projet précédent,Le propriétaire était un jeune homme qui venait de travailler~~~ Avant le projet, .net/winform. Rouvrir le Code aujourd'hui , Avant de regarder FrameWork2.0Le code suivant, Plein de souvenirs ...

  10. JavaChapitre avancé(Quatre)——JavaGestion des exceptions

    Il y a toujours des problèmes dans la procédure. ,Pour un fonctionnement normal pendant l'exécution du programme,UtiliserJavaLe mécanisme de gestion des exceptions fourni capture les exceptions possibles,Gérer les exceptions et permettre au programme de fonctionner correctement.C'est ça.JavaGestion des exceptions pour. Un.. Exception traçable JavaMoyenne ...