Skip to content
Snippets Groups Projects
Commit 80deb151 authored by Helene Coullon's avatar Helene Coullon
Browse files

tp6 base + Naolib

parent 0c97d4a1
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id: tags:
# Correction TP6
%% Cell type:code id: tags:
``` python
### Configuration Mac : utilisation de notebook Jupyter
from pyspark import SparkContext, SparkConf
conf = SparkConf() \
.setAppName('SparkApp') \
.setMaster('spark://spark:7077') \
.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
.set("spark.sql.shuffle.partitions", "10")
sc = SparkContext.getOrCreate(conf=conf)
from pyspark.sql import SQLContext
# Créer un SQLContext pour les opérations SQL
sql_context = SQLContext(sc)
minio_ip_address = "minio"
```
%% Cell type:code id: tags:
``` python
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", f"http://{minio_ip_address}:9000")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "root")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "password")
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
from minio import Minio
client_minio = Minio(
f"{minio_ip_address}:9000",
access_key="root",
secret_key="password",
secure=False
)
# Création du bucket tp6
if client_minio.bucket_exists("tp6") == False:
client_minio.make_bucket("tp6")
```
%% Cell type:markdown id: tags:
## Exercice 1
%% Cell type:code id: tags:
``` python
client_minio.fput_object("tp6", "160109-histoire.txt", "allData/160109-histoire.txt")
```
%% Cell type:code id: tags:
``` python
# Chargement des données
file_path = "s3a://tp6/160109-histoire.txt"
#lines = sql_context.read.text(file_path).collect()
#print(lines)
lines = sql_context.read.text(file_path).rdd.map(lambda r: r[0]) # pourquoi le map ??? pourquoi le read et
lines = sql_context.read.text(file_path).rdd.map(lambda r: r[0])
print(lines.collect())
#pas textfile comme TP avant ?
```
%% Cell type:code id: tags:
``` python
# Transformation : Diviser les lignes en mots
words = lines.flatMap(lambda line: line.split(" "))
# Agrégation : Compter les mots
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# Trier par fréquence décroissante
sorted_word_counts = word_counts.sortBy(lambda x: x[1], ascending=False)
# Afficher les résultats
print("Table des mots et leurs comptes :")
for word, count in sorted_word_counts.take(10):
print(f"{word}: {count}")
# Filtrer les mots avec une longueur >= 6
filtered_word_counts = sorted_word_counts.filter(lambda x: len(x[0]) >= 6)
# Afficher les résultats filtrés
print("Mots de longueur >= 6 et leurs comptes :")
for word, count in filtered_word_counts.take(10):
print(f"{word}: {count}")
```
%% Cell type:markdown id: tags:
## Exercice 2 : Stream
%% Cell type:code id: tags:
``` python
from pyspark.sql.functions import explode, split, col
# Étape 2 : Définir la source (le répertoire "data" contenant les fichiers texte)
input_dir = "s3a://tp6/allData" # Répertoire de surveillance
checkpoint_dir = "checkpoint" # Répertoire pour la reprise
lines = sql_context.readStream \
.format("text") \
.load(input_dir)
# Étape 3 : Transformer les données
# Diviser les lignes en mots
words = lines.select(explode(split(col("value"), " ")).alias("word")) # pas clair pour moi
words = lines.select(explode(split(col("value"), " ")).alias("word"))
# Compter les occurrences des mots
word_counts = words.groupBy("word").count()
# Sort word counts in descending order
sorted_word_counts = word_counts.orderBy(col("count").desc())
# Étape 4 : Définir la sortie
# Sortie sur la console avec affichage de 20 lignes par micro-batch
query = sorted_word_counts.writeStream \
.outputMode("complete") \
.format("console") \
.option("numRows", 20) \
.option("truncate", False) \
.start()
# Étape 5 : Démarrer la requête
query.awaitTermination()
# https://stackoverflow.com/questions/61463554/structured-streaming-output-is-not-showing-on-jupyter-notebook
```
%% Cell type:code id: tags:
``` python
query.stop()
```
%% Cell type:code id: tags:
``` python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, regexp_replace, length
# Définir les chemins
input_dir = "data" # Répertoire surveillé
checkpoint_dir = "checkpoint" # Répertoire de checkpoint
# Définir les mots à exclure
excluded_words = {"quelques", "toujours", "ceci", "cela", "mais", "donc", "or", "ni", "car"}
input_dir = "s3a://tp6/allData" # Répertoire de surveillance
lines = sql_context.readStream \
.format("text") \
.load(input_dir)
# Transformation : nettoyer les mots et filtrer
words = lines.select(
explode(
split(
regexp_replace(col("value"), r"[^\w\s]", ""), # Supprimer la ponctuation
" "
)
).alias("word")
)
# Filtrer les mots
filtered_words = words.filter(
(length(col("word")) > 7) & # Garde les mots avec plus de 7 lettres
(~col("word").isin(*excluded_words)) & # Exclut les mots dans excludedWords
(col("word") != "") # Exclut les mots vides
)
# Compter les occurrences des mots filtrés
word_counts = filtered_words.groupBy("word").count()
# Trier les mots par fréquence décroissante
sorted_word_counts = word_counts.orderBy(col("count").desc())
# Écrire le résultat sur la console avec les 20 mots les plus fréquents
query = sorted_word_counts.writeStream \
.outputMode("complete") \
.format("console") \
.option("numRows", 20) \
.option("truncate", False) \
.start()
# Attendre la fin de l'exécution
query.awaitTermination()
```
%% Cell type:code id: tags:
``` python
query.stop()
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment