Skip to content
Snippets Groups Projects
Select Git revision
  • c9d5301325ccabe3862a6f208056b43e7b3f008f
  • without_tipselection default
  • develop protected
  • fix/grafana-local-dashboard
  • wasp
  • fix/dashboard-explorer-freeze
  • master
  • feat/timerqueue
  • test/sync_debug_and_650
  • feat/sync_revamp_inv
  • wip/sync
  • tool/db-recovery
  • portcheck/fix
  • fix/synchronization
  • feat/new-dashboard-analysis
  • feat/refactored-analysis-dashboard
  • feat/new-analysis-dashboard
  • test/demo-prometheus-fpc
  • prometheus_metrics
  • wip/analysis-server
  • merge/fpc-test-value-transfer
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.3
  • v0.1.2
  • v0.1.1
  • v0.1.0
28 results

plugin.go

Blame
  • agent.c 19.15 KiB
    /* db - xAAL agent
     * Part of the 'majordom' software
     * (c) 2019 Christophe Lohr <christophe.lohr@imt-atlantique.fr>
     *
     * This program is free software: you can redistribute it and/or modify
     * it under the terms of the GNU General Public License as published by
     * the Free Software Foundation, either version 3 of the License, or
     * (at your option) any later version.
     *
     * This program is distributed in the hope that it will be useful,
     * but WITHOUT ANY WARRANTY; without even the implied warranty of
     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     * GNU General Public License for more details.
     *
     * You should have received a copy of the GNU General Public License
     * along with this program.  If not, see <http://www.gnu.org/licenses/>.
     */
    
    #include <pthread.h>
    #include <string.h>
    #include <errno.h>
    #include <uuid/uuid.h>
    #include <regex.h>
    #include <stdbool.h>
    #include <unistd.h>
    #include <sys/timerfd.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <sys/mman.h>
    #include <fcntl.h>
    
    #include <sphinxbase/err.h>
    
    #include "agent.h"
    
    
    /* Interval, in seconds, of the timer that triggers our own alive notifications */
    #define ALIVE_PERIOD	120
    /* Initial delay and interval, in seconds, of the timer that triggers checking() */
    #define CHECKING_PERIOD	 10
    /* Seconds subtracted from the current time to get the deadline under which
     * a device's timeout makes checking() delete that device */
    #define GRACE_DELAY	120
    
    
    /*
     * Section Keys-Values of a Device
     */
    
    
    /*
     * Append a private copy of `value` to the list, unless an equal string is
     * already stored (each value appears at most once).
     * On allocation failure the error is logged and the list is left unchanged.
     */
    void insert_value(vals_t *vals, const char *value) {
      val_t *np;
    
      TAILQ_FOREACH(np, vals, entries)
        if (strcmp(value, np->value) == 0)
          return;
    
      np = malloc(sizeof *np);
      if (!np) {
        E_ERROR("Could not allocate a value entry: %s\n", strerror(errno));
        return;
      }
      np->value = strdup(value);
      if (!np->value) {
        /* never link an entry whose value is NULL: readers strcmp() it */
        E_ERROR("Could not duplicate a value: %s\n", strerror(errno));
        free(np);
        return;
      }
      TAILQ_INSERT_TAIL(vals, np, entries);
    }
    
    
    /* Unlink and free the first entry whose string equals `value`.
     * The list is left untouched when no entry matches. */
    void delete_value(vals_t *vals, const char *value) {
      val_t *item;
    
      TAILQ_FOREACH(item, vals, entries) {
        if (strcmp(value, item->value) != 0)
          continue;
        /* returning right after the removal keeps the traversal safe */
        TAILQ_REMOVE(vals, item, entries);
        free(item->value);
        free(item);
        return;
      }
    }
    
    
    /*
     * Empty the list of values, freeing every entry and its string.
     * The head itself is not freed; it is left as an empty list.
     */
    void delete_vals(vals_t *vals) {
      val_t *np;
    
      /* Always pop the head: TAILQ_FOREACH would read np->entries after
       * np has been freed in order to find the next element. */
      while ((np = TAILQ_FIRST(vals)) != NULL) {
        TAILQ_REMOVE(vals, np, entries);
        free(np->value);
        free(np);
      }
    }
    
    
    /* Print every value of the list on stdout, each followed by a space,
     * then terminate the line. */
    void print_vals(vals_t *vals) {
      val_t *item;
    
      TAILQ_FOREACH(item, vals, entries) {
        printf("%s ", item->value);
      }
      printf("\n");
    }
    
    
    
    /*
     * Set `key` to `value` in the list: the value of an existing entry is
     * replaced, otherwise a new entry is appended. Both strings are copied.
     * On allocation failure the error is logged and the list is left unchanged
     * (an existing entry keeps its previous value).
     */
    void insert_kv(kvs_t *kvs, const char *key, const char *value) {
      kv_t *np;
      char *value_copy;
    
      /* Duplicate first, so that a failure never leaves an entry without value */
      value_copy = strdup(value);
      if (!value_copy) {
        E_ERROR("Could not duplicate a value: %s\n", strerror(errno));
        return;
      }
    
      TAILQ_FOREACH(np, kvs, entries)
        if (strcmp(key, np->key) == 0) {
          free(np->value);
          np->value = value_copy;
          return;
        }
    
      np = malloc(sizeof *np);
      if (np)
        np->key = strdup(key);
      if (!np || !np->key) {
        E_ERROR("Could not allocate a key-value entry: %s\n", strerror(errno));
        free(np);
        free(value_copy);
        return;
      }
      np->value = value_copy;
      TAILQ_INSERT_TAIL(kvs, np, entries);
    }
    
    
    /* Tell whether at least one key of the list is bound to the string `value`. */
    bool check_value(kvs_t *kvs, const char *value) {
      kv_t *pair;
      bool found = false;
    
      TAILQ_FOREACH(pair, kvs, entries) {
        if (strcmp(value, pair->value) == 0) {
          found = true;
          break;
        }
      }
      return found;
    }
    
    
    /* Unlink and free the first entry whose key equals `key`.
     * The list is left untouched when the key is absent. */
    void delete_kv(kvs_t *kvs, const char *key) {
      kv_t *pair;
    
      TAILQ_FOREACH(pair, kvs, entries) {
        if (strcmp(key, pair->key) != 0)
          continue;
        /* returning right after the removal keeps the traversal safe */
        TAILQ_REMOVE(kvs, pair, entries);
        free(pair->key);
        free(pair->value);
        free(pair);
        return;
      }
    }
    
    
    /*
     * Empty the list of keys-values, freeing every entry with its key and
     * value strings. The head itself is not freed; it is left as an empty list.
     */
    void delete_kvs(kvs_t *kvs) {
      kv_t *np;
    
      /* Always pop the head: TAILQ_FOREACH would read np->entries after
       * np has been freed in order to find the next element. */
      while ((np = TAILQ_FIRST(kvs)) != NULL) {
        TAILQ_REMOVE(kvs, np, entries);
        free(np->key);
        free(np->value);
        free(np);
      }
    }
    
    
    /* Print every entry of the list on stdout as "key:value ", then
     * terminate the line. */
    void print_kvs(kvs_t *kvs) {
      kv_t *pair;
    
      TAILQ_FOREACH(pair, kvs, entries) {
        printf("%s:%s ", pair->key, pair->value);
      }
      printf("\n");
    }
    
    
    /* Tell whether every string of `subset` appears as a value somewhere in
     * `overset`. An empty subset always matches. */
    bool matching_values(kvs_t *overset, vals_t *subset) {
      val_t *wanted;
      bool all_found = true;
    
      TAILQ_FOREACH(wanted, subset, entries) {
        if (!check_value(overset, wanted->value)) {
          all_found = false;
          break;
        }
      }
      return all_found;
    }
    
    
    /*
     * Serialize the keys-values of a device as a CBOR indefinite map of
     * text strings (key -> value).
     * Returns a new item; the caller is in charge of releasing it.
     */
    cbor_item_t *get_kvs(device_t *device) {
      cbor_item_t *cmap = cbor_new_indefinite_map();
      kv_t *np = NULL;
    
      TAILQ_FOREACH(np, &(device->kvs), entries)
        cbor_map_add(cmap, (struct cbor_pair){
          cbor_move(cbor_build_string(np->key)),
          cbor_move(cbor_build_string(np->value)) }); /* the value, not the key again */
      return cmap;
    }
    
    
    /* Serialize the keys-values of the device registered under `addr`.
     * Returns NULL when no such device is known. */
    cbor_item_t *get_kvs_by_addr(devices_t *devices, const uuid_t *addr) {
      device_t *found = select_device(devices, addr);
    
      return found ? get_kvs(found) : NULL;
    }
    
    
    
    
    /* Tell whether `key` is one of the wanted keys.
     * `keylist` is an array of strings terminated by a NULL pointer. */
    bool goodkey(const char **keylist, const char *key) {
      size_t idx = 0;
    
      while (keylist[idx] != NULL) {
        if (strcmp(keylist[idx], key) == 0)
          return true;
        idx++;
      }
      return false;
    }
    
    
    /*
     * Merge a CBOR map into the list of keys-values, looking only at the keys
     * listed in `keylist` (NULL-terminated array):
     *  - a null value deletes the key;
     *  - a text string value inserts or replaces the key;
     *  - any other value is ignored.
     * Pairs whose key or value cannot be extracted as a string are skipped.
     */
    void update_kvs(kvs_t *kvs, const char **keylist, cbor_item_t *cmap) {
      size_t i, len, sz = cbor_map_size(cmap);
      struct cbor_pair *pairs = cbor_map_handle(cmap);
      char *key, *value;
    
      for (i = 0; i < sz; i++) {
        key = xAAL_cbor_string_dup(pairs[i].key, &len);
        if (!key)   /* no string available: must not reach strcmp() in goodkey() */
          continue;
        if (goodkey(keylist, key)) {
          if ( cbor_is_null(pairs[i].value) )
            delete_kv(kvs, key);
          else if ( cbor_isa_string(pairs[i].value) ) {
            value = xAAL_cbor_string_dup(pairs[i].value, &len);
            if (value) {
              insert_kv(kvs, key, value);
              free(value);
            }
          }
        }
        free(key);
      }
    }
    
    
    
    /*
     * Section Devices
     */
    
    /*
     * Validate the name of a dev_type: two identifiers separated by a literal
     * dot, each starting with a letter and continuing with letters, digits,
     * '_' or '-' (e.g. "any.any").
     * The regular expression is compiled on first use and kept afterwards.
     * Returns false when the name does not match or when the pattern could not
     * be prepared or executed.
     */
    bool validate_dev_type(const char *dev_type) {
      static regex_t *preg = NULL;
      int r;
    
      if (!preg) {
        regex_t *re = malloc(sizeof *re);
        if (!re) {
          E_ERROR("Error allocating regex\n");
          return false;
        }
        /* "\\." : the separator must be a real dot, not any character */
        r = regcomp(re, "^[a-zA-Z][a-zA-Z0-9_-]*\\.[a-zA-Z][a-zA-Z0-9_-]*$", REG_NOSUB);
        if ( r != 0 ) {
          free(re);   /* do not leak; a later call will try again */
          E_ERROR("Error regcomp: %d\n", r);
          return false;
        }
        preg = re;
      }
      /* only a successful match validates; regexec() errors do not */
      return regexec(preg, dev_type, 0, NULL, 0) == 0;
    }
    
    
    /* Look up the device registered under `addr`.
     * Returns the matching entry, or NULL when the address is unknown. */
    device_t *select_device(devices_t *devices, const uuid_t *addr) {
      device_t *cur = TAILQ_FIRST(devices);
    
      while (cur != NULL && uuid_compare(cur->addr, *addr) != 0)
        cur = TAILQ_NEXT(cur, entries);
      return cur;
    }
    
    
    /*
     * Select the device registered under `addr`, inserting it first if needed.
     * A new device starts with no dev_type, a zero timeout and no keys-values.
     * Never returns NULL: callers use the result unconditionally, so running
     * out of memory is reported as a fatal error.
     */
    device_t *add_device(devices_t *devices, const uuid_t *addr) {
      device_t *device = select_device(devices, addr);
    
      if (device)
        return device;
    
      device = malloc(sizeof *device);
      if (!device)
        E_FATAL("Could not allocate a device: %s\n", strerror(errno));
      uuid_copy(device->addr, *addr);
      device->dev_type = NULL;
      device->timeout = 0;
      TAILQ_INIT(&(device->kvs));
      TAILQ_INSERT_TAIL(devices, device, entries);
      return device;
    }
    
    
    /*
     * Update (or create) a device from its CBOR description, a map holding:
     *  - "address":  uuid of the device (mandatory);
     *  - "dev_type": name of its type (mandatory, must pass validate_dev_type());
     *  - "timeout":  integer, optional;
     *  - "map":      keys-values to merge, filtered by `keylist`, optional.
     * The record is ignored when a mandatory field is missing or invalid, or
     * when the device is already known under a different dev_type.
     */
    void update_device(devices_t *devices, const char **keylist, cbor_item_t *cdevice) {
      cbor_item_t *caddress, *cdev_type, *ctimeout, *cmap;
      char *dev_type;
      uuid_t addr;
      device_t *device;
      size_t len;
    
      caddress = xAAL_cbor_map_get(cdevice, "address");
      if ( !xAAL_cbor_is_uuid(caddress, &addr) )
        return;
    
      cdev_type = xAAL_cbor_map_get(cdevice, "dev_type");
      dev_type = xAAL_cbor_string_dup(cdev_type, &len);
      if ( !dev_type )
        return;
    
      if ( !validate_dev_type(dev_type) ) {
        free(dev_type);
        return;
      }
    
      device = add_device(devices, &addr);
    
      if (device->dev_type) {
        /* The device keeps its own string: our copy is released whether or
         * not it is the same name (it used to leak when the names agreed). */
        bool same = (strcmp(device->dev_type, dev_type) == 0);
        free(dev_type);
        if (!same)
          return;
      } else
        device->dev_type = dev_type;   /* ownership moves to the device */
    
      ctimeout = xAAL_cbor_map_get(cdevice, "timeout");
      if ( ctimeout && cbor_is_int(ctimeout) )   /* the field may be absent */
        device->timeout = cbor_get_int(ctimeout);
    
      cmap = xAAL_cbor_map_get(cdevice, "map");
      if ( cmap && cbor_isa_map(cmap) )
        update_kvs(&(device->kvs), keylist, cmap);
    }
    
    
    
    /* Debug helper: print on stdout the textual form of every uuid held in
     * the CBOR array `ctargets`, on a single line prefixed by "targets:". */
    void print_targets(cbor_item_t *ctargets) {
      const size_t count = cbor_array_size(ctargets);
      char text[37];   /* 36 characters of uuid text + NUL */
      size_t idx;
    
      printf("targets:");
      for (idx = 0; idx < count; idx++) {
        /* cbor_move() gives back the reference taken by cbor_array_get() */
        cbor_item_t *centry = cbor_move(cbor_array_get(ctargets, idx));
    
        uuid_unparse(cbor_bytestring_handle(centry), text);
        printf(" %s", text);
      }
      printf("\n");
    }
    
    
    /*
     * Search the devices whose dev_type starts with `dev_type` and whose
     * keys-values contain every value of `vals`.
     * Returns a new CBOR indefinite array with the address (16-byte string) of
     * each matching device; the array is empty when nothing matches.
     * Devices whose dev_type is not known yet are skipped.
     */
    cbor_item_t *matching_device(devices_t *devices, const char *dev_type, vals_t *vals) {
      cbor_item_t *ctargets;
      device_t *device;
      const size_t prefix_len = strlen(dev_type);
    
      ctargets = cbor_new_indefinite_array();
      TAILQ_FOREACH(device, devices, entries)
        if (    device->dev_type
             && (strncmp(device->dev_type, dev_type, prefix_len) == 0)
             && (matching_values(&(device->kvs), vals)) )
          /* the array takes its own reference: cbor_move() drops ours */
          cbor_array_push(ctargets, cbor_move(cbor_build_bytestring(device->addr, 16)));
      print_targets(ctargets);//DEBUG
      return ctargets;
    }
    
    
    /*
     * CBOR serialization of a device, as a definite map holding:
     *  - "address":  the 16 bytes of its uuid;
     *  - "dev_type": its type, when known;
     *  - "timeout":  its timeout;
     *  - "map":      its keys-values, when it has some.
     * Returns a new item; the caller is in charge of releasing it.
     */
    cbor_item_t *get_device(device_t *device) {
      cbor_item_t *kvs, *cobj;
      bool has_kvs;
      size_t nb = 2;   /* "address" and "timeout" are always present */
    
      kvs = get_kvs(device);
      has_kvs = (cbor_map_size(kvs) != 0);
      if (device->dev_type)
        nb++;
      if (has_kvs)
        nb++;
      cobj = cbor_new_definite_map(nb);
    
      /* a uuid is 16 bytes long, as everywhere else in this file */
      cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("address")), cbor_move(cbor_build_bytestring(device->addr, 16)) });
      if (device->dev_type)   /* may still be unknown for a freshly added device */
        cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("dev_type")), cbor_move(cbor_build_string(device->dev_type)) });
      cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("timeout")), cbor_move(xAAL_cbor_build_int(device->timeout)) });
      if (has_kvs)
        cbor_map_add(cobj, (struct cbor_pair){ cbor_move(cbor_build_string("map")),     cbor_move(kvs) });
      else
        cbor_decref(&kvs);   /* unused empty map: release it */
      return cobj;
    }
    
    
    /* CBOR serialization of the device registered under `addr`.
     * An unknown address yields a new, empty indefinite map. */
    cbor_item_t *get_device_by_addr(devices_t *devices, const uuid_t *addr) {
      device_t *found = select_device(devices, addr);
    
      if (found == NULL)
        return cbor_new_indefinite_map();
      return get_device(found);
    }
    
    
    /* Unlink a device from the list and release everything it owns:
     * its keys-values, its dev_type string and the entry itself. */
    void delete_device(devices_t *devices, device_t *device) {
      TAILQ_REMOVE(devices, device, entries);
      delete_kvs(&device->kvs);
      free(device->dev_type);   /* free(NULL) is a no-op */
      free(device);
    }
    
    
    
    /*
     * Section Database
     */
    
    /* Wrappers around the read-write lock of the context.
     * Each of them reports a failure of the pthread call with E_FATAL. */
    
    /* Initialize the lock with default attributes */
    void rwinit(xaal_ctx_t *xaal_ctx) {
      const int status = pthread_rwlock_init(&xaal_ctx->rwlock, NULL);
    
      if (status == 0)
        return;
      E_FATAL("Error on pthread_rwlock_init: %d\n", status);
    }
    
    /* Take the lock of the context for writing; a failure is fatal */
    void wrlock(xaal_ctx_t *xaal_ctx) {
      const int status = pthread_rwlock_wrlock(&xaal_ctx->rwlock);
    
      if (status == 0)
        return;
      E_FATAL("Error on pthread_rwlock_wrlock: %d\n", status);
    }
    
    /* Take the lock of the context for reading; a failure is fatal */
    void rdlock(xaal_ctx_t *xaal_ctx) {
      const int status = pthread_rwlock_rdlock(&xaal_ctx->rwlock);
    
      if (status == 0)
        return;
      E_FATAL("Error on pthread_rwlock_rdlock: %d\n", status);
    }
    
    /* Release the lock of the context (read or write side); a failure is fatal */
    void unlock(xaal_ctx_t *xaal_ctx) {
      const int status = pthread_rwlock_unlock(&xaal_ctx->rwlock);
    
      if (status == 0)
        return;
      E_FATAL("Error on pthread_rwlock_unlock: %d\n", status);
    }
    
    /* Destroy the lock of the context; a failure is fatal */
    void rwdestroy(xaal_ctx_t *xaal_ctx) {
      const int status = pthread_rwlock_destroy(&xaal_ctx->rwlock);
    
      if (status == 0)
        return;
      E_FATAL("Error on pthread_rwlock_destroy: %d\n", status);
    }
    
    
    /*
     * Initialize the database (empty list of devices, read-write lock), then
     * load the devices stored in the file xaal_ctx->dbfile, if any.
     * The file must hold one CBOR array of device descriptions; each of them
     * goes through update_device(). Any problem with the file is logged and
     * leaves the database as it is (empty, or partially filled).
     */
    void load_db(xaal_ctx_t *xaal_ctx) {
      cbor_item_t *cdb;
      struct cbor_load_result cresult;
      struct stat sb;
      unsigned char *file_map;
      size_t i, sz;
      int fd;
    
      TAILQ_INIT(&(xaal_ctx->devices));
      rwinit(xaal_ctx);
    
      fd = open(xaal_ctx->dbfile, O_RDONLY);
      if (fd == -1) {
        E_ERROR("Could not open(r) database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
        return;
      }
    
      if (fstat(fd, &sb) == -1) { /* get file size */
        E_ERROR("Could not do fstat on database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
        close(fd);   /* do not leak the descriptor */
        return;
      }
    
      file_map = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
      if (file_map == MAP_FAILED) {
        E_ERROR("Could not mmap database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
        close(fd);   /* do not leak the descriptor */
        return;
      }
    
      cdb = cbor_load(file_map, sb.st_size, &cresult);
      munmap(file_map, sb.st_size);
      close(fd);
      if ( cdb == NULL ) {
        E_ERROR("Could not parse database file %s. CBOR error; code %d, position %zu\n",
                xaal_ctx->dbfile, (int)cresult.error.code, cresult.error.position);
        return;
      }
    
      if ( cbor_isa_array(cdb) ) {
        sz = cbor_array_size(cdb);
        wrlock(xaal_ctx);
        for (i = 0; i < sz; i++)
          update_device(&(xaal_ctx->devices), xaal_ctx->keylist, cbor_move(cbor_array_get(cdb, i)));
        unlock(xaal_ctx);
      }
      /* released in every case, including when the root item is not an array */
      cbor_decref(&cdb);
    }
    
    
    /*
     * Dump the database to the file xaal_ctx->dbfile, as one CBOR array with
     * the serialization of every device (see get_device()).
     * The file is created if needed and its previous content is replaced.
     * Errors are logged; nothing is reported to the caller.
     */
    void dump_db(xaal_ctx_t *xaal_ctx) {
      cbor_item_t *cdb;
      device_t *device;
      unsigned char *buffer;
      size_t size, len;
      ssize_t written;
      int fd;
    
      cdb = cbor_new_indefinite_array();
    
      rdlock(xaal_ctx);
      TAILQ_FOREACH(device, &(xaal_ctx->devices), entries)
        cbor_array_push(cdb, cbor_move(get_device(device)));
      unlock(xaal_ctx);   /* release the read lock; destroying a held lock is undefined */
    
      /* Serialize before touching the file, so that a failure here leaves the
       * previous dump intact. */
      len = cbor_serialize_alloc(cdb, &buffer, &size);
      cbor_decref(&cdb);
      if (len == 0) {
        E_ERROR("Could not serialize database for file %s\n", xaal_ctx->dbfile);
        return;
      }
    
      /* O_TRUNC: a shorter dump must not keep the tail of a longer, older one */
      fd = open(xaal_ctx->dbfile, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
      if (fd == -1) {
        E_ERROR("Could not open(w+) database file %s. %s\n", xaal_ctx->dbfile, strerror(errno));
        free(buffer);
        return;
      }
    
      written = write(fd, buffer, len);
      if ( written < 0 || (size_t)written != len )
        E_ERROR("Writing database file: %s\n", strerror(errno));
      free(buffer);
      close(fd);
    }
    
    
    
    
    /*
     * Section xAAL threads
     */
    
    
    /* Send is_alive request */
    bool request_is_alive(const xAAL_businfo_t *bus, const xAAL_devinfo_t *me) {
      cbor_item_t *cbody, *cdev_types;
    
      cdev_types = cbor_new_definite_array(1);
      cbor_array_push(cdev_types, cbor_move(cbor_build_string("any.any")));
    
      cbody = cbor_new_definite_map(1);
      cbor_map_add(cbody, (struct cbor_pair){ cbor_move(cbor_build_string("dev_types")), cbor_move(cdev_types) });
    
      return xAAL_write_bus(bus, me, xAAL_REQUEST, "is_alive", cbody, NULL);
    }
    
    
    /* Handle an alive notification: register the emitter if needed, record its
     * dev_type on first sight and refresh its timeout from the message body.
     * Returns false, leaving the device unchanged, when the emitter is already
     * known under another dev_type. Runs under the write lock. */
    bool receive_is_alive(const uuid_t *source, const char *dev_type, cbor_item_t *cbody, xaal_ctx_t *xaal_ctx) {
      bool accepted = true;
      device_t *emitter;
    
      wrlock(xaal_ctx);
      emitter = add_device(&xaal_ctx->devices, source);
      if (emitter->dev_type == NULL)
        emitter->dev_type = strdup(dev_type);
      else if (strcmp(emitter->dev_type, dev_type) != 0)
        accepted = false;
    
      if (accepted)
        emitter->timeout = xAAL_read_aliveTimeout(cbody);
      unlock(xaal_ctx);
      return accepted;
    }
    
    
    /* Answer a "get_attributes" request from `target` with a reply that has
     * no body (this agent exposes no attribute, for now). */
    bool reply_get_attributes(const xAAL_businfo_t *bus, const xAAL_devinfo_t *me,
                              const uuid_t *target) {
      const bool sent = xAAL_write_busl(bus, me, xAAL_REPLY, "get_attributes", NULL, target, NULL);
    
      return sent;
    }
    
    
    
    /* Send to `ctargets` a "get_keys_values" request about `device`; the body
     * is a one-entry map binding "device" to the 16 bytes of its uuid. */
    bool request_get_keys_values(const xAAL_businfo_t *bus, const xAAL_devinfo_t *me,
                                 cbor_item_t *ctargets, const uuid_t *device) {
      cbor_item_t *cquery = cbor_new_definite_map(1);
      struct cbor_pair entry = {
        .key   = cbor_move(cbor_build_string("device")),
        .value = cbor_move(cbor_build_bytestring(*device, 16))
      };
    
      cbor_map_add(cquery, entry);
      return xAAL_write_bus(bus, me, xAAL_REQUEST, "get_keys_values", cquery, ctargets);
    }
    
    
    /* Handle the body of a get_keys_values reply or of a keys_values_changed
     * notification (metadatadb.any): merge its "map" into the keys-values of
     * the device named by its "device" uuid, registering that device if needed.
     * Returns false when "device" is not a uuid or "map" is missing or not a map. */
    bool receive_KeysValues(cbor_item_t *cqueryBody, xaal_ctx_t *xaal_ctx) {
      cbor_item_t *caddr, *cvalues;
      uuid_t addr;
      device_t *subject;
    
      caddr = xAAL_cbor_map_get(cqueryBody, "device");
      if (!xAAL_cbor_is_uuid(caddr, &addr))
        return false;
    
      cvalues = xAAL_cbor_map_get(cqueryBody, "map");
      if (cvalues == NULL || !cbor_isa_map(cvalues))
        return false;
    
      wrlock(xaal_ctx);
      subject = add_device(&xaal_ctx->devices, &addr);
      update_kvs(&subject->kvs, xaal_ctx->keylist, cvalues);
      unlock(xaal_ctx);
      return true;
    }
    
    
    
    
    /* Number of calls to checking() between two forced refreshes */
    #define FORCE_CHECK	10	/* counter */
    
    /*
     * From time to time, check missing info in the db and send xAAL queries to
     * know more:
     *  1. delete the devices whose timeout is older than now - GRACE_DELAY;
     *  2. collect the addresses of the known "metadatadb.*" devices;
     *  3. if there are some, ask them for the keys-values of every device that
     *     has none (of every device when the forced refresh is due);
     *  4. when the forced refresh is due, also broadcast an is_alive request.
     */
    void checking(xaal_ctx_t *xaal_ctx) {
      cbor_item_t *ctargets;
      device_t *device, *next;
      time_t deadline;
      const size_t len = strlen("metadatadb.");
      static int force_check = FORCE_CHECK;
    
      /* delete dead devices */
      deadline = time(NULL) - GRACE_DELAY;
      wrlock(xaal_ctx);
      for (device = TAILQ_FIRST(&(xaal_ctx->devices)); device != NULL; device = next) {
        /* fetch the successor first: delete_device() frees the current entry */
        next = TAILQ_NEXT(device, entries);
        if (device->timeout < deadline)
          delete_device(&(xaal_ctx->devices), device);
      }
      unlock(xaal_ctx);
    
      /* check for kvs */
      ctargets = cbor_new_indefinite_array();
      wrlock(xaal_ctx);
      TAILQ_FOREACH(device, &(xaal_ctx->devices), entries)
        /* dev_type stays NULL until the device has announced itself */
        if (device->dev_type && (strncmp(device->dev_type, "metadatadb.", len) == 0))
          /* the array takes its own reference: cbor_move() drops ours */
          cbor_array_push(ctargets, cbor_move(cbor_build_bytestring(device->addr, 16)));
      unlock(xaal_ctx);
      if (cbor_array_size(ctargets) != 0) {
        rdlock(xaal_ctx);
        TAILQ_FOREACH(device, &(xaal_ctx->devices), entries)
          if ( (force_check <= 0) || (device->kvs.tqh_first == NULL) )
            request_get_keys_values(&(xaal_ctx->bus), &(xaal_ctx->majordom), cbor_incref(ctargets), &device->addr);
        unlock(xaal_ctx);
      }
      cbor_decref(&ctargets);   /* our reference, also when no target was found */
    
      if (force_check <= 0) {
        force_check = FORCE_CHECK;
        request_is_alive(&(xaal_ctx->bus), &(xaal_ctx->majordom));
      }
      force_check--;
    }
    
    
    
    
    /*
     * Read one message from the bus and handle it:
     *  - requests targeting this agent: is_alive, get_description and
     *    get_attributes are answered;
     *  - get_keys_values replies and keys_values_changed notifications emitted
     *    by a "metadatadb.*" device feed the database, whoever the target is;
     *  - alive notifications refresh the emitter in the database.
     * The message is released before returning.
     */
    void manage_msg(xaal_ctx_t *xaal_ctx) {
      static const char db_prefix[] = "metadatadb.";
      xAAL_businfo_t *bus = &xaal_ctx->bus;
      xAAL_devinfo_t *me = &xaal_ctx->majordom;
      cbor_item_t *ctargets, *cbody;
      xAAL_msg_type_t msg_type;
      char *action, *dev_type;
      uuid_t *source;
    
      if (!xAAL_read_bus(bus, &ctargets, &source, &dev_type, &msg_type, &action, &cbody))
        return;
    
      /* Requests addressed to me */
      if (xAAL_targets_match(ctargets, &me->addr) && (msg_type == xAAL_REQUEST)) {
        if ((strcmp(action, "is_alive") == 0)
            && xAAL_is_aliveDevType_match(cbody, me->dev_type)) {
          if (!xAAL_notify_alive(bus, me))
            E_ERROR("Could not reply to is_alive\n");
        } else if (strcmp(action, "get_description") == 0) {
          if (!xAAL_reply_get_description(bus, me, source))
            E_ERROR("Could not reply to get_description\n");
        } else if (strcmp(action, "get_attributes") == 0) {
          if (!reply_get_attributes(bus, me, source))
            E_ERROR("Could not reply to get_attributes\n");
        }
      }
    
      /* Messages from a metadatadb are of interest, sent to me or to others */
      if ((strncmp(dev_type, db_prefix, sizeof db_prefix - 1) == 0)
          && (((msg_type == xAAL_REPLY) && (strcmp(action, "get_keys_values") == 0))
              || ((msg_type == xAAL_NOTIFY) && (strcmp(action, "keys_values_changed") == 0))))
        receive_KeysValues(cbody, xaal_ctx);
    
      /* Alive notifications, from anybody */
      if ((msg_type == xAAL_NOTIFY) && (strcmp(action, "alive") == 0))
        receive_is_alive(source, dev_type, cbody, xaal_ctx);
    
      xAAL_free_msg(ctargets, source, dev_type, action, cbody);
    }
    
    
    
    /*
     * Main thread on the xAAL bus; `data` is the xaal_ctx_t of the agent.
     * It arms two periodic timers (alive notifications every ALIVE_PERIOD
     * seconds, checking() every CHECKING_PERIOD seconds), broadcasts an
     * is_alive request, then loops forever on select() over the bus socket and
     * the two timers.
     */
    void *xaal_agent(void *data) {
      xaal_ctx_t *xaal_ctx = data;
      xAAL_businfo_t *bus = &(xaal_ctx->bus);
      xAAL_devinfo_t *me = &(xaal_ctx->majordom);
      fd_set rfds, rfds_;
      int fd_max;
      struct itimerspec timerspec;
      uint64_t exp;
      int alive_fd, checking_fd;
    
      /* Set alive timer; it first fires almost at once (1 ns) */
      alive_fd = timerfd_create(CLOCK_REALTIME, 0);
      if (alive_fd == -1)
        perror("Could not create timer for alive messages");
      timerspec.it_interval.tv_sec = ALIVE_PERIOD;
      timerspec.it_interval.tv_nsec = 0;
      timerspec.it_value.tv_sec = 0;
      timerspec.it_value.tv_nsec = 1;
      if ( timerfd_settime(alive_fd, 0, &timerspec, NULL) == -1 )
        perror("Could not configure timer for alive messages");
    
      /* Set timer for 'checking queries' */
      checking_fd = timerfd_create(CLOCK_REALTIME, 0);
      if (checking_fd == -1)
        perror("Could not create timer for checking");
      timerspec.it_interval.tv_sec = CHECKING_PERIOD;
      timerspec.it_interval.tv_nsec = 0;
      timerspec.it_value.tv_sec = CHECKING_PERIOD;
      timerspec.it_value.tv_nsec = 0;
      if ( timerfd_settime(checking_fd, 0, &timerspec, NULL) == -1 )
        perror("Could not configure timer for checking");
    
      /* sends a broadcast is_alive */
      request_is_alive(bus, me);
    
      /* xAAL loop */
      FD_ZERO(&rfds);
      FD_SET(bus->sfd, &rfds);
      fd_max = bus->sfd;
      FD_SET(alive_fd, &rfds);
      fd_max = (fd_max > alive_fd)? fd_max : alive_fd;
      FD_SET(checking_fd, &rfds);
      fd_max = (fd_max > checking_fd)? fd_max : checking_fd;
    
      for (;;) {
        rfds_ = rfds;   /* select() overwrites its sets: work on a copy */
        if ( select(fd_max+1, &rfds_, NULL, NULL, NULL) == -1 ) {
          if (errno != EINTR)
            E_ERROR("select(): %s\n", strerror(errno));
          /* the content of rfds_ is unreliable after a failure: try again */
          continue;
        }
    
        if (FD_ISSET(alive_fd, &rfds_)) {
          /* reading the expiration count re-arms the readiness of the timer */
          if ( read(alive_fd, &exp, sizeof(uint64_t)) == -1 )
            E_ERROR("Alive timer\n");
          if ( !xAAL_notify_alive(bus, me) )
            E_ERROR("Could not send alive notification.\n");
        }
    
        if (FD_ISSET(checking_fd, &rfds_)) {
          if ( read(checking_fd, &exp, sizeof(uint64_t)) == -1 )
            E_ERROR("Checking timer\n");
          checking(xaal_ctx);
        }
    
        if (FD_ISSET(bus->sfd, &rfds_))
          manage_msg(xaal_ctx);
      }
      return NULL;
    }