Skip to content
Snippets Groups Projects
Commit c2294074 authored by REIG Julien's avatar REIG Julien
Browse files

Merge branch 'threading' into 'master'

fix ping and parallelize requests

See merge request !18
parents 3faaa52a bf5cd436
Branches
No related tags found
1 merge request!18fix ping and parallelize requests
......@@ -16,13 +16,17 @@ class PingController(Resource):
if get_state() == Status.OFFLINE:
return response_offline, 403
node_addresses = request.headers.getlist("addresses")
if (node_addresses is None or len(node_addresses) == 0):
return {"message": "No addresses provided."}, 400
node_addresses = request.headers.get("addresses", None)
print('node_addresses', node_addresses)
if (node_addresses is None):
return jsonify({ "reachable": True, "results": { normalize_url(request.host_url): True } })
node_addresses = node_addresses.split(",")
node_addresses = list(map(normalize_url, node_addresses))
results = ping(node_addresses)
if (normalize_url(request.host_url) in node_addresses):
node_addresses.remove(normalize_url(request.host_url))
results = ping(node_addresses, normalize_url(request.host_url))
results[normalize_url(request.host_url)] = True
return jsonify({ "reachable": True, "results": results })
......
import json
import sys
import threading
from urllib.parse import urlparse
import requests
......@@ -9,6 +10,7 @@ from models.command import Command
from models.resource import Resource
from utils.database_utils import (insert_command_queue, insert_resource_queue,
update_state)
from utils.thread_value import Thread_value
from utils.time_utils import RandomNetworkFailure, random_sleep
nodes = {
......@@ -40,89 +42,99 @@ def normalize_url(url: str, inter: bool = False):
return format
def send_resource_to_other_nodes(addresses: list[str], resource: Resource, source_address: str):
for address in addresses:
def send_data_in_thread(address: str, data: str, source_address: str, on_error_callback: callable = None):
try:
random_sleep()
result = requests.put(
f"{address}/api/duplicate/resource",
data=json.dumps({
"resource": resource.to_full_dict(include_presence=True),
}),
address,
data=data,
headers={"Content-Type": "application/json"})
if result.status_code != 201:
print('Address', address, 'Status code:',
result.status_code, file=sys.stderr)
insert_resource_queue(address, resource)
update_state(Status.UPDATING, source_address, {"message": "Status code not 201", "statusCode": result.status_code, "node_failed": address})
if on_error_callback:
on_error_callback()
except RandomNetworkFailure as e:
print('Address (intentional network fail)', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": "Network failure (random intentional)", "statusCode": 418, "node_failed": external_address})
insert_resource_queue(address, resource)
if on_error_callback:
on_error_callback()
except requests.exceptions.RequestException as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
insert_resource_queue(address, resource)
if on_error_callback:
on_error_callback()
except Exception as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
insert_resource_queue(address, resource)
if on_error_callback:
on_error_callback()
def send_resource_to_other_nodes(addresses: list[str], resource: Resource, source_address: str):
'''Send resource to other nodes using threads'''
all_threads = []
for address in addresses:
data = json.dumps({ "resource": resource.to_full_dict(include_presence=True),})
on_error_callback = lambda: insert_resource_queue(address, resource)
th = threading.Thread(target=send_data_in_thread, args=(f"{address}/api/duplicate/resource", data, source_address, on_error_callback))
all_threads.append(th)
th.start()
for th in all_threads:
th.join()
def send_command_to_other_nodes(addresses: list[str], resource_id: str, command: Command, source_address: str):
'''Send command to other nodes using threads'''
all_threads = []
for address in addresses:
data = json.dumps({ "command": command.to_dict(),})
on_error_callback = lambda: insert_command_queue(address, resource_id, command)
th = threading.Thread(target=send_data_in_thread, args=(f"{address}/api/duplicate/resource/{resource_id}/command", data, source_address, on_error_callback))
all_threads.append(th)
th.start()
for th in all_threads:
th.join()
def ping_in_thread(address: str, source_address: str):
try:
random_sleep()
result = requests.put(
f"{address}/api/duplicate/resource/{resource_id}/command",
data=json.dumps({
"command": command.to_dict(),
}),
headers={"Content-Type": "application/json"})
if result.status_code != 201:
print('Address', address, 'Status code:',
result.status_code, file=sys.stderr)
insert_command_queue(address, resource_id, command)
addr = f"{normalize_url(address, True)}/api/ping"
result = requests.get(addr)
if result.status_code != 200:
print('Address', addr, 'Status code:',
result.status_code, result.reason, file=sys.stderr)
update_state(Status.ONLINE, source_address, {"message": "Status code not 200", "statusCode": result.status_code, "node_failed": address})
return address, result.status_code == 200
except RandomNetworkFailure as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": "Network failure (random intentional)", "statusCode": 418, "node_failed": external_address})
insert_command_queue(address, resource_id, command)
print('RandomNetworkFailure: Address', address, e, file=sys.stderr)
update_state(Status.ONLINE, source_address, {"message": "Network failure (random intentional)", "statusCode": 418, "node_failed": address})
return address, False
except requests.exceptions.RequestException as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
insert_command_queue(address, resource_id, command)
print('RequestException: Address', addr, e, file=sys.stderr)
update_state(Status.ONLINE, source_address, {"message": e, "statusCode": 500, "node_failed": address})
return address, False
except Exception as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
insert_command_queue(address, resource_id, command)
print('Exception: Address', addr, e, file=sys.stderr)
update_state(Status.ONLINE, source_address, {"message": e, "statusCode": 500, "node_failed": address})
return address, False
def ping(addresses: list[str], source_address: str):
all_threads = []
results = {}
print('pinging', addresses, flush=True)
for address in addresses:
try:
random_sleep()
result = requests.get(f"{address}/api/ping")
if result.status_code != 200:
print('Address', address, 'Status code:',
result.status_code, file=sys.stderr)
results[address] = result.status_code == 200
except RandomNetworkFailure as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.ONLINE, source_address, {"message": "Network failure (random intentional)", "statusCode": 418, "node_failed": external_address})
results[address] = False
except requests.exceptions.RequestException as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.ONLINE, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
results[address] = False
except Exception as e:
print('Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.ONLINE, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
results[address] = False
print('ping', address, flush=True)
th = Thread_value(target=ping_in_thread, args=(address, source_address))
all_threads.append(th)
th.start()
for th in all_threads:
address, value = th.join()
results[address] = value
return results
\ No newline at end of file
from threading import Thread
class Thread_value(Thread):
def __init__(self, target, args):
super().__init__(target=target, args=args)
self._return = None
def run(self):
self._return = self._target(*self._args)
def join(self):
super().join()
return self._return
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment