Skip to content
Snippets Groups Projects
Commit f7e0812b authored by Helene Coullon's avatar Helene Coullon
Browse files

tp5 et 6 SparkSQL et Spark streaming

parent c3b91d56
No related branches found
No related tags found
No related merge requests found
# Spark SQL
Dans ce TP, nous utilisons les mêmes outils que dans le TP précédent (WSL ou le conteneur Jupyter dans Docker).
Le notebook [spark_sql.ipynb](spark_sql.ipynb) reprend les jobs vus en cours. Executez-les différentes cellules et observez les résultats obtenus. Vous aurez à retirer les modules Docker des précédents TP. Vous pouvez le faire avec la commande `docker compose down` dans les répertoires appropriés ou en vous servant du plugin Docker de VsCode.
Ensuite, répondez aux questions suivantes. Vous aurez besoin de créer un nouveau notebook.
## Ensemble de données
Le fichier [departuresdelay.csv.zip] comprend des données concernant des vols internes aux Etats-Unis (données extraites de https://catalog.data.gov).
Les données ont 5 colonnes :
- date : contient des chaînes comme 02190925, pour 02-19 09:25am ;
- delay : retard en minutes entre l'heure prévue et l'heure réelle de départ, des départs anticipés sont représentés par des nombres négatifs ;
- distance : la distance en miles entre aéroport de départ et d'arrivée ;
- origin et destination : les codes IATA des aéroports de départ et d'arrivée.
Décompressez le fichier `departuredelays.zip` et placez le fichier `departuredelays.csv` dans le conteneur Jupyter (utilisez la commande `docker cp departuredelays.csv jupyter:/departuredelays.csv` dans ce cas). Utilisez ensuite minio comme dans le TP précédent.
### Première requête
Combien de lignes a cette table ?
Répondre à la requête de deux manières :
- en utilisant une vue et une requête SQL sur cette vue ;
- en utilisant simplement l'API des blocs de données / DataFrames.
## Requêtes
Ecrire les requêtes suivantes des deux manières (cf transparents "DataFrames vs Views" et "Projection et sélection" du cours 6) :
- Quels sont les vols dont la distance est supérieure à 1000 miles ?
- Trier ces vols par ordre descendant. Voici un exemple de tri utilisant l'API où df est un bloc de données comportant une colonne id : df.sort(col("id").asc()).
- Quels sont les vols entre San Francisco (SFO) et Chicago (ORD) qui ont au moins deux heures de retard ?
## Puzzle
En utilisant l'API des blocs de données et les fonctions qui permettent de transformer des chaînes en dates, transformer la colonne date en un format lisible et déterminer quand ces retards sont les plus fréquents.
\ No newline at end of file
File added
This diff is collapsed.
# Spark Streaming
## Requête du cours
Vous aurez à retirer les modules Docker des précédents TP. Vous pouvez le faire avec la commande `docker compose down` dans les répertoires appropriés ou en vous servant du plugin Docker de VsCode.
## Ensemble de données
L'archive disponible sur Moodle contient un ensemble d'articles du journal Le Monde publiés depuis 2016 (les noms de fichiers commencent par la date de parution papier de l'article sous la forme "YYMMdd").
## Calcul des mots les plus fréquents en batch
Pour travailler sur la requête, on va commencer en batch en reprenant le schéma général du TP précédent dans un objet WordCount (qui crée une session Spark) :
- lire dans un bloc de données lines les données d'un des fichiers, par exemple, allData/160109-histoire.txt (comme son extension l'indique, il s'agit d'un fichier au format text) ;
- transformer ces lignes en des mots ;
- à l'aide d'une agrégation (groupBy().count()) et d'un tri, calculer une table des mots et de leur compte en rangeant les mots les plus fréquents en premier.
Dans un deuxième temps, modifier la requête afin de ne sélectionner que les mots de longueur supérieure ou égale à 6.
## Calcul des mots les plus fréquents en micro-batch
Il s'agit maintenant de reprendre la requête précédente pour l'appliquer incrémentalement à tous les fichiers du répertoire data (initialement vide).
Définir un nouvel objet StreamingWordCount et reprendre les 5 "étapes pour définir une requête" :
- définir la source : le répertoire data avec des fichiers au format text ;
- transformer les données : is suffit de reprendre la requête précédente ;
- définir la sortie et son mode : on utilise la console. Il est possible d'augmenter le nombre de lignes affichées en utilisant l'option "numRows" dont la valeur est bien sûr le nombre de lignes que l'on veut afficher ;
- préciser les détails de traitement : on utilise le déclenchement par défaut, avec un répertoire de reprise "checkpoint" à la racine du projet ;
- démarrer la requête.
Il faut ensuite "donner à manger" à la requête. Pour cela, il suffit de copier, petit à petit pour observer l'incrémentalité du calcul, des fichiers du répertoire allData au répertoire data.
Une manière simple de faire est de se placer dans un shell à la racine du projet, puis de copier les fichiers d'un répertoire à l'autre année par année (en attendant que des premiers résultats aient été produits avant de passer à l'année suivante). Pour cela, servez-vous du notebook [tp-files-init.ipynb](tp-files-init.ipynb) dont le but est de copier dans Minio les fichiers à étudier (10 par 10). N'hésitez pas à stopper le traitement et utiliser la cellule de remise à zéro pour la mise au point.
...
Pour mémoire :
- réexécuter le programme depuis le départ demande d'effacer le répertoire qui stocke le point de reprise, sinon l'exécution reprend de ce point de reprise. Utilisez la cellule correspondante dans le notebook [tp-files-init.ipynb](tp-files-init.ipynb).
- il est possible de visualiser des détails concernant l'exécution via http://localhost:4040/ (il n'y a pas de rafraîchissement automatique).
### Variations
Il est possible d'affiner le filtrage. Etendez la requête :
- en considérant un ensemble de mot exclus val excludedWords = Set("quelques", "toujours", ...) et en ne gardant les mots word qui n'appartiennent pas à cet ensemble donc tels que ! excludedWords.contains(word) ;
- en considérant des mots de plus de 7 lettres ;
- en effacant les signes de ponctuation en fin de mot.
## Exercice supplémentaire
De nombreuses API REST sont disponibles dans le cadre de l'OpenData. L'[API TAN/Naolib](https://open.tan.fr/doc/openapi) en est un bon exemple.
- Préparez un job de streaming récupérant les prochains passages à l'arrêt Chantrerie-Grandes Ecoles, et calculant et affichant la durée entre les deux prochains bus C6 (direction Hermeland). Le code de l'arrêt Chantrerie-Grandes Ecoles est "CTRE".
Conseil: utilisez Minio pour et traiter les fichiers JSON de l'API.
\ No newline at end of file
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment