Skip to content
Snippets Groups Projects
Unverified Commit 667336f2 authored by REIG Julien's avatar REIG Julien
Browse files

fix resource queue

parent ea554048
No related branches found
No related tags found
1 merge request!16Node knowledge
......@@ -21,7 +21,8 @@ def update_state(state: Status, node_address: str = None, extra_data: dict = Non
db_conn.commit()
cursor.close()
if node_address is not None:
send_status_change(node_address=node_address, status=state, extra_data=extra_data)
send_status_change(node_address=node_address,
status=state, extra_data=extra_data)
return True
except Exception as e:
print(e, file=sys.stderr)
......@@ -106,6 +107,7 @@ def get_resource(id: str, with_commands: bool = False, with_presence: bool = Fal
print(e, file=sys.stderr)
return None
def resource_id_is_deleted(resource_id: int) -> bool:
with sqlite3.connect(db_file) as db_conn:
try:
......@@ -119,6 +121,7 @@ def resource_id_is_deleted(resource_id: int) -> bool:
print(e, file=sys.stderr)
return True
def resource_is_deleted(resource: Resource) -> bool:
return resource_id_is_deleted(resource.id)
......@@ -168,6 +171,7 @@ def get_commands_count(resource_id: str) -> int:
print(e, file=sys.stderr)
return -1
def insert_presence(resource_id: str, node_address: str) -> bool:
with sqlite3.connect(db_file) as db_conn:
try:
......@@ -181,6 +185,7 @@ def insert_presence(resource_id: str, node_address: str) -> bool:
print(e, file=sys.stderr)
return False
def insert_presence_bulk(resource: Resource) -> bool:
with sqlite3.connect(db_file) as db_conn:
try:
......@@ -194,6 +199,7 @@ def insert_presence_bulk(resource: Resource) -> bool:
print(e, file=sys.stderr)
return False
def get_presence(resource_id: str) -> Optional[list[str]]:
with sqlite3.connect(db_file) as db_conn:
try:
......@@ -207,6 +213,7 @@ def get_presence(resource_id: str) -> Optional[list[str]]:
print(e, file=sys.stderr)
return None
def get_command_queue(node_address: str, clear: bool = False) -> Optional[list]:
with sqlite3.connect(db_queue_file) as db_conn:
try:
......@@ -223,7 +230,7 @@ def get_command_queue(node_address: str, clear: bool = False) -> Optional[list]:
return [{
"resource_id": command[0],
"command": Command.from_full_dict(eval(command[1]))
"command": Command.from_full_dict(json.load(command[1]))
} for command in commands]
except Exception as e:
print(e, file=sys.stderr)
......@@ -237,16 +244,16 @@ def get_resource_queue(node_address: str, clear: bool = False) -> Optional[list[
cursor.execute(
"SELECT resource_object FROM resource_queue WHERE node_address = ? ORDER BY id DESC", (node_address,))
resources_json: str = cursor.fetchall()
if clear:
cursor.execute(
"DELETE FROM resource_queue WHERE node_address = ?", (node_address,))
db_conn.commit()
cursor.close()
resources_str = [resource[0] for resource in resources_json]
resources_dict: list[dict] = [json.loads(
resource) for resource in resources_str]
resources: list[dict] = [eval(resource[0])
for resource in resources_json]
return [Resource.from_full_dict(resource) for resource in resources]
return [Resource.from_full_dict(resource) for resource in resources_dict]
except Exception as e:
print(e, file=sys.stderr)
return None
......
from utils.database_utils import (clear_command_queue, clear_resource_queue,
get_command_queue, get_resource_queue,
insert_command, insert_resource)
insert_command, insert_presence_bulk, insert_resource)
def fetch_insert_queues(intern_node_address: str):
......@@ -10,6 +10,7 @@ def fetch_insert_queues(intern_node_address: str):
fetch_insert_resource_queue(intern_node_address)
fetch_insert_command_queue(intern_node_address)
def fetch_insert_resource_queue(intern_node_address: str):
"""
Fetches the resources from the queue and inserts them into the database
......@@ -19,12 +20,16 @@ def fetch_insert_resource_queue(intern_node_address: str):
return
for resource in resources:
insert_resource(resource)
insert_presence_bulk(resource)
insert_command(resource.id, resource.commands[0] if len(
resource.commands) > 0 else None)
def fetch_insert_command_queue(intern_node_address: str):
"""
Fetches the commands from the queue and inserts them into the database
"""
commands = get_command_queue(intern_node_address, True)
commands = get_command_queue(intern_node_address, False)
if (commands is None):
return
for command in commands:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment