Skip to content
Snippets Groups Projects
Unverified Commit fcbc9e7c authored by REIG Julien's avatar REIG Julien
Browse files

fetch queue every minutes

parent fe37378e
Branches
No related tags found
1 merge request!16Node knowledge
......@@ -4,8 +4,12 @@ from flask import Blueprint, request
from flask_restful import Api, Resource
from consts.status import Status
from utils.database_utils import clear_command_queue, clear_resource_queue, get_command_queue, get_resource_queue, get_state, insert_command, insert_resource, update_state
from utils.database_utils import (clear_command_queue, clear_resource_queue,
get_command_queue, get_resource_queue,
get_state, insert_command, insert_resource,
update_state)
from utils.http_utils import normalize_url
from utils.queue_utils import fetch_insert_queues
state_controller = Blueprint('state_controller', __name__)
api = Api(state_controller)
......@@ -30,17 +34,7 @@ class StartNode(Resource):
node_address_intern = normalize_url(request.host_url, True)
update_state(Status.STARTING, node_address)
resources = get_resource_queue(node_address_intern)
for resource in resources:
insert_resource(resource)
clear_resource_queue(node_address_intern)
commands = get_command_queue(node_address_intern)
for command in commands:
resource_id = command['resource_id']
cm = command['command']
insert_command(resource_id, cm)
clear_command_queue(node_address_intern)
fetch_insert_queues(node_address_intern)
time.sleep(5)
update_state(Status.ONLINE, node_address)
......
import os
import threading
import flask
from flask import Flask
from flask_cors import CORS
from consts.status import Status
from controllers.duplicate_controller import duplicate_controller
from controllers.node_controller import node_controller
from controllers.ping_controller import ping_controller
from controllers.resource_controller import resource_controller
from controllers.resources_controller import resources_controller
from controllers.state_controller import state_controller
from utils.database_utils import get_state
from utils.init_database import init_database, init_queue_db
from utils.MessageAnnouncer import announcer
from utils.queue_utils import fetch_insert_queues
app = Flask(__name__)
CORS(app)
......@@ -34,8 +40,22 @@ def subscribe():
return flask.Response(gen(), mimetype='text/event-stream')
def fetch_queue():
thread = threading.Timer(60.0, fetch_queue)
thread.daemon = True
if get_state() == Status.ONLINE:
print("Fetching queues", flush=True)
url =f'http://{os.environ.get("NODE_FILE")}:3000/'
fetch_insert_queues(url)
thread.start()
if __name__ == '__main__':
init_database()
init_queue_db()
if __name__ == '__main__':
if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
init_database()
init_queue_db()
fetch_queue()
app.run(host="0.0.0.0", port=3000, debug=True, use_reloader=True)
......@@ -207,13 +207,18 @@ def get_presence(resource_id: str) -> Optional[list[str]]:
print(e, file=sys.stderr)
return None
def get_command_queue(node_address: str) -> Optional[list]:
def get_command_queue(node_address: str, clear: bool = False) -> Optional[list]:
with sqlite3.connect(db_queue_file) as db_conn:
try:
cursor = db_conn.cursor()
cursor.execute(
"SELECT resource_id, command_object FROM command_queue WHERE node_address = ? ORDER BY id DESC", (node_address,))
commands = cursor.fetchall()
if clear:
cursor.execute(
"DELETE FROM command_queue WHERE node_address = ?", (node_address,))
db_conn.commit()
cursor.close()
return [{
......@@ -225,13 +230,18 @@ def get_command_queue(node_address: str) -> Optional[list]:
return None
def get_resource_queue(node_address: str) -> Optional[list[Resource]]:
def get_resource_queue(node_address: str, clear: bool = False) -> Optional[list[Resource]]:
with sqlite3.connect(db_queue_file) as db_conn:
try:
cursor = db_conn.cursor()
cursor.execute(
"SELECT resource_object FROM resource_queue WHERE node_address = ? ORDER BY id DESC", (node_address,))
resources_json: str = cursor.fetchall()
if clear:
cursor.execute(
"DELETE FROM resource_queue WHERE node_address = ?", (node_address,))
db_conn.commit()
cursor.close()
resources: list[dict] = [eval(resource[0])
......
......@@ -14,7 +14,7 @@ node4_address = 'http://localhost:3003/'
status = {
"node1": {"state": Status.ONLINE.value},
"node2": {"state": Status.ONLINE.value},
"node3": {"state": Status.ONLINE.value},
"node3": {"state": Status.OFFLINE.value},
"node4": {"state": Status.ONLINE.value},
}
......
from utils.database_utils import (clear_command_queue, clear_resource_queue,
get_command_queue, get_resource_queue,
insert_command, insert_resource)
def fetch_insert_queues(intern_node_address: str):
"""
Fetches the resources and commands from the queue and inserts them into the database
"""
fetch_insert_resource_queue(intern_node_address)
fetch_insert_command_queue(intern_node_address)
def fetch_insert_resource_queue(intern_node_address: str):
"""
Fetches the resources from the queue and inserts them into the database
"""
resources = get_resource_queue(intern_node_address, True)
for resource in resources:
insert_resource(resource)
def fetch_insert_command_queue(intern_node_address: str):
"""
Fetches the commands from the queue and inserts them into the database
"""
commands = get_command_queue(intern_node_address, True)
for command in commands:
resource_id = command['resource_id']
cm = command['command']
insert_command(resource_id, cm)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment