Select Git revision
outgoing_ping_processor.go
agent.c 19.15 KiB
/* db - xAAL agent
* Part of the 'majordom' software
* (c) 2019 Christophe Lohr <christophe.lohr@imt-atlantique.fr>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <uuid/uuid.h>
#include <regex.h>
#include <stdbool.h>
#include <unistd.h>
#include <sys/timerfd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <sphinxbase/err.h>
#include "agent.h"
/* Interval, in seconds, of the periodic timer that triggers our own alive notification (see xaal_agent) */
#define ALIVE_PERIOD 120
/* Initial delay and interval, in seconds, of the periodic timer that triggers checking() (see xaal_agent) */
#define CHECKING_PERIOD 10
/* Number of seconds subtracted from the current time by checking() to get the deadline under which a device timeout is considered expired */
#define GRACE_DELAY 120
/*
* Section Keys-Values of a Device
*/
/* Insert a value in the list.
 * A private copy of 'value' is appended to 'vals' unless an equal string is
 * already present. If memory cannot be allocated the list is left untouched
 * and an error is logged. */
void insert_value(vals_t *vals, const char *value) {
  val_t *np;

  TAILQ_FOREACH(np, vals, entries)
    if (strcmp(value, np->value) == 0)
      return;                   /* already present: values stay unique */

  np = malloc(sizeof *np);
  if (!np) {
    E_ERROR("Could not allocate memory for value %s\n", value);
    return;
  }
  np->value = strdup(value);
  if (!np->value) {
    /* never link a node whose string is NULL: lookups strcmp() on it */
    E_ERROR("Could not allocate memory for value %s\n", value);
    free(np);
    return;
  }
  TAILQ_INSERT_TAIL(vals, np, entries);
}
/* Remove from 'vals' the first entry whose string equals 'value', if any,
 * and release both the string and the node. */
void delete_value(vals_t *vals, const char *value) {
  val_t *hit;

  /* locate the first matching node */
  for (hit = TAILQ_FIRST(vals); hit != NULL; hit = TAILQ_NEXT(hit, entries))
    if (strcmp(value, hit->value) == 0)
      break;

  if (hit == NULL)
    return;                     /* nothing to delete */

  TAILQ_REMOVE(vals, hit, entries);
  free(hit->value);
  free(hit);
}
/* Delete the list of values.
 * Every node of 'vals' is unlinked and freed together with its string; the
 * list head itself is left empty and reusable. */
void delete_vals(vals_t *vals) {
  val_t *np;

  /* Always pop the head: TAILQ_FOREACH would read the 'next' link out of a
   * node that has just been freed. */
  while ((np = TAILQ_FIRST(vals)) != NULL) {
    TAILQ_REMOVE(vals, np, entries);
    free(np->value);
    free(np);
  }
}
/* Print every value of the list on stdout, each followed by a space, then a
 * newline. */
void print_vals(vals_t *vals) {
  val_t *cur = TAILQ_FIRST(vals);

  while (cur != NULL) {
    printf("%s ", cur->value);
    cur = TAILQ_NEXT(cur, entries);
  }
  putchar('\n');
}
/* Insert a kv in the list.
 * If 'key' already exists its value is replaced by a copy of 'value',
 * otherwise a new (key, value) entry is appended. Both strings are copied.
 * On allocation failure the list is left exactly as it was and an error is
 * logged. */
void insert_kv(kvs_t *kvs, const char *key, const char *value) {
  kv_t *np;
  char *new_value;

  /* Duplicate first, so that a failed allocation can never leave an
   * existing entry with a NULL value. */
  new_value = strdup(value);
  if (!new_value) {
    E_ERROR("Could not allocate memory for value of key %s\n", key);
    return;
  }

  TAILQ_FOREACH(np, kvs, entries)
    if (strcmp(key, np->key) == 0) {
      free(np->value);
      np->value = new_value;
      return;
    }

  np = malloc(sizeof *np);
  if (np)
    np->key = strdup(key);
  if (!np || !np->key) {
    E_ERROR("Could not allocate memory for key %s\n", key);
    free(np);
    free(new_value);
    return;
  }
  np->value = new_value;
  TAILQ_INSERT_TAIL(kvs, np, entries);
}
/* Tell whether at least one entry of 'kvs' carries a value equal to 'value'
 * (keys are not looked at). */
bool check_value(kvs_t *kvs, const char *value) {
  bool found = false;
  kv_t *cur;

  for (cur = TAILQ_FIRST(kvs); cur != NULL && !found; cur = TAILQ_NEXT(cur, entries))
    found = (strcmp(value, cur->value) == 0);

  return found;
}
/* Remove from 'kvs' the first entry whose key equals 'key', if any, and
 * release its key, its value and the node itself. */
void delete_kv(kvs_t *kvs, const char *key) {
  kv_t *hit;

  /* locate the first matching node */
  for (hit = TAILQ_FIRST(kvs); hit != NULL; hit = TAILQ_NEXT(hit, entries))
    if (strcmp(key, hit->key) == 0)
      break;

  if (hit == NULL)
    return;                     /* unknown key: nothing to do */

  TAILQ_REMOVE(kvs, hit, entries);
  free(hit->key);
  free(hit->value);
  free(hit);
}
/* Delete the list of kvs.
 * Every node of 'kvs' is unlinked and freed together with its key and value;
 * the list head itself is left empty and reusable. */
void delete_kvs(kvs_t *kvs) {
  kv_t *np;

  /* Always pop the head: TAILQ_FOREACH would read the 'next' link out of a
   * node that has just been freed. */
  while ((np = TAILQ_FIRST(kvs)) != NULL) {
    TAILQ_REMOVE(kvs, np, entries);
    free(np->key);
    free(np->value);
    free(np);
  }
}
/* Print every entry of the list on stdout as "key:value " and finish with a
 * newline. */
void print_kvs(kvs_t *kvs) {
  kv_t *cur = TAILQ_FIRST(kvs);

  while (cur != NULL) {
    printf("%s:%s ", cur->key, cur->value);
    cur = TAILQ_NEXT(cur, entries);
  }
  putchar('\n');
}
/* Return true when each value of 'subset' is found (by check_value) among
 * the values of 'overset'; an empty subset always matches. */
bool matching_values(kvs_t *overset, vals_t *subset) {
  val_t *wanted = TAILQ_FIRST(subset);

  while (wanted != NULL) {
    if (!check_value(overset, wanted->value))
      return false;             /* one missing value is enough to fail */
    wanted = TAILQ_NEXT(wanted, entries);
  }
  return true;
}
/* Serialize the kvs of a device.
 * Returns a new indefinite CBOR map holding one text-string pair per entry
 * of device->kvs (empty map when the device has none). The caller owns the
 * returned item. */
cbor_item_t *get_kvs(device_t *device) {
  cbor_item_t *cmap = cbor_new_indefinite_map();
  kv_t *np = NULL;

  TAILQ_FOREACH(np, &(device->kvs), entries)
    cbor_map_add(cmap, (struct cbor_pair){
      .key   = cbor_move(cbor_build_string(np->key)),
      .value = cbor_move(cbor_build_string(np->value)) }); /* the value, not the key again */
  return cmap;
}
/* Look a device up by address and serialize its kvs with get_kvs();
 * returns NULL when no device has this address. */
cbor_item_t *get_kvs_by_addr(devices_t *devices, const uuid_t *addr) {
  device_t *found = select_device(devices, addr);

  return (found != NULL) ? get_kvs(found) : NULL;
}
/* Tell whether 'key' is one of the strings of 'keylist', an array of
 * strings terminated by a NULL pointer. */
bool goodkey(const char **keylist, const char *key) {
  size_t idx = 0;

  while (keylist[idx] != NULL) {
    if (!strcmp(keylist[idx], key))
      return true;
    idx++;
  }
  return false;
}
/* Update the list of kvs (merge) with values of wanted keys.
 * For every pair of the CBOR map 'cmap' whose key belongs to 'keylist':
 *   - a null value removes the key from 'kvs';
 *   - a string value inserts or replaces the key in 'kvs';
 *   - any other value type is ignored.
 * Pairs whose key cannot be extracted as a string are skipped. */
