" File \"/opt/conda/lib/python3.12/socketserver.py\", line 761, in __init__\n",
" self.handle()\n",
" File \"/opt/conda/lib/python3.12/site-packages/pyspark/accumulators.py\", line 295, in handle\n",
" poll(accum_updates)\n",
" File \"/opt/conda/lib/python3.12/site-packages/pyspark/accumulators.py\", line 267, in poll\n",
" if self.rfile in r and func():\n",
" ^^^^^^\n",
" File \"/opt/conda/lib/python3.12/site-packages/pyspark/accumulators.py\", line 271, in accum_updates\n",
" num_updates = read_int(self.rfile)\n",
" ^^^^^^^^^^^^^^^^^^^^\n",
" File \"/opt/conda/lib/python3.12/site-packages/pyspark/serializers.py\", line 596, in read_int\n",
" raise EOFError\n",
"EOFError\n",
"----------------------------------------\n"
]
}
],
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext, SparkConf\n",
"conf = SparkConf() \\\n",
...
...
@@ -130,78 +56,6 @@
"sc = SparkContext(conf=conf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pagerank\n",
"\n",
"PageRank est un algorithme développé par Google pour mesurer l'importance relative de chaque page web en fonction de son nombre et de la qualité des liens entrants. Il attribue un score de popularité à chaque page, influençant son classement dans les résultats de recherche."
" ranks = contribs.reduceByKey(lambda x, y: x + y).mapValues(lambda total: a / N + (1 - a) * total)\n",
"\n",
"# Output final ranks\n",
"print(ranks.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
...
...
@@ -218,17 +72,9 @@
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Count of numbers from 1 to 1000 is: 999\n"
]
}
],
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create an RDD containing numbers from 1 to 1000\n",
"numbers_rdd = sc.parallelize(range(1, 1000))\n",
...
...
@@ -245,19 +91,22 @@
"### Calcul de moyenne"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Quelques liens utiles pour comprendre :\n",
"- la fonction de création d'un RDD `parallelize` : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html \n",
"- la transformation `map` : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html\n",
"- la transformation `reduceByKey` : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html\n",
"- la fonction `mapValues` : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapValues.html"
"print(\"Array(\", \", \".join([str(age) for age in ages]), \")\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pagerank\n",
"\n",
"PageRank est un algorithme développé par Google pour mesurer l'importance relative de chaque page web en fonction de son nombre et de la qualité des liens entrants. Il attribue un score de popularité à chaque page, influençant son classement dans les résultats de recherche.\n",
"\n",
"Quelques liens en plus pour comprendre :\n",
"- la transformation `join` sur les RDD : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.join.html\n",
"- la transformation `flatmap` sur les RDD : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap.html\n",
"\n",
"N'hésitez pas à essayer de décomposer les calculs pour comprendre ! Attention à bien faire une action de type `collect` pour voir les résultats intermédiaires."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext\n",
"from pyspark.sql import SparkSession\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# Parameters\n",
"ITERATIONS, a, N = 10, 0.15, 5\n",
"\n",
"# Inline example data: (Website, List[Linked Websites])\n",
" ranks = contribs.reduceByKey(lambda x, y: x + y).mapValues(lambda total: a / N + (1 - a) * total)\n",
"\n",
"# Output final ranks\n",
"print(ranks.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
...
...
@@ -306,31 +205,21 @@
"source": [
"### Exercice: Moby Dick\n",
"\n",
"Prérequis : télécharger le livre à l'[emplacement suivant](https://nyu-cds.github.io/python-bigdata/files/pg2701.txt), et placer le fichier `pg2701.txt` dans le dossier du TP5. Ceci est automatisé par la commande suivante :"
"Prérequis : le contenu du livre est présent dans le fichier `pg2701.txt` ([lien internet](https://nyu-cds.github.io/python-bigdata/files/pg2701.txt)) du TP4."
]
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" % Total % Received % Xferd Average Speed Time Time Time Current\n",
"client_minio.fput_object(\"tp4\", \"pg2701.txt\", \"pg2701.txt\") # copie du fichier local dans le bucket"
]
},
{
...
...
@@ -395,39 +273,16 @@
"metadata": {},
"source": [
"Minio reprend le principe du stockage cloud S3 : il permet de stocker des fichiers dans des \"buckets\". Les buckets dans MinIO sont des conteneurs de stockage pour organiser et gérer des objets (fichiers) de manière structurée, similaires aux dossiers dans un système de fichiers, mais optimisés pour le stockage objet.\n",
"Vérifiez que le fichier est bien présent dans le bucket `tp5` de Minio : [http://localhost:19001/browser/tp5/](http://localhost:19001/browser/tp5/). Vous pouvez utiliser l'utilisateur `root` et le mot de passe `password` pour vous connecter."
"Vérifiez que le fichier est bien présent dans le bucket `tp4` de Minio : [http://localhost:19001/browser/tp4/](http://localhost:19001/browser/tp4/). Vous pouvez utiliser l'utilisateur `root` et le mot de passe `password` pour vous connecter."
"['The Project Gutenberg EBook of Moby Dick; or The Whale, by Herman Melville', '', 'This eBook is for the use of anyone anywhere at no cost and with', 'almost no restrictions whatsoever. You may copy it, give it away or', 're-use it under the terms of the Project Gutenberg License included', 'with this eBook or online at www.gutenberg.org', '', '', 'Title: Moby Dick; or The Whale', '']\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"minio_file = \"s3a://tp5/pg2701.txt\"\n",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"minio_file = \"s3a://tp4/pg2701.txt\"\n",
"# adresse du fichier dans le bucket minio\n",
"text = sc.textFile(minio_file) \n",
"print(text.take(10))"
...
...
@@ -439,26 +294,85 @@
"source": [
"### Exercice 1\n",
"\n",
"Compter le nombre de mots total du livre.\n",
"Compter le nombre d'occurrences par mot, trier par nombre décroissant (prendre les 10 premiers).\n",
"Compter le nombre de mots par phrase.\n",
"\n",
"1. Compter le nombre de mots total du livre.\n",
"2. Compter le nombre d'occurrences par mot, trier par nombre décroissant (prendre les 10 premiers).\n",
"3. Compter le nombre de mots moyen par phrase."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Télécharger un autre livre (en trouver un sur https://www.gutenberg.org/browse/scores/top par exemple, télécharger au format \"Plain Text UTF-8\"), et lancer les jobs dessus."
"Charger le fichier `countries.json` à l'adresse suivante : http://api.worldbank.org/v2/countries?per_page=304&format=json, et chargez le dans Minio comme fait précédemment. \n",
"Charger le fichier `countries.json` ((adresse)[http://api.worldbank.org/v2/countries?per_page=304&format=json]), et chargez le dans Minio comme fait précédemment. \n",
"\n",
"Calculez ensuite le nombre de pays par niveau de revenu à l'aide d'un job Pyspark.\n",
"\n",
"Vous pouvez vous aider des lignes suivantes en premier lieu (afin de se concentrer directement sur le tableau des pays, contenu dans le deuxième élément du tableau racine) :\n",
"```python\n",
"rdd = sc.textFile(f\"s3a://tp5/countries.json\") # chargement du fichier countries.json\n",
"rdd = sc.textFile(f\"s3a://tp4/countries.json\") # chargement du fichier countries.json\n",
"mapped_rdd = rdd.map(lambda f: json.loads(f)) # chargement du fichier json dans un dictionnaire\n",
"country_rdd = mapped_rdd.flatMap(lambda x: x[1]) # on récupère le tableau des pays\n",
"Réalisez trois requêtes non triviales sur un fichier JSON de votre projet du TP4.\n"
"Réalisez trois requêtes non triviales sur les fichiers json des [pokemon](https://github.com/fanzeyi/pokemon.json).\n"
]
}
],
...
...
@@ -492,7 +433,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.8"
"version": "3.12.9"
}
},
"nbformat": 4,
...
...
%% Cell type:markdown id: tags:
# TP Python
PySpark est une interface pour Apache Spark en Python, permettant de traiter de grandes quantités de données en parallèle sur des clusters, en combinant la puissance de calcul de Spark avec la simplicité de Python.
%% Cell type:markdown id: tags:
## Déploiement
Le déploiement se fait comme pour les TP précédents, à l'aide de Docker Compose. N'oubliez pas de lancer Docker Desktop en premier lieu !
```bash
docker compose -f docker-compose-jupyter.yml up
docker compose up --build
```
Sont déployés :
- un master Spark
- un worker Spark
- une installation de [Minio](https://min.io/) servant au stockage des données accédée par Spark
- une installation de Jupyter comprenant les bibliothèques nécessaires pour le TP
L'UI de Spark est disponible à l'adresse suivante : http://localhost:8080.
Nous utilisons la solution de stockage objet Minio, accessible à l'adresse suivante : http://localhost:19001.
%% Cell type:markdown id: tags:
### Problèmes possibles
Quelques messages d'erreurs que vous pouvez rencontrer, et comment les gérer :
- Message d'erreur "Cannot run multiple SparkContexts at once" : vous ne pouvez initialiser la connexion avec Spark qu'une fois par notebook, la solution est simplement de faire reset du notebook (bouton restart en haut du notebook).
- Plus d'exécuteur disponible dans Spark : un job est problablement déjà en cours. Coupez le sur l'[interface de Spark](http://127.0.0.1:8080)(ou coupez l'ensemble de l'installation avec `docker compose down`), puis faites un reset du notebook (bouton restart en haut du notebook).
- L'option restart est grisée : fermez l'onglet du notebook et ouvrez-le à nouveau
%% Cell type:code id: tags:
``` python
frompysparkimportSparkContext,SparkConf
conf=SparkConf() \
.setAppName('SparkApp') \
.setMaster('spark://spark:7077') \
.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.4")# utilisé pour le stockage
File "/opt/conda/lib/python3.12/socketserver.py", line 761, in __init__
self.handle()
File "/opt/conda/lib/python3.12/site-packages/pyspark/accumulators.py", line 295, in handle
poll(accum_updates)
File "/opt/conda/lib/python3.12/site-packages/pyspark/accumulators.py", line 267, in poll
if self.rfile in r and func():
^^^^^^
File "/opt/conda/lib/python3.12/site-packages/pyspark/accumulators.py", line 271, in accum_updates
num_updates = read_int(self.rfile)
^^^^^^^^^^^^^^^^^^^^
File "/opt/conda/lib/python3.12/site-packages/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
----------------------------------------
%% Cell type:markdown id: tags:
## Pagerank
PageRank est un algorithme développé par Google pour mesurer l'importance relative de chaque page web en fonction de son nombre et de la qualité des liens entrants. Il attribue un score de popularité à chaque page, influençant son classement dans les résultats de recherche.
%% Cell type:code id: tags:
``` python
frompysparkimportSparkContext
frompyspark.sqlimportSparkSession
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
# Parameters
ITERATIONS,a,N=10,0.15,5
# Inline example data: (Website, List[Linked Websites])
PageRank est un algorithme développé par Google pour mesurer l'importance relative de chaque page web en fonction de son nombre et de la qualité des liens entrants. Il attribue un score de popularité à chaque page, influençant son classement dans les résultats de recherche.
Quelques liens en plus pour comprendre :
- la transformation `join` sur les RDD : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.join.html
- la transformation `flatmap` sur les RDD : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap.html
N'hésitez pas à essayer de décomposer les calculs pour comprendre ! Attention à bien faire une action de type `collect` pour voir les résultats intermédiaires.
%% Cell type:code id: tags:
``` python
frompysparkimportSparkContext
frompyspark.sqlimportSparkSession
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
# Parameters
ITERATIONS,a,N=10,0.15,5
# Inline example data: (Website, List[Linked Websites])
Nous allons utiliser le stockage objet Minio pour héberger les données de notre installation Spark.
MinIO est une solution de stockage objet haute performance, compatible avec l'API S3 d'AWS, permettant de gérer des données non structurées à grande échelle. Il est conçu pour des environnements cloud, hybrides ou sur site, offrant une infrastructure de stockage distribuée et évolutive.
La copie de fichier peut se faire par la bibliothèque Minio Python, ou alors par le biais de l'UI Web, accessible sur http://localhost:19001. Les identifiants sont "root" et "password" (on peut les retrouver dans le fichier [docker-compose.yml](docker-compose.yml)).
Le principe général de MinIO (comme pour les autres systèmes de stockage objet tels qu'AWS S3) est d'organiser les données en buckets, qui sont des conteneurs virtuels pour le stockage de fichiers ou d’objets. Chaque bucket est unique dans le système et peut contenir un nombre illimité d'objets, identifiés par des clés uniques.
%% Cell type:markdown id: tags:
### Exercice: Moby Dick
Prérequis : télécharger le livre à l'[emplacement suivant](https://nyu-cds.github.io/python-bigdata/files/pg2701.txt), et placer le fichier `pg2701.txt` dans le dossier du TP5. Ceci est automatisé par la commande suivante :
Prérequis : le contenu du livre est présent dans le fichier `pg2701.txt` ([lien internet](https://nyu-cds.github.io/python-bigdata/files/pg2701.txt)) du TP4.
client_minio.fput_object("tp5","pg2701.txt","pg2701.txt")# copie du fichier local dans le bucket
ifclient_minio.bucket_exists("tp4")==False:
client_minio.make_bucket("tp4")
client_minio.fput_object("tp4","pg2701.txt","pg2701.txt")# copie du fichier local dans le bucket
```
%% Output
<minio.helpers.ObjectWriteResult at 0x781478258fb0>
%% Cell type:markdown id: tags:
Minio reprend le principe du stockage cloud S3 : il permet de stocker des fichiers dans des "buckets". Les buckets dans MinIO sont des conteneurs de stockage pour organiser et gérer des objets (fichiers) de manière structurée, similaires aux dossiers dans un système de fichiers, mais optimisés pour le stockage objet.
Vérifiez que le fichier est bien présent dans le bucket `tp5` de Minio : [http://localhost:19001/browser/tp5/](http://localhost:19001/browser/tp5/). Vous pouvez utiliser l'utilisateur `root` et le mot de passe `password` pour vous connecter.
Vérifiez que le fichier est bien présent dans le bucket `tp4` de Minio : [http://localhost:19001/browser/tp4/](http://localhost:19001/browser/tp4/). Vous pouvez utiliser l'utilisateur `root` et le mot de passe `password` pour vous connecter.
['The Project Gutenberg EBook of Moby Dick; or The Whale, by Herman Melville', '', 'This eBook is for the use of anyone anywhere at no cost and with', 'almost no restrictions whatsoever. You may copy it, give it away or', 're-use it under the terms of the Project Gutenberg License included', 'with this eBook or online at www.gutenberg.org', '', '', 'Title: Moby Dick; or The Whale', '']
1. Compter le nombre de mots total du livre.
2. Compter le nombre d'occurrences par mot, trier par nombre décroissant (prendre les 10 premiers).
3. Compter le nombre de mots moyen par phrase.
%% Cell type:code id: tags:
%% Cell type:markdown id: tags:
``` python
# 1
```
### Exercice 1
%% Cell type:code id: tags:
Compter le nombre de mots total du livre.
Compter le nombre d'occurrences par mot, trier par nombre décroissant (prendre les 10 premiers).
Compter le nombre de mots par phrase.
``` python
# 2
```
%% Cell type:code id: tags:
``` python
# 3
```
%% Cell type:markdown id: tags:
Télécharger un autre livre (en trouver un sur https://www.gutenberg.org/browse/scores/top par exemple, télécharger au format "Plain Text UTF-8"), et lancer les jobs dessus.
Charger le fichier `countries.json`à l'adresse suivante : http://api.worldbank.org/v2/countries?per_page=304&format=json, et chargez le dans Minio comme fait précédemment.
Charger le fichier `countries.json`((adresse)[http://api.worldbank.org/v2/countries?per_page=304&format=json]), et chargez le dans Minio comme fait précédemment.
Calculez ensuite le nombre de pays par niveau de revenu à l'aide d'un job Pyspark.
Vous pouvez vous aider des lignes suivantes en premier lieu (afin de se concentrer directement sur le tableau des pays, contenu dans le deuxième élément du tableau racine) :
```python
rdd=sc.textFile(f"s3a://tp5/countries.json")# chargement du fichier countries.json
rdd=sc.textFile(f"s3a://tp4/countries.json")# chargement du fichier countries.json
mapped_rdd=rdd.map(lambdaf:json.loads(f))# chargement du fichier json dans un dictionnaire
country_rdd=mapped_rdd.flatMap(lambdax:x[1])# on récupère le tableau des pays