编程知识 cdmana.com

Le chemin vers l'avancement du Big Data - - Spark SQL Summary


Manuscrit WordCount

UtiliserflatMap、reduceByKey Pour calculer

//sc- Oui.SparkContextObjet,L'objet est soumissparkEntrée du programme
sc.textFile("file:///home/hadoop/data/hello.txt") // Lire le fichier,
	.flatMap(line => line.split(" "))  // Placez chaque ligne de mots dans le fichier avec un séparateur(Voici les espaces)Séparation
	.map(word => (word,1))  //Compter chaque mot comme1
	.reduceByKey((x,y) => (x+y))  // Compter le nombre de mots identiques
	.collect

Abréviation

sc.textFile("file:///home/hadoop/data/hello.txt")
	.flatMap(_.split(" "))
	.map((_, 1))
	.reduceByKey(_ + _)
	.collect

RDD、DAG、 Stage、 Task 、 Job

RDD(Resilient Distributed Datasets),Ensembles de données distribués résilients
DAG(Directed Acyclic Graph),Graphique acyclique dirigé

RDD RDD - Oui. Spark L'âme de,Aussi appeléEnsembles de données distribués résilients.Un RDD Représente un ensemble de données en lecture seule qui peut être partitionné.RDD Il peut y avoir beaucoup de partitions à l'intérieur(partitions),Chaque partition a un grand nombre d'enregistrements(records).

DAG Spark MoyenneUtiliser DAG C'est exact. RDD Pour modéliser les relations,Décrit RDD Dépendance,Cette relation est aussi connue sous le nom de lineage(Consanguinité),RDD Utilisation des dépendances Dependency Entretien.

Stage In DAG En cours Stage Division de,La Division est fondée sur la question de savoir si la dépendance est shuffle De,Chaque Stage Peut être divisé en plusieurs Task.La prochaine chose à faire est Driver Envoyer Task À Executor,Executor Pool de Threads pour exécuter ces task,Retourner les résultats à Driver.

Job Spark De Job De l'exécution par l'utilisateur action Fonctionnement(C'est Spark Pratique Job),Depuis RDD Action pour obtenir des résultats,Au lieu d'en faire un RDD Convertir en un autre RDD De transformation Fonctionnement.

Task Un Stage Intérieur,Final RDD Combien? partition,Combien en résultera - t - il? task.

Spark Processus de soumission des emplois

img

  1. spark-submit Code de soumission,Mise en œuvre new SparkContext(),In SparkContext Structure Li DAGScheduler Et TaskScheduler.
  2. TaskScheduler Va passer par un processus en arrière - plan,Connexion Master,Vers Master Inscription Application.
  3. Master Reçu Application Après la demande,Sera utilisé en conséquenceAlgorithme de programmation des ressources,In Worker C'est pour ça. Application Démarrer plusieurs Executor
  4. Executor Après le démarrage,S'inscrire à l'envers TaskScheduler Moyenne.Tous les Executor Tous inscrits à Driver Après le début,SparkContext Initialisation finale,Ensuite, exécutez notre propre code.
  5. Chaque exécution à un Action,Crée un Job.Job Sera soumis à DAGScheduler.
  6. DAGScheduler Oui. Job Divisé en plusieurs Stage,Et chaque Stage Créer un TaskSet.
  7. TaskScheduler Ils vont mettre TaskSet À l'intérieur. Task,Soumis à Executor Exécution supérieure.
  8. Executor Pool de Threads sur,Reçu un à la fois Task,C'est bon. TaskRunner Encapsulation,Et puis retirer un thread du pool de Threads pour l'exécuter task.(TaskRunner Nous avons écrit le Code,Copie,Désérialisation,Mise en œuvre Task,Chaque Task Mise en œuvre RDD L'un des partition)

Spark De Local Et Standalone

SparkC'est tout.6Mode de fonctionnement:Local,Standalone,Yarn-Cluster,Yarn-Client, Mesos, Kubernetes

  1. Local: Local Mode autonome,Si aucune configuration n'est ajoutée à l'instruction de commande,La valeur par défaut est Local Mode,Fonctionnement local.C'est aussi un déploiement.、Définir le mode le plus simple,Tous les Spark Les processus fonctionnent sur une machine ou une machine virtuelle.
  2. Standalone: Standalone - Oui. Spark Cadre d'ordonnancement des ressources auto - implémenté.Si nous n'utilisons que Spark Effectuer des calculs de Big Data,Lorsqu'aucun autre cadre de calcul n'est utilisé,C'est fait. Standalone Le modèle suffit.,Surtout dans le cas d'un seul utilisateur.Standalone Le modèle est Spark Cadre d'ordonnancement des ressources mis en oeuvre,Ses principaux noeuds sont: Client Noeud、Master Noeuds et Worker Noeud.Parmi eux Driver Peut être exécuté en Master Au milieu du noeud,Peut également fonctionner localement Client Fin.- Oui. spark-shell Soumission interactive d'outils Spark De Job Heure,Driver In Master Exécuter sur le noeud;Lorsqu'il est utilisé spark-submit Présentation des outils Job Ou Eclipse、IDEA Attendre d'être utilisé sur la plateforme de développement new SparkConf.setManager(“spark://master:7077”) Mode de fonctionnement Spark Au moment de la Mission,Driver C'est local. Client À bout portant.

Standalone Le déploiement du modèle est fastidieux,Mais les scripts de déploiement sont officiellement disponibles,J'ai besoin d'aide. Spark Des paquets de déploiement sont installés sur chaque machine de noeud,Et le répertoire déployé doit être le même,Et il faut Master Noeuds et autres implémentations de noeuds SSH Connexion sans mot de passe.Au démarrage,Il faut d'abord démarrer Spark De Master Et Slave Noeud.Soumettre une commande est similaire à:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://Oscar-2.local:7077 \
  /tmp/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar \
  100

Parmi eux master:7077- Oui. Spark De Master Nom d'hôte et numéro de port du noeud,Bien sûr, le cluster doit être lancé à l'avance.

Quel que soit le mode utilisé,SparkLe Code de la demande est identique,Il suffit de passer au moment de la soumission–masterParamètre pour spécifier notre mode de fonctionnement

  • Client
    DriverEn coursClientFin(SoumettreSparkMachine de travail)
    ClientEt la demandeContainerCommuniquer pour terminer l'ordonnancement et l'exécution des tâches,ClientOn ne peut pas sortir.
    Les informations du journal sont sorties sur la console:Facile à tester

  • Cluster
    DriverEn coursApplicationMasterMoyenne
    ClientIl suffit de soumettre le travail et de l'éteindre,Parce que le travail est déjà en coursYARNC'est parti.
    Le journal n'est pas visible au terminal,Parce que le journal estDriverAllez.,Uniquement paryarn logs -applicationIdapplication_id

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \

IciyarnC'est notreyarn clientMode
Si ouiyarn clusterLes mots du modèle,yarn-cluster

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

Si vous voulez courirYARNAu - dessus de,Alors vous devez définirHADOOP_CONF_DIROuYARN_CONF_DIR

1) export HADOOP_CONF_DIR=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop
2) $SPARK_HOME/conf/spark-env.sh

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
4

yarn logs -applicationId application_1495632775836_0002

Large dépendance、Dépendance étroite

  1. Une dépendance étroite fait référence à chaque Parent RDD De Partition Plus de courtepointes RDD Un Partition Utiliser(Un par un.)

  2. Une large dépendance fait référence à plusieurs enfants RDD De Partition Dépend du même parent RDDDe partition(Un fils.)

RDD Comme structure de données,Est essentiellement unCollection d'enregistrements de partitions en lecture seule.Un RDD Peut contenir plusieurs partitions,Chaque partition est un fragment d'ensemble de données.

Tout d'abord,,Une dépendance étroite peut être prise en charge dansMême noeudAllez.,Par pipeline Exécution formelle de plusieurs commandes(C'est le même nom. Stage Fonctionnement),Par exemple, map Après,Immédiatement après la mise en œuvre filter.Au contraire.,Une large dépendance exige que toutes les partitions parent soient disponibles,Il faudra peut - être aussi appeler quelque chose comme MapReduce Ce genre d'opérationPassage à travers les noeuds.

Deuxièmement,,Du point de vue de la récupération échouée.La récupération échouée avec une dépendance étroite est plus efficace,Parce qu'il suffit de recalculer ce qui manque parent partition C'est tout.,Et peut être recalculé en parallèle à différents noeuds(Une machine trop lente peut être reprogrammée à plusieurs noeuds).

Spark SQLQue Hive Où est - il?

QuandMapLa sortie deReduceLors de l'utilisation,Les résultats de sortie doivent êtrekeyHashi,Et distribué à chaqueReducerMonte.,Ce processus estshuffle.
Parce queshuffleIl s'agit de la lecture et de l'écriture des disques et de la transmission du réseau,Donc,shuffleLe niveau de performance affecte directement l'efficacité de l'ensemble du programme.

Spark SQL Que Hadoop Hive Allez,Il y a des conditions.,Et non. Spark SQL Du moteur Hive Le moteur!,Au contraire.,Hive De HQL Le moteur est plus puissant que Spark SQL Moteur plus rapide.En fait...,Le fait est que Spark Rapide en soi.

  1. Suppression de la redondance HDFS Lire et écrire: Hadoop Chaque fois shuffle Après l'opération,Doit écrire sur le disque,Et Spark In shuffle Il n'est pas nécessaire de laisser tomber la plaque arrière,C'est bon. persist En mémoire,Pour l'itération.Si l'opération est compliquée,Beaucoup. shufle Fonctionnement,Alors Hadoop Lire et écrire IO Le temps va augmenter considérablement,C'est aussi Hive La principale raison de ce ralentissement.

  2. Suppression de la redondance MapReduce Phase: Hadoop De shuffle L'opération doit être complète MapReduce Fonctionnement,Redondance lourde.Et Spark Basé sur RDD Offre une grande variété d'opérations d'opérateur,Et reduce Production opérationnelle shuffle Données,Peut être mis en cache en mémoire .

  3. JVM Optimisation de: Hadoop Chaque fois MapReduce Fonctionnement,Démarrer un Task Ça va commencer une fois. JVM,Opérations axées sur les processus.Et Spark Chaque fois MapReduce L'opération est basée sur le thread,Seulement au démarrage Executor Oui, une fois. JVM,Mémoire Task L'opération est effectuéeRéutilisation des filsDe.Chaque fois qu'on démarre JVM Ça peut prendre quelques secondes, voire une douzaine de secondes,Alors quand Task Beaucoup.,C'est l'heure. Hadoop Je ne sais pas. Spark C'est un peu lent..

Précautions d'emballage

Soyez prudent lors de l'emballage,pom.xmlAjouter ce qui suitplugin

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass></mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

mvn assembly:assembly

./bin/spark-submit \
--class com.hiszm.log.SparkStatCleanJobYARN \
--name SparkStatCleanJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /home/hadoop/lib/ipDatabase.csv,/home/hadoop/lib/ipRegion.xlsx \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/hiszm/input/* hdfs://hadoop001:8020/hiszm/clean 

Attention!:–filesInsparkUtilisation dans

spark.read.format("parquet").load("/hiszm/clean/day=20170511/part-00000-71d465d1-7338-4016-8d1a-729504a9f95e.snappy.parquet").show(false)

./bin/spark-submit \
--class com.hiszm.log.TopNStatJobYARN \
--name TopNStatJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/hiszm/clean 20170511 

Sélection du format de stockage:http://www.infoq.com/cn/articles/bigdata-store-choose/
Sélection du format de compression:https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-compression-analysis/

Ajuster le parallélisme

./bin/spark-submit \
--class com.hiszm.log.TopNStatJobYARN \
--name TopNStatJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/hiszm/clean 20170511 

版权声明
本文为[Sun zhongming]所创,转载请带上原文链接,感谢
https://cdmana.com/2021/10/20211013211945026u.html

Scroll to Top