void update_kvs(kvs_t *kvs, const char **keylist, cbor_item_t *cmap) {
  size_t i, len, sz = cbor_map_size(cmap);
  struct cbor_pair *pairs = cbor_map_handle(cmap);
  char *key, *value;

  for (i = 0; i < sz; i++) {
    key = xAAL_cbor_string_dup(pairs[i].key, &len);
    /* update_device() already treats this helper as able to return NULL;
     * such a key must not reach strcmp() in goodkey(). */
    if (!key)
      continue;

    if (goodkey(keylist, key)) {
      if ( cbor_is_null(pairs[i].value) )
        delete_kv(kvs, key);
      else if ( cbor_isa_string(pairs[i].value) ) {
        value = xAAL_cbor_string_dup(pairs[i].value, &len);
        if (value) {
          insert_kv(kvs, key, value);   /* insert_kv keeps its own copies */
          free(value);
        }
      }
    }
    free(key);
  }
}
/*
* Section Devices
*/
/* Validate the name of a dev_type.
 * A valid name is "<class>.<type>": two identifiers separated by one literal
 * dot, each starting with a letter and continuing with letters, digits, '_'
 * or '-'. The pattern is compiled on first use and kept for later calls.
 * Returns false when the name does not match, or when the pattern cannot be
 * compiled or executed. */
bool validate_dev_type(const char *dev_type) {
  static regex_t preg;
  static bool compiled = false;
  int r;

  if (!compiled) {
    /* "\\." is a literal dot; a bare '.' would accept any character there */
    r = regcomp(&preg, "^[a-zA-Z][a-zA-Z0-9_-]*\\.[a-zA-Z][a-zA-Z0-9_-]*$", REG_NOSUB);
    if ( r != 0 ) {
      E_ERROR("Error regcomp: %d\n", r);
      return false;
    }
    compiled = true;
  }
  /* only 0 means "matched": any other code (REG_NOMATCH, errors) rejects */
  return regexec(&preg, dev_type, 0, NULL, 0) == 0;
}
/* Return the first device of the list whose address equals *addr
 * (uuid_compare), or NULL when there is none. */
device_t *select_device(devices_t *devices, const uuid_t *addr) {
  device_t *cur = TAILQ_FIRST(devices);

  while (cur != NULL && uuid_compare(cur->addr, *addr) != 0)
    cur = TAILQ_NEXT(cur, entries);

  return cur;
}
/* Return the device registered under *addr, creating it first when it is
 * unknown. A fresh device is appended to the list with no dev_type, a zero
 * timeout and an empty kvs list.
 * NOTE: the result of malloc() is used without being checked. */
device_t *add_device(devices_t *devices, const uuid_t *addr) {
  device_t *entry = select_device(devices, addr);

  if (entry == NULL) {
    entry = (device_t *)malloc(sizeof(device_t));
    entry->timeout = 0;
    entry->dev_type = NULL;
    uuid_copy(entry->addr, *addr);
    TAILQ_INIT(&(entry->kvs));
    TAILQ_INSERT_TAIL(devices, entry, entries);
  }
  return entry;
}
/* Update a device from its CBOR description.
 * 'cdevice' is looked up for the fields "address", "dev_type", "timeout" and
 * "map". The description is ignored when the address is not a uuid, when
 * dev_type is missing or does not pass validate_dev_type(), or when the
 * device is already known under a different dev_type. Otherwise the device
 * is created if needed, its timeout is updated when an integer one is given,
 * and its kvs are merged with the keys of "map" listed in 'keylist'. */
void update_device(devices_t *devices, const char **keylist, cbor_item_t *cdevice) {
  cbor_item_t *caddress, *cdev_type, *ctimeout, *cmap;
  char *dev_type;
  uuid_t addr;
  device_t *device;
  size_t len;
  bool same_type;

  caddress = xAAL_cbor_map_get(cdevice, "address");
  if ( !xAAL_cbor_is_uuid(caddress, &addr) )
    return;

  cdev_type = xAAL_cbor_map_get(cdevice, "dev_type");
  dev_type = xAAL_cbor_string_dup(cdev_type, &len);
  if ( !dev_type )
    return;
  if ( !validate_dev_type(dev_type) ) {
    free(dev_type);
    return;
  }

  device = add_device(devices, &addr);
  if (device->dev_type) {
    /* the device keeps the string it already owns: our copy is released
     * whether the types agree or not */
    same_type = (strcmp(device->dev_type, dev_type) == 0);
    free(dev_type);
    if (!same_type)
      return;
  } else
    device->dev_type = dev_type;        /* ownership moves to the device */

  ctimeout = xAAL_cbor_map_get(cdevice, "timeout");
  /* same NULL guard as for "map" below, before inspecting the item */
  if ( ctimeout && cbor_is_int(ctimeout) )
    device->timeout = cbor_get_int(ctimeout);

  cmap = xAAL_cbor_map_get(cdevice, "map");
  if ( cmap && cbor_isa_map(cmap) )
    update_kvs(&(device->kvs), keylist, cmap);
}
/* Debug helper: print on stdout "targets:" followed by the textual uuid of
 * every element of the CBOR array 'ctargets', then a newline. Each element
 * is read through cbor_bytestring_handle(). */
void print_targets(cbor_item_t *ctargets) {
  const size_t count = cbor_array_size(ctargets);
  char text[37];
  size_t pos = 0;

  printf("targets:");
  while (pos < count) {
    cbor_item_t *item = cbor_move(cbor_array_get(ctargets, pos));

    uuid_unparse(cbor_bytestring_handle(item), text);
    printf(" %s", text);
    pos++;
  }
  printf("\n");
}
/* Search devices having given values.
 * Returns a new indefinite CBOR array holding the 16-byte address of every
 * device whose dev_type starts with 'dev_type' and whose kvs contain all the
 * values of 'vals'. Devices whose dev_type is not known yet are skipped.
 * The caller owns the returned array. */
cbor_item_t *matching_device(devices_t *devices, const char *dev_type, vals_t *vals) {
  const size_t prefix_len = strlen(dev_type);
  cbor_item_t *ctargets;
  device_t *device;

  ctargets = cbor_new_indefinite_array();
  TAILQ_FOREACH(device, devices, entries)
    if ( device->dev_type                       /* add_device() leaves it NULL */
         && (strncmp(device->dev_type, dev_type, prefix_len)==0)
         && (matching_values(&(device->kvs), vals)) )
      /* cbor_move: the array becomes the only owner of the new item */
      cbor_array_push(ctargets, cbor_move(cbor_build_bytestring(device->addr, 16)));
  print_targets(ctargets);//DEBUG
  return ctargets;
}
/* CBOR serialization of a device.
 * Returns a new definite map with the fields "address" (16-byte string),
 * "dev_type", "timeout" and, only when the device has kvs, "map".
 * A device whose dev_type is not known yet is serialized with an empty
 * dev_type string. The caller owns the returned item. */
cbor_item_t *get_device(device_t *device) {
  cbor_item_t *kvs, *cobj;
  const char *dev_type = device->dev_type ? device->dev_type : "";
  bool has_kvs;

  kvs = get_kvs(device);
  has_kvs = (cbor_map_size(kvs) != 0);
  cobj = cbor_new_definite_map( has_kvs ? 4 : 3 );
  /* a uuid is 16 bytes long, as everywhere else in this file */
  cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("address")), cbor_move(cbor_build_bytestring(device->addr, 16)) });
  cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("dev_type")), cbor_move(cbor_build_string(dev_type)) });
  cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("timeout")), cbor_move(xAAL_cbor_build_int(device->timeout)) });
  if ( has_kvs )
    cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("map")), cbor_move(kvs) });
  else
    cbor_decref(&kvs);          /* empty map is not embedded: release it */
  return cobj;
}
/* Serialize the device registered under *addr with get_device(); when the
 * address is unknown an empty indefinite CBOR map is returned instead. */
cbor_item_t *get_device_by_addr(devices_t *devices, const uuid_t *addr) {
  device_t *found = select_device(devices, addr);

  if (found == NULL)
    return cbor_new_indefinite_map();
  return get_device(found);
}
/* Unlink 'device' from the list and release everything it owns: its kvs,
 * its dev_type string and the device structure itself. */
void delete_device(devices_t *devices, device_t *device) {
  TAILQ_REMOVE(devices, device, entries);
  delete_kvs(&(device->kvs));
  free(device->dev_type);       /* free(NULL) is a no-op */
  free(device);
}
/*
* Section Database
*/
/* Read-write lock helpers */

/* Initialise the read-write lock of the context with default attributes;
 * a failure is reported through E_FATAL. */
void rwinit(xaal_ctx_t *xaal_ctx) {
  const int err = pthread_rwlock_init(&(xaal_ctx->rwlock), NULL);

  if (err)
    E_FATAL("Error on pthread_rwlock_init: %d\n", err);
}
/* Take the lock of the context for writing; a failure is reported through
 * E_FATAL. */
void wrlock(xaal_ctx_t *xaal_ctx) {
  const int err = pthread_rwlock_wrlock(&(xaal_ctx->rwlock));

  if (err)
    E_FATAL("Error on pthread_rwlock_wrlock: %d\n", err);
}
/* Take the lock of the context for reading; a failure is reported through
 * E_FATAL. */
void rdlock(xaal_ctx_t *xaal_ctx) {
  const int err = pthread_rwlock_rdlock(&(xaal_ctx->rwlock));

  if (err)
    E_FATAL("Error on pthread_rwlock_rdlock: %d\n", err);
}
/* Release the lock of the context (taken by rdlock or wrlock); a failure is
 * reported through E_FATAL. */
void unlock(xaal_ctx_t *xaal_ctx) {
  const int err = pthread_rwlock_unlock(&(xaal_ctx->rwlock));

  if (err)
    E_FATAL("Error on pthread_rwlock_unlock: %d\n", err);
}
/* Destroy the read-write lock of the context; a failure is reported through
 * E_FATAL. */
void rwdestroy(xaal_ctx_t *xaal_ctx) {
  const int err = pthread_rwlock_destroy(&(xaal_ctx->rwlock));

  if (err)
    E_FATAL("Error on pthread_rwlock_destroy: %d\n", err);
}
/* Init DB and load a file (if any).
 * The device list and the read-write lock of the context are always
 * initialised. Then xaal_ctx->dbfile is mapped in memory and parsed as CBOR;
 * when its root item is an array, each element is handed to update_device()
 * under the write lock. Any failure is logged and leaves the database with
 * whatever was loaded so far (empty in most cases). */
void load_db(xaal_ctx_t *xaal_ctx) {
  cbor_item_t *cdb;
  struct cbor_load_result cresult;
  size_t sz, i;
  int fd;
  struct stat sb;
  unsigned char *file_map;

  TAILQ_INIT(&(xaal_ctx->devices));
  rwinit(xaal_ctx);

  fd = open(xaal_ctx->dbfile, O_RDONLY);
  if (fd == -1) {
    E_ERROR("Could not open(r) database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
    return;
  }

  if (fstat(fd, &sb) == -1) { /* get file size */
    E_ERROR("Could not do fstat on database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
    close(fd);                  /* log first: close() may overwrite errno */
    return;
  }

  file_map = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
  if (file_map == MAP_FAILED) {
    E_ERROR("Could not mmap database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
    close(fd);
    return;
  }

  cdb = cbor_load(file_map, sb.st_size, &cresult);
  munmap(file_map, sb.st_size);
  close(fd);

  if ( cdb == NULL ) {
    E_ERROR("Could not parse database file %s. CBOR error; code %d, position %lu\n",
            xaal_ctx->dbfile, cresult.error.code, (unsigned long)cresult.error.position);
    return;
  }

  if ( cbor_isa_array(cdb) ) {
    sz = cbor_array_size(cdb);
    wrlock(xaal_ctx);
    for (i = 0; i < sz; i++)
      update_device(&(xaal_ctx->devices), xaal_ctx->keylist, cbor_move(cbor_array_get(cdb, i)));
    unlock(xaal_ctx);
  }
  cbor_decref(&cdb);            /* released on every path, array or not */
}
/* Dump DB to a file.
 * All devices are serialized (get_device) into one CBOR array while holding
 * the read lock, then the encoded bytes replace the content of
 * xaal_ctx->dbfile (created if missing, truncated otherwise). Failures are
 * logged. The lock is only released here, not destroyed: rwdestroy() stays
 * the single place where that happens. */
void dump_db(xaal_ctx_t *xaal_ctx) {
  cbor_item_t *cdb;
  device_t *device;
  unsigned char *buffer = NULL;
  size_t size, len, done;
  ssize_t n;
  int fd;

  cdb = cbor_new_indefinite_array();
  rdlock(xaal_ctx);
  TAILQ_FOREACH(device, &(xaal_ctx->devices), entries)
    cbor_array_push(cdb, cbor_move(get_device(device)));
  unlock(xaal_ctx);             /* release what rdlock() took */

  /* Serialize before touching the file, so that a failure here does not
   * wipe the previous database. */
  len = cbor_serialize_alloc(cdb, &buffer, &size);
  cbor_decref(&cdb);
  if (len == 0) {
    E_ERROR("Could not serialize database for file %s\n", xaal_ctx->dbfile);
    free(buffer);
    return;
  }

  /* O_TRUNC: a shorter dump must not keep the tail of an older, longer one */
  fd = open(xaal_ctx->dbfile, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
  if (fd == -1) {
    E_ERROR("Could not open(w+) database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
    free(buffer);
    return;
  }

  /* write() may be interrupted or write less than asked: loop until done */
  for (done = 0; done < len; ) {
    n = write(fd, buffer + done, len - done);
    if (n == -1) {
      if (errno == EINTR)
        continue;
      E_ERROR("Writing database file: %s\n", strerror(errno));
      break;
    }
    done += (size_t)n;
  }

  free(buffer);
  close(fd);
}
/*
* Section xAAL threads
*/
/* Send is_alive request */
bool request_is_alive(const xAAL_businfo_t *bus, const xAAL_devinfo_t *me) {
cbor_item_t *cbody, *cdev_types;
cdev_types = cbor_new_definite_array(1);
cbor_array_push(cdev_types, cbor_move(cbor_build_string("any.any")));
cbody = cbor_new_definite_map(1);
cbor_map_add(cbody, (struct cbor_pair){ cbor_move(cbor_build_string("dev_types")), cbor_move(cdev_types) });
return xAAL_write_bus(bus, me, xAAL_REQUEST, "is_alive", cbody, NULL);
}
/* Handle an alive notification sent by 'source'.
 * Under the write lock, the device is created if unknown and adopts
 * 'dev_type' when it has none yet; its timeout is then refreshed from the
 * message body. Returns false, without touching the timeout, when the
 * device is already known under a different dev_type. */
bool receive_is_alive(const uuid_t *source, const char *dev_type, cbor_item_t *cbody, xaal_ctx_t *xaal_ctx) {
  bool accepted = true;
  device_t *device;

  wrlock(xaal_ctx);
  device = add_device(&(xaal_ctx->devices), source);
  if (device->dev_type == NULL)
    device->dev_type = strdup(dev_type);
  else if (strcmp(device->dev_type, dev_type) != 0)
    accepted = false;           /* conflicting dev_type: ignore the message */

  if (accepted)
    device->timeout = xAAL_read_aliveTimeout(cbody);
  unlock(xaal_ctx);

  return accepted;
}
/* Answer a get_attributes request: a reply with no body is written on the
 * bus for the single address 'target'. Returns what xAAL_write_busl()
 * returns. */
bool reply_get_attributes(const xAAL_businfo_t *bus, const xAAL_devinfo_t *me,
                          const uuid_t *target) {
  const bool sent = xAAL_write_busl(bus, me, xAAL_REPLY, "get_attributes", NULL, target, NULL);

  return sent;
}
/* Write on the bus, for 'ctargets', a "get_keys_values" request whose body
 * is the map { "device": <16-byte address *device> }. Returns what
 * xAAL_write_bus() returns. */
bool request_get_keys_values(const xAAL_businfo_t *bus, const xAAL_devinfo_t *me,
                             cbor_item_t *ctargets, const uuid_t *device) {
  cbor_item_t *body = cbor_new_definite_map(1);
  struct cbor_pair entry;

  entry.key = cbor_move(cbor_build_string("device"));
  entry.value = cbor_move(cbor_build_bytestring(*device, 16));
  cbor_map_add(body, entry);

  return xAAL_write_bus(bus, me, xAAL_REQUEST, "get_keys_values", body, ctargets);
}
/* Handle the body of a get_keys_values reply or of a keys_values_changed
 * notification (metadatadb.any).
 * The body must hold a uuid under "device" and a map under "map", otherwise
 * false is returned. Under the write lock, the device is created if unknown
 * and its kvs are merged with the wanted keys of the map. */
bool receive_KeysValues(cbor_item_t *cqueryBody, xaal_ctx_t *xaal_ctx) {
  cbor_item_t *cmap;
  device_t *device;
  uuid_t dev;

  if ( !xAAL_cbor_is_uuid(xAAL_cbor_map_get(cqueryBody, "device"), &dev) )
    return false;

  cmap = xAAL_cbor_map_get(cqueryBody, "map");
  if ( cmap == NULL || !cbor_isa_map(cmap) )
    return false;

  wrlock(xaal_ctx);
  device = add_device(&(xaal_ctx->devices), &dev);
  update_kvs(&(device->kvs), xaal_ctx->keylist, cmap);
  unlock(xaal_ctx);

  return true;
}
#define FORCE_CHECK 10 /* counter: number of calls of checking() between two forced refreshes */
/* Time to time, check missing info in db and send xAAL queries to know more.
 * Each call:
 *   1. deletes devices whose timeout is older than now - GRACE_DELAY;
 *   2. collects the addresses of the known "metadatadb." devices;
 *   3. if there are some, asks them the keys-values of every device that
 *      has none yet (or of all devices on a forced refresh);
 *   4. on a forced refresh, also broadcasts an is_alive request.
 * A forced refresh happens once the static counter has run down to zero. */
void checking(xaal_ctx_t *xaal_ctx) {
  static const char prefix[] = "metadatadb.";
  static int force_check = FORCE_CHECK;
  const size_t len = sizeof(prefix) - 1;
  cbor_item_t *ctargets;
  device_t *device, *next;
  time_t deadline;

  /* delete dead devices */
  deadline = time(NULL) - GRACE_DELAY;
  wrlock(xaal_ctx);
  /* fetch the successor before delete_device() frees the current node */
  for (device = TAILQ_FIRST(&(xaal_ctx->devices)); device != NULL; device = next) {
    next = TAILQ_NEXT(device, entries);
    if (device->timeout < deadline)
      delete_device(&(xaal_ctx->devices), device);
  }
  unlock(xaal_ctx);

  /* check for kvs */
  ctargets = cbor_new_indefinite_array();
  rdlock(xaal_ctx);             /* the list is only read here */
  TAILQ_FOREACH(device, &(xaal_ctx->devices), entries)
    if ( device->dev_type       /* may still be NULL, see add_device() */
         && (strncmp(device->dev_type, prefix, len) == 0) )
      /* cbor_move: the array becomes the only owner of the new item */
      cbor_array_push(ctargets, cbor_move(cbor_build_bytestring(device->addr, 16)));
  unlock(xaal_ctx);

  if (cbor_array_size(ctargets) != 0) {
    rdlock(xaal_ctx);
    TAILQ_FOREACH(device, &(xaal_ctx->devices), entries)
      if ( (force_check <= 0) || TAILQ_EMPTY(&(device->kvs)) )
        /* NOTE(review): one extra reference is handed over per request,
         * presumably consumed by xAAL_write_bus() - confirm */
        request_get_keys_values(&(xaal_ctx->bus), &(xaal_ctx->majordom), cbor_incref(ctargets), &device->addr);
    unlock(xaal_ctx);
  }
  cbor_decref(&ctargets);       /* our own reference, used or not */

  if (force_check <= 0) {
    force_check = FORCE_CHECK;
    request_is_alive(&(xaal_ctx->bus), &(xaal_ctx->majordom));
  }
  force_check--;
}
/* Read one message from the bus and process it:
 *   - requests addressed to us: is_alive (when our dev_type is asked for),
 *     get_description and get_attributes are answered;
 *   - get_keys_values replies and keys_values_changed notifications coming
 *     from a "metadatadb." device feed the database, whoever the target is;
 *   - alive notifications from any device refresh the database.
 * Nothing is done when xAAL_read_bus() fails; otherwise the message is
 * released with xAAL_free_msg() before returning. */
void manage_msg(xaal_ctx_t *xaal_ctx) {
  xAAL_businfo_t *bus = &(xaal_ctx->bus);
  xAAL_devinfo_t *me = &(xaal_ctx->majordom);
  cbor_item_t *cbody, *ctargets;
  char *dev_type, *action;
  xAAL_msg_type_t msg_type;
  uuid_t *source;

  if (!xAAL_read_bus(bus, &ctargets, &source, &dev_type, &msg_type, &action, &cbody))
    return;

  /* Requests sent to me */
  if (xAAL_targets_match(ctargets, &me->addr) && (msg_type == xAAL_REQUEST)) {
    if (strcmp(action, "is_alive") == 0) {
      if ( xAAL_is_aliveDevType_match(cbody, me->dev_type)
           && !xAAL_notify_alive(bus, me) )
        E_ERROR("Could not reply to is_alive\n");
    } else if (strcmp(action, "get_description") == 0) {
      if ( !xAAL_reply_get_description(bus, me, source) )
        E_ERROR("Could not reply to get_description\n");
    } else if (strcmp(action, "get_attributes") == 0) {
      if ( !reply_get_attributes(bus, me, source) )
        E_ERROR("Could not reply to get_attributes\n");
    }
  }

  /* I am interested by messages from metadatadb to me or to others */
  if (strncmp(dev_type, "metadatadb.", strlen("metadatadb.")) == 0) {
    bool kv_reply = (msg_type == xAAL_REPLY) && (strcmp(action, "get_keys_values") == 0);
    bool kv_notify = !kv_reply
                     && (msg_type == xAAL_NOTIFY) && (strcmp(action, "keys_values_changed") == 0);

    if (kv_reply || kv_notify)
      receive_KeysValues(cbody, xaal_ctx);
  }

  /* Manage interesting notifications */
  if ( (msg_type == xAAL_NOTIFY) && (strcmp(action, "alive") == 0) )
    receive_is_alive(source, dev_type, cbody, xaal_ctx);

  xAAL_free_msg(ctargets, source, dev_type, action, cbody);
}
/* Main thread on the xAAL bus.
 * 'data' is the xaal_ctx_t of the agent. Two periodic timers are armed
 * (alive notifications every ALIVE_PERIOD seconds, starting almost at once;
 * checking() every CHECKING_PERIOD seconds), an is_alive request is
 * broadcast, then the function loops forever on select(), serving the timers
 * and the bus socket. A timer that could not be created is reported and
 * simply left out of the loop. Never returns in practice. */
void *xaal_agent(void *data) {
  xaal_ctx_t *xaal_ctx = data;
  xAAL_businfo_t *bus = &(xaal_ctx->bus);
  xAAL_devinfo_t *me = &(xaal_ctx->majordom);
  fd_set rfds, rfds_;
  int fd_max;
  struct itimerspec timerspec;
  uint64_t expirations;
  int alive_fd, checking_fd;

  /* Set alive timer */
  alive_fd = timerfd_create(CLOCK_REALTIME, 0);
  if (alive_fd == -1)
    perror("Could not create timer for alive messages");
  else {
    timerspec.it_interval.tv_sec = ALIVE_PERIOD;
    timerspec.it_interval.tv_nsec = 0;
    timerspec.it_value.tv_sec = 0;
    timerspec.it_value.tv_nsec = 1;     /* first expiration right away */
    if ( timerfd_settime(alive_fd, 0, &timerspec, NULL) == -1 )
      perror("Could not configure timer for alive messages");
  }

  /* Set timer for 'checking queries' */
  checking_fd = timerfd_create(CLOCK_REALTIME, 0);
  if (checking_fd == -1)
    perror("Could not create timer for checking");
  else {
    timerspec.it_interval.tv_sec = CHECKING_PERIOD;
    timerspec.it_interval.tv_nsec = 0;
    timerspec.it_value.tv_sec = CHECKING_PERIOD;
    timerspec.it_value.tv_nsec = 0;
    if ( timerfd_settime(checking_fd, 0, &timerspec, NULL) == -1 )
      perror("Could not configure timer for checking");
  }

  /* sends a broadcast is_alive */
  request_is_alive(bus, me);

  /* xAAL loop: only valid descriptors may enter the set */
  FD_ZERO(&rfds);
  FD_SET(bus->sfd, &rfds);
  fd_max = bus->sfd;
  if (alive_fd != -1) {
    FD_SET(alive_fd, &rfds);
    fd_max = (fd_max > alive_fd)? fd_max : alive_fd;
  }
  if (checking_fd != -1) {
    FD_SET(checking_fd, &rfds);
    fd_max = (fd_max > checking_fd)? fd_max : checking_fd;
  }

  for (;;) {
    rfds_ = rfds;
    if ( select(fd_max+1, &rfds_, NULL, NULL, NULL) == -1 ) {
      if (errno != EINTR)
        E_ERROR("select(): %s\n", strerror(errno));
      /* rfds_ does not describe ready descriptors after a failure: acting
       * on it could block in read() on a timer that has not expired */
      continue;
    }

    if ( (alive_fd != -1) && FD_ISSET(alive_fd, &rfds_) ) {
      if ( read(alive_fd, &expirations, sizeof(uint64_t)) == -1 )
        E_ERROR("Alive timer\n");
      if ( !xAAL_notify_alive(bus, me) )
        E_ERROR("Could not send alive notification.\n");
    }

    if ( (checking_fd != -1) && FD_ISSET(checking_fd, &rfds_) ) {
      if ( read(checking_fd, &expirations, sizeof(uint64_t)) == -1 )
        E_ERROR("Checking timer\n");
      checking(xaal_ctx);
    }

    if (FD_ISSET(bus->sfd, &rfds_))
      manage_msg(xaal_ctx);
  }
  return NULL;
}