Skip to content
Snippets Groups Projects
Unverified Commit 57720b65 authored by REIG Julien's avatar REIG Julien
Browse files

fixed error message on subscribe

parent a84cb677
No related branches found
No related tags found
1 merge request!21fixed error message on subscribe
......@@ -29,7 +29,6 @@ class ResourceResource(Resource):
if get_state() == Status.OFFLINE:
return response_offline, 403
node_address = normalize_url(request.host_url)
node_address_intern = normalize_url(request.host_url, True)
update_state(Status.UPDATING, node_address)
random_sleep(fail_percentage=0)
......@@ -47,10 +46,10 @@ class ResourceResource(Resource):
update_state(Status.ONLINE, node_address, {"message": "Resource not found", "statusCode": 404})
return {"message": "Resource not found"}, 404
request_addresses = list(map(lambda x: normalize_url(x, True), request_addresses))
request_addresses = list(map(lambda x: normalize_url(x), request_addresses))
if len(request_addresses) > 0 and node_address_intern in request_addresses:
request_addresses.remove(node_address_intern)
if len(request_addresses) > 0 and node_address in request_addresses:
request_addresses.remove(node_address)
command = Command(request_command)
......@@ -69,7 +68,7 @@ class ResourceResource(Resource):
random_sleep(fail_percentage=0)
request_addresses: list[str] = request.get_json().get('addresses', [])
request_addresses = list(map(lambda x: normalize_url(x, True), request_addresses))
request_addresses = list(map(lambda x: normalize_url(x), request_addresses))
resource = get_resource(id)
if resource is None:
......
......@@ -31,7 +31,6 @@ class ResourcesResource(Resource):
if get_state() == Status.OFFLINE:
return response_offline, 403
node_address = normalize_url(request.host_url)
internal_node_address = normalize_url(request.host_url, True)
update_state(Status.UPDATING, node_address)
request_command: Optional[str] = request.get_json().get('command')
......@@ -40,7 +39,7 @@ class ResourcesResource(Resource):
presences = list(map(lambda x: normalize_url(x), request_addresses))
# Add the current node address to the list of presences
presences.append(node_address)
request_addresses = list(map(lambda x: normalize_url(x, True), request_addresses))
request_addresses = list(map(lambda x: normalize_url(x), request_addresses))
random_sleep(fail_percentage=0)
if request_command is None:
......@@ -51,8 +50,8 @@ class ResourcesResource(Resource):
update_state(Status.ONLINE, node_address, {"message": "name is missing", "statusCode": 400})
return {"message": "name is missing"}, 400
if len(request_addresses) != 0 and internal_node_address in request_addresses:
request_addresses.remove(internal_node_address)
if len(request_addresses) != 0 and node_address in request_addresses:
request_addresses.remove(node_address)
resource = ResourceModel(
request_name,
......
......@@ -42,7 +42,7 @@ def normalize_url(url: str, inter: bool = False):
return format
def send_data_in_thread(address: str, data: str, source_address: str, on_error_callback: callable = None):
def send_data_in_thread(address: str, data: str, source_address: str, on_error_callback: callable = None, external_address: str = None):
try:
random_sleep()
result = requests.put(
......@@ -50,64 +50,61 @@ def send_data_in_thread(address: str, data: str, source_address: str, on_error_c
data=data,
headers={"Content-Type": "application/json"})
if result.status_code != 201:
print('Address', address, 'Status code:',
print('Address', external_address, 'Status code:',
result.status_code, file=sys.stderr)
update_state(Status.UPDATING, source_address, {"message": "Status code not 201", "statusCode": result.status_code, "node_failed": normalize_url(address)})
update_state(Status.UPDATING, source_address, {"message": "Status code not 201", "statusCode": result.status_code, "node_failed": external_address})
if on_error_callback:
on_error_callback()
except RandomNetworkFailure as e:
print('Address (intentional network fail)', address, e, file=sys.stderr)
external_address = normalize_url(address)
print('Address (intentional network fail)', external_address, e, file=sys.stderr)
update_state(Status.UPDATING, source_address, {"message": "Network failure (random intentional)", "statusCode": 418, "node_failed": external_address})
if on_error_callback:
on_error_callback()
except requests.exceptions.RequestException as e:
print('[RequestException]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
if on_error_callback:
on_error_callback()
except requests.exceptions.HTTPError as e:
print('[HTTPError]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
if on_error_callback:
on_error_callback()
except requests.exceptions.Timeout as e:
print('[Timeout]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": e, "statusCode": 500, "node_failed": external_address})
if on_error_callback:
on_error_callback()
except Exception as e:
print('[Exception]Address', address, e, file=sys.stderr)
external_address = normalize_url(address)
update_state(Status.UPDATING, source_address, {"message": 'Exception', "statusCode": 500, "node_failed": external_address})
if on_error_callback:
on_error_callback()
def send_resource_to_other_nodes(addresses: list[str], resource: Resource, source_address: str):
def send_resource_to_other_nodes(external_addresses: list[str], resource: Resource, source_address: str):
'''Send resource to other nodes using threads'''
all_threads = []
for address in addresses:
for address in external_addresses:
internal_address = normalize_url(address, True)
data = json.dumps({ "resource": resource.to_full_dict(include_presence=True),})
on_error_callback = lambda: insert_resource_queue(address, resource)
th = threading.Thread(target=send_data_in_thread, args=(f"{address}/api/duplicate/resource", data, source_address, on_error_callback))
on_error_callback = lambda: insert_resource_queue(internal_address, resource)
th = threading.Thread(target=send_data_in_thread, args=(f"{internal_address}/api/duplicate/resource", data, source_address, on_error_callback, address))
all_threads.append(th)
th.start()
for th in all_threads:
th.join()
def send_command_to_other_nodes(addresses: list[str], resource_id: str, command: Command, source_address: str):
def send_command_to_other_nodes(external_addresses: list[str], resource_id: str, command: Command, source_address: str):
'''Send command to other nodes using threads'''
all_threads = []
for address in addresses:
for address in external_addresses:
internal_address = normalize_url(address, True)
data = json.dumps({ "command": command.to_dict(),})
on_error_callback = lambda: insert_command_queue(address, resource_id, command)
th = threading.Thread(target=send_data_in_thread, args=(f"{address}/api/duplicate/resource/{resource_id}/command", data, source_address, on_error_callback))
on_error_callback = lambda: insert_command_queue(internal_address, resource_id, command)
th = threading.Thread(target=send_data_in_thread, args=(f"{internal_address}/api/duplicate/resource/{resource_id}/command", data, source_address, on_error_callback, address))
all_threads.append(th)
th.start()
for th in all_threads:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment