Skip to content
Snippets Groups Projects
Commit 8bedefda authored by REIG Julien's avatar REIG Julien
Browse files

Merge branch 'fixed' into 'master'

fixed error message on subscribe

See merge request !21
parents e8fc91aa 57720b65
No related branches found
No related tags found
1 merge request!21fixed error message on subscribe
...@@ -29,7 +29,6 @@ class ResourceResource(Resource): ...@@ -29,7 +29,6 @@ class ResourceResource(Resource):
if get_state() == Status.OFFLINE: if get_state() == Status.OFFLINE:
return response_offline, 403 return response_offline, 403
node_address = normalize_url(request.host_url) node_address = normalize_url(request.host_url)
node_address_intern = normalize_url(request.host_url, True)
update_state(Status.UPDATING, node_address) update_state(Status.UPDATING, node_address)
random_sleep(fail_percentage=0) random_sleep(fail_percentage=0)
...@@ -47,10 +46,10 @@ class ResourceResource(Resource): ...@@ -47,10 +46,10 @@ class ResourceResource(Resource):
update_state(Status.ONLINE, node_address, {"message": "Resource not found", "statusCode": 404}) update_state(Status.ONLINE, node_address, {"message": "Resource not found", "statusCode": 404})
return {"message": "Resource not found"}, 404 return {"message": "Resource not found"}, 404
request_addresses = list(map(lambda x: normalize_url(x, True), request_addresses)) request_addresses = list(map(lambda x: normalize_url(x), request_addresses))
if len(request_addresses) > 0 and node_address_intern in request_addresses: if len(request_addresses) > 0 and node_address in request_addresses:
request_addresses.remove(node_address_intern) request_addresses.remove(node_address)
command = Command(request_command) command = Command(request_command)
...@@ -69,7 +68,7 @@ class ResourceResource(Resource): ...@@ -69,7 +68,7 @@ class ResourceResource(Resource):
random_sleep(fail_percentage=0) random_sleep(fail_percentage=0)
request_addresses: list[str] = request.get_json().get('addresses', []) request_addresses: list[str] = request.get_json().get('addresses', [])
request_addresses = list(map(lambda x: normalize_url(x, True), request_addresses)) request_addresses = list(map(lambda x: normalize_url(x), request_addresses))
resource = get_resource(id) resource = get_resource(id)
if resource is None: if resource is None:
......
...@@ -31,7 +31,6 @@ class ResourcesResource(Resource): ...@@ -31,7 +31,6 @@ class ResourcesResource(Resource):
if get_state() == Status.OFFLINE: if get_state() == Status.OFFLINE:
return response_offline, 403 return response_offline, 403
node_address = normalize_url(request.host_url) node_address = normalize_url(request.host_url)
internal_node_address = normalize_url(request.host_url, True)
update_state(Status.UPDATING, node_address) update_state(Status.UPDATING, node_address)
request_command: Optional[str] = request.get_json().get('command') request_command: Optional[str] = request.get_json().get('command')
...@@ -40,7 +39,7 @@ class ResourcesResource(Resource): ...@@ -40,7 +39,7 @@ class ResourcesResource(Resource):
presences = list(map(lambda x: normalize_url(x), request_addresses)) presences = list(map(lambda x: normalize_url(x), request_addresses))
# Add the current node address to the list of presences # Add the current node address to the list of presences
presences.append(node_address) presences.append(node_address)
request_addresses = list(map(lambda x: normalize_url(x, True), request_addresses)) request_addresses = list(map(lambda x: normalize_url(x), request_addresses))
random_sleep(fail_percentage=0) random_sleep(fail_percentage=0)
if request_command is None: if request_command is None:
...@@ -51,8 +50,8 @@ class ResourcesResource(Resource): ...@@ -51,8 +50,8 @@ class ResourcesResource(Resource):
update_state(Status.ONLINE, node_address, {"message": "name is missing", "statusCode": 400}) update_state(Status.ONLINE, node_address, {"message": "name is missing", "statusCode": 400})
return {"message": "name is missing"}, 400 return {"message": "name is missing"}, 400
if len(request_addresses) != 0 and internal_node_address in request_addresses: if len(request_addresses) != 0 and node_address in request_addresses:
request_addresses.remove(internal_node_address) request_addresses.remove(node_address)
resource = ResourceModel( resource = ResourceModel(
request_name, request_name,
......
...@@ -42,7 +42,7 @@ def normalize_url(url: str, inter: bool = False): ...@@ -42,7 +42,7 @@ def normalize_url(url: str, inter: bool = False):
return format return format
def send_data_in_thread(address: str, data: str, source_address: str, on_error_callback: callable = None): def send_data_in_thread(address: str, data: str, source_address: str, on_error_callback: callable = None, external_address: str = None):
try: try:
random_sleep() random_sleep()
result = requests.put( result = requests.put(
...@@ -50,64 +50,61 @@ def send_data_in_thread(address: str, data: str, source_address: str, on_error_c ...@@ -50,64 +50,61 @@ def send_data_in_thread(address: str, data: str, source_address: str, on_error_c
data=data, data=data,
headers={"Content-Type": "application/json"}) headers={"Content-Type": "application/json"})
if result.status_code != 201: if result.status_code != 201:
print('Address', address, 'Status code:', print('Address', external_address, 'Status code:',
result.status_code, file=sys.stderr) result.status_code, file=sys.stderr)
update_state(Status.UPDATING, source_address, {"message": "Status code not 201", "statusCode": result.status_code, "node_failed": normalize_url(address)}) update_state(Status.UPDATING, source_address, {"message": "Status code not 201", "statusCode": result.status_code, "node_failed": external_address})
if on_error_callback: if on_error_callback:
on_error_callback() on_error_callback()
except RandomNetworkFailure as e: except RandomNetworkFailure as e:
print('Address (intentional network fail)', address, e, file=sys.stderr) print('Address (intentional network fail)', external_address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": "Network failure (random intentional)", "statusCode": 418, "node_failed": external_address}) update_state(Status.UPDATING, source_address, {"message": "Network failure (random intentional)", "statusCode": 418, "node_failed": external_address})
if on_error_callback: if on_error_callback:
on_error_callback() on_error_callback()
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print('[RequestException]Address', address, e, file=sys.stderr) print('[RequestException]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address}) update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
if on_error_callback: if on_error_callback:
on_error_callback() on_error_callback()
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
print('[HTTPError]Address', address, e, file=sys.stderr) print('[HTTPError]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address}) update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
if on_error_callback: if on_error_callback:
on_error_callback() on_error_callback()
except requests.exceptions.Timeout as e: except requests.exceptions.Timeout as e:
print('[Timeout]Address', address, e, file=sys.stderr) print('[Timeout]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address}) update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
if on_error_callback: if on_error_callback:
on_error_callback() on_error_callback()
except Exception as e: except Exception as e:
print('[Exception]Address', address, e, file=sys.stderr) print('[Exception]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": 'Exception', "statusCode": 500, "node_failed": external_address}) update_state(Status.UPDATING, source_address, {"message": 'Exception', "statusCode": 500, "node_failed": external_address})
if on_error_callback: if on_error_callback:
on_error_callback() on_error_callback()
def send_resource_to_other_nodes(addresses: list[str], resource: Resource, source_address: str): def send_resource_to_other_nodes(external_addresses: list[str], resource: Resource, source_address: str):
'''Send resource to other nodes using threads''' '''Send resource to other nodes using threads'''
all_threads = [] all_threads = []
for address in addresses: for address in external_addresses:
internal_address = normalize_url(address, True)
data = json.dumps({ "resource": resource.to_full_dict(include_presence=True),}) data = json.dumps({ "resource": resource.to_full_dict(include_presence=True),})
on_error_callback = lambda: insert_resource_queue(address, resource) on_error_callback = lambda: insert_resource_queue(internal_address, resource)
th = threading.Thread(target=send_data_in_thread, args=(f"{address}/api/duplicate/resource", data, source_address, on_error_callback)) th = threading.Thread(target=send_data_in_thread, args=(f"{internal_address}/api/duplicate/resource", data, source_address, on_error_callback, address))
all_threads.append(th) all_threads.append(th)
th.start() th.start()
for th in all_threads: for th in all_threads:
th.join() th.join()
def send_command_to_other_nodes(addresses: list[str], resource_id: str, command: Command, source_address: str): def send_command_to_other_nodes(external_addresses: list[str], resource_id: str, command: Command, source_address: str):
'''Send command to other nodes using threads''' '''Send command to other nodes using threads'''
all_threads = [] all_threads = []
for address in addresses: for address in external_addresses:
internal_address = normalize_url(address, True)
data = json.dumps({ "command": command.to_dict(),}) data = json.dumps({ "command": command.to_dict(),})
on_error_callback = lambda: insert_command_queue(address, resource_id, command) on_error_callback = lambda: insert_command_queue(internal_address, resource_id, command)
th = threading.Thread(target=send_data_in_thread, args=(f"{address}/api/duplicate/resource/{resource_id}/command", data, source_address, on_error_callback)) th = threading.Thread(target=send_data_in_thread, args=(f"{internal_address}/api/duplicate/resource/{resource_id}/command", data, source_address, on_error_callback, address))
all_threads.append(th) all_threads.append(th)
th.start() th.start()
for th in all_threads: for th in all_threads:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment