Skip to content
Snippets Groups Projects
Commit 52514a77 authored by Quentin ANDRE's avatar Quentin ANDRE
Browse files

consumer and producer

parent a2768f1d
No related branches found
No related tags found
No related merge requests found
from time import sleep
from time import sleep, time
import json
from opensky_api import OpenSkyApi
......@@ -9,8 +9,12 @@ producer = KafkaProducer(bootstrap_servers="pi-node07:9092")
api = OpenSkyApi()
while 1:
states = api.get_states()
producer.send("air-traffic", json.dumps(states).encode())
print("{} Produced {} air traffic records".format(time.time(), len(states)))
for state in states.states:
plane = {'time': states.time}
for key in state.keys:
plane[key] = getattr(state, key)
producer.send("air-traffic", json.dumps(plane).encode())
print("{} Produced {} air traffic records".format(time(), len(states.states)))
sleep(10.01)
......
import json
from kafka import KafkaConsumer
planes = {}
consumer = KafkaConsumer("air-traffic", bootstrap_servers='pi-node07:9092', group_id="air-traffic-monitor")
for message in consumer:
print(message)
'''plane = json.loads(message.value.decode())
plane_number = plane["number"]
contract = plane["contract_name"]
available_bike_stands = plane["available_bike_stands"]
if contract not in planes:
planes[contract] = {}
city_planes = planes[contract]
if plane_number not in city_planes:
city_planes[plane_number] = available_bike_stands
count_diff = available_bike_stands - city_planes[plane_number]
if count_diff != 0:
city_planes[plane_number] = available_bike_stands
print("{}{} {} ({})".format("+" if count_diff > 0 else "",count_diff, plane["address"], contract))'''
\ No newline at end of file
from time import sleep, time
import json
from opensky_api import OpenSkyApi
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers="pi-node07:9092")
api = OpenSkyApi()
while 1:
states = api.get_states()
for state in states.states:
plane = {'time': states.time}
for key in state.keys:
plane[key] = getattr(state, key)
producer.send("air-traffic", json.dumps(plane).encode())
print("{} Produced {} air traffic records".format(time(), len(states.states)))
sleep(10.01)
if __name__ == '__main__':
pass
from time import sleep
from opensky_api import OpenSkyApi
api = OpenSkyApi()
states = api.get_states()
sleep(12)
states2 = api.get_states()
print(len(states.states))
print(len(states2.states))
ids = set()
ids2 = set()
for s in states.states:
ids.add(s.icao24)
for s in states2.states:
ids2.add(s.icao24)
print(len(ids-ids2) + len(ids2-ids))
'''for s in states.states:
print("(%r, %r, %r, %r, %r)" % (s.longitude, s.latitude, s.baro_altitude, s.geo_altitude, s.velocity))
for s in states2.states:
print("(%r, %r, %r, %r, %r)" % (s.longitude, s.latitude, s.baro_altitude, s.geo_altitude, s.velocity))
'''
if __name__ == '__main__':
pass
......@@ -10,4 +10,7 @@ rm -rf /tmp/zookeeper/
ls /opt/kafka-data/
sudo rm -rf /opt/kafka-data/
sudo mkdir /opt/kafka-data/
sudo chown -R pi:pi /opt/kafka-data
\ No newline at end of file
sudo chown -R pi:pi /opt/kafka-data
# listen to a node
nc pi-node07 9092
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment