Skip to content
Snippets Groups Projects
Commit 1ab38663 authored by Quentin ANDRE's avatar Quentin ANDRE
Browse files

12

parent ac5fc561
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@ import json
from opensky_api import OpenSkyApi
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers="pi-node07:9092")
producer = KafkaProducer(bootstrap_servers="pi-node07:9092,pi-node07:9093")
api = OpenSkyApi()
while 1:
......@@ -15,7 +15,7 @@ while 1:
plane[key] = getattr(state, key)
producer.send("air-traffic", json.dumps(plane).encode())
print("{} Produced {} air traffic records".format(time(), len(states.states)))
sleep(10.01)
sleep(12)
if __name__ == '__main__':
......
......@@ -5,7 +5,7 @@ ssh pi@10.29.227.239
start-dfs.sh
$SPARK_HOME/sbin/start-all.sh
# launch spark_streaming in node 29
spark-submit SparkStreamingRadio/spark_streaming_radio.py
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 SparkStreamingRadio/spark_streaming_radio.py
# stop
$SPARK_HOME/sbin/stop-all.sh
......
......@@ -18,7 +18,7 @@ def find_country(lat, long):
spark = SparkSession.builder.appName("Spark Structured Streaming from Kafka").getOrCreate()
sdfPlanes = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "pi-node07:9092") \
.option("kafka.bootstrap.servers", "pi-node07:9092, pi-node07:9093") \
.option("subscribe", "air-traffic") \
.option("startingOffsets", "latest") \
.load().selectExpr("CAST(value AS STRING)")
......@@ -59,5 +59,12 @@ def parse_data_from_kafka_message(sdf, schema):
sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))'''
return sdf.withColumn("value", from_json("value", schema)).select([field.name for field in schema])
sdfPlanes = parse_data_from_kafka_message(sdfPlanes, planesSchema)
sdfPlanes.show()
\ No newline at end of file
#query = parse_data_from_kafka_message(sdfPlanes, planesSchema)
#query.writeStream \
sdfPlanes.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start() \
.awaitTermination()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment