Skip to content
Snippets Groups Projects
Commit 7c86926e authored by Quentin ANDRE
Browse files

spark

parent 9eb6eb1a
No related branches found
No related tags found
No related merge requests found
...@@ -3,7 +3,7 @@ from geopy.geocoders import Nominatim ...@@ -3,7 +3,7 @@ from geopy.geocoders import Nominatim
import sys import sys
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from pyspark.sql.types import * from pyspark.sql.types import *
from pyspark.sql.functions import split from pyspark.sql.functions import split, from_json, col
geolocator = Nominatim(user_agent="quentin.andre@imt-atlantique.net") geolocator = Nominatim(user_agent="quentin.andre@imt-atlantique.net")
...@@ -17,27 +17,47 @@ def find_country(lat, long): ...@@ -17,27 +17,47 @@ def find_country(lat, long):
return None return None
# Create (or reuse) the Spark session that drives this job.
spark = (
    SparkSession.builder
    .appName("Spark Structured Streaming from Kafka")
    .getOrCreate()
)

# Streaming source: the "air-traffic" topic on the local Kafka broker,
# read from the latest offsets. Only the message payload is kept, cast
# to a string so it can be parsed as JSON further down.
sdfPlanes = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "air-traffic")
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("CAST(value AS STRING)")
)
# Reference message from the "air-traffic" topic that the schema below mirrors:
# {"on_ground": false, "icao24": "407182", "sensors": null, "vertical_rate": 0,
#  "origin_country": "United Kingdom", "squawk": "7755", "geo_altitude": 8214.36,
#  "baro_altitude": 8229.6, "velocity": 176.26, "latitude": 54.1107, "spi": false,
#  "position_source": 0, "last_contact": 1646922076, "time_position": 1646922076,
#  "heading": 158.6, "time": 1646922077, "longitude": -2.8725, "callsign": "EXS1LY "}
planesSchema = StructType([
    StructField("on_ground", BooleanType()),
    # icao24 and squawk are quoted strings in the message above, so they are
    # declared as strings; a numeric type would not match the JSON token.
    StructField("icao24", StringType()),
    # NOTE(review): "sensors" is null in the reference message, so its real
    # type cannot be confirmed from here - verify against the producer.
    StructField("sensors", LongType()),
    StructField("vertical_rate", FloatType()),
    StructField("origin_country", StringType()),
    StructField("squawk", StringType()),
    StructField("geo_altitude", FloatType()),
    StructField("baro_altitude", FloatType()),
    StructField("velocity", FloatType()),
    StructField("latitude", FloatType()),
    StructField("spi", BooleanType()),
    StructField("position_source", IntegerType()),
    StructField("last_contact", LongType()),
    StructField("time_position", LongType()),
    StructField("heading", FloatType()),
    StructField("time", LongType()),
    StructField("longitude", FloatType()),
    StructField("callsign", StringType()),
])
def parse_data_from_kafka_message(sdf, schema):
    """Turn the JSON payload of a Kafka stream into typed top-level columns.

    Args:
        sdf: Streaming DataFrame with a string column named ``value`` that
            holds one JSON document per row.
        schema: ``StructType`` describing the fields of the JSON document.

    Returns:
        A streaming DataFrame with one top-level column per field of
        ``schema``, in schema order.

    Raises:
        AssertionError: If ``sdf`` is not a streaming DataFrame.
    """
    assert sdf.isStreaming, "DataFrame doesn't receive streaming data"

    # from_json produces a single struct column. Its fields are nested under
    # "value", so they have to be expanded with "value.*" before they can be
    # addressed as top-level columns.
    parsed = sdf.withColumn("value", from_json("value", schema))
    return parsed.select("value.*")
sdfPlanes = parse_data_from_kafka_message(sdfPlanes, planesSchema)

# show() is a batch action and is not supported on a streaming DataFrame.
# A streaming query has to be started through writeStream instead; the
# console sink prints each micro-batch, and awaitTermination() keeps the
# driver alive while the query runs.
query = sdfPlanes.writeStream.format("console").outputMode("append").start()
query.awaitTermination()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment