Commit d2a62294 authored by Quentin ANDRE

final

parent 1ab38663
# in node7
ssh pi@10.29.227.217
zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
kafka-server-start.sh /opt/kafka/config/server.properties
kafka-server-start.sh /opt/kafka/config/server1.properties
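A quick way to sanity-check the two brokers is to publish a test record from Python. This is a minimal sketch using the kafka-python client, which is not part of this repo; the topic name "planes" is an assumption (the real topic is configured in the readStream options elided by the diff below).

from kafka import KafkaProducer

# hypothetical smoke test: the topic name "planes" is assumed, not taken from the repo
producer = KafkaProducer(bootstrap_servers=["pi-node07:9092", "pi-node07:9093"])
producer.send("planes", value=b'{"time_position": 1650000000, "longitude": -4.5, "latitude": 48.3, "icao24": "abc123"}')
producer.flush()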
@@ -5,7 +5,7 @@ ssh pi@10.29.227.239
start-dfs.sh
$SPARK_HOME/sbin/start-all.sh
# launch spark_streaming in node 29
-spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 SparkStreamingRadio/spark_streaming_radio.py
+spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 SparkStreamingRadio/spark_streaming_radio.py
# stop
$SPARK_HOME/sbin/stop-all.sh
from __future__ import print_function
from geopy.geocoders import Nominatim
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import split, from_json, col, udf
geolocator = Nominatim(user_agent="quentin.andre@imt-atlantique.net")

def find_country(lat, long):
    # reverse-geocode a coordinate pair to a country name via Nominatim
    coord = f'{lat}, {long}'
    location = geolocator.reverse(coord, exactly_one=True)
    if location:
        return location.raw['address'].get('country', '')
    else:
        return None
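Nominatim's public endpoint allows roughly one request per second, which a streaming job can easily exceed. A hedged sketch of how the lookups could be throttled with geopy's built-in rate limiter; the one-second delay is an assumed value, not project configuration:

from geopy.extra.rate_limiter import RateLimiter

# space consecutive reverse() calls at least one second apart (assumed limit)
safe_reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)

def find_country_throttled(lat, long):
    location = safe_reverse(f'{lat}, {long}', exactly_one=True)
    return location.raw['address'].get('country', '') if location else None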
spark = SparkSession.builder.appName("Spark Structured Streaming from Kafka").getOrCreate()

sdfPlanes = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "pi-node07:9092,pi-node07:9093") \
@@ -51,20 +41,44 @@ planesSchema = StructType([
def parse_data_from_kafka_message(sdf, schema):
    assert sdf.isStreaming, "DataFrame doesn't receive streaming data"
    '''col = split(sdf['value'], ',') #split attributes to nested array in one Column
    sdf2 = sdf.withColumn("value", from_json("value", schema))
    return sdf2.select(col("value.*"))
    #now expand col to multiple top-level columns
    for idx, field in enumerate(schema):
        sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))'''
    # parse the Kafka value as JSON, then promote the nested fields to top-level columns
    return sdf.withColumn("value", from_json(col("value").cast("string"), schema)) \
        .select([col(f"value.{field.name}") for field in schema])
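The parsing logic can be checked without Kafka by running the same from_json pattern over a static DataFrame (parse_data_from_kafka_message itself asserts a streaming source, so the logic is inlined here). A minimal sketch, assuming planesSchema contains at least the four fields used further down; the full schema is elided by the diff:

# assumed subset of planesSchema, for illustration only
demoSchema = StructType([
    StructField("time_position", LongType()),
    StructField("longitude", DoubleType()),
    StructField("latitude", DoubleType()),
    StructField("icao24", StringType()),
])
demo = spark.createDataFrame(
    [('{"time_position": 1650000000, "longitude": -4.5, "latitude": 48.3, "icao24": "abc123"}',)],
    ["value"])
demo.withColumn("value", from_json(col("value"), demoSchema)) \
    .select([col(f"value.{field.name}") for field in demoSchema]) \
    .show()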
#query = parse_data_from_kafka_message(sdfPlanes, planesSchema)
df = parse_data_from_kafka_message(sdfPlanes, planesSchema)
'''df.write.csv("hdfs://pi-node30:9000/user/pi/planesRecords")'''
###################### treatment #########################
def planes_to_countries(row):
    # map a plane record to a (country, 1) pair for the windowed count sketched below
    return (find_country(row.latitude, row.longitude), 1)
'''query = df.select([col("value.time_position"), col("value.longitude"), col("value.latitude"), col("value.icao24")]) \
    .map(planes_to_countries) \
    .reduceByKeyAndWindow(lambda x, y: int(x)+int(y), lambda x, y: int(x)-int(y), 60, 15) \
    .writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode("append") \
    .option("checkpointLocation", "hdfs://pi-node30:9000/user/pi/checkp") \
    .options(table="planes_area", keyspace="planes") \
    .option("truncate", False) \
    .start().awaitTermination()'''
# attach the country via a UDF over the coordinates, then stream the result to the console
find_country_udf = udf(find_country, StringType())

query = df.select(["time_position", "longitude", "latitude", "icao24"]) \
    .withColumn("country", find_country_udf(col("latitude"), col("longitude"))) \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "hdfs://pi-node30:9000/user/pi/checkp") \
    .option("truncate", False) \
    .start() \
    .awaitTermination()
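The commented-out block above targets Cassandra, but map and reduceByKeyAndWindow are DStream operations and cannot be chained onto a Structured Streaming DataFrame. A hedged sketch of how each micro-batch could reach Cassandra instead, via foreachBatch and the spark-cassandra-connector; the table and keyspace names are taken from the commented code, the rest is an assumption:

def write_to_cassandra(batch_df, batch_id):
    # write one micro-batch through the Cassandra connector
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="planes_area", keyspace="planes") \
        .mode("append") \
        .save()

# swapping the console sink for .writeStream.foreachBatch(write_to_cassandra) would persist the stream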