Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
P
Python
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
xAAL
Code
Python
Commits
a3a33c11
Commit
a3a33c11
authored
5 months ago
by
KERDREUX Jerome
Browse files
Options
Downloads
Patches
Plain Diff
Format
Just a test for new rules
parent
e8c5d98b
No related branches found
No related tags found
1 merge request
!1
First try of type hints
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
libs/monitor/xaal/monitor/monitor.py
+79
-55
79 additions, 55 deletions
libs/monitor/xaal/monitor/monitor.py
with
79 additions
and
55 deletions
libs/monitor/xaal/monitor/monitor.py
+
79
−
55
View file @
a3a33c11
...
@@ -7,6 +7,7 @@ from xaal.lib import Message
...
@@ -7,6 +7,7 @@ from xaal.lib import Message
import
logging
import
logging
logger
=
logging
.
getLogger
(
__name__
)
logger
=
logging
.
getLogger
(
__name__
)
# how often we force refresh the devices attributes/description/keyvalues
# how often we force refresh the devices attributes/description/keyvalues
...
@@ -22,7 +23,7 @@ def now():
...
@@ -22,7 +23,7 @@ def now():
class
TimedDict
(
dict
):
class
TimedDict
(
dict
):
def
__init__
(
self
,
refresh_rate
=
REFRESH_RATE
,
data
=
{}):
def
__init__
(
self
,
refresh_rate
=
REFRESH_RATE
,
data
=
{}):
dict
.
__init__
(
self
,
data
)
dict
.
__init__
(
self
,
data
)
self
.
last_update
=
0
self
.
last_update
=
0
self
.
next_update
=
0
self
.
next_update
=
0
...
@@ -31,12 +32,12 @@ class TimedDict(dict):
...
@@ -31,12 +32,12 @@ class TimedDict(dict):
self
.
next_update
=
self
.
last_update
+
REFRESH_RATE
+
random
.
randint
(
-
30
,
30
)
self
.
next_update
=
self
.
last_update
+
REFRESH_RATE
+
random
.
randint
(
-
30
,
30
)
def
__setitem__
(
self
,
key
,
item
):
def
__setitem__
(
self
,
key
,
item
):
super
().
__setitem__
(
key
,
item
)
super
().
__setitem__
(
key
,
item
)
self
.
updated
()
self
.
updated
()
def
update
(
self
,
dict_
):
def
update
(
self
,
dict_
):
changed
=
False
if
self
.
last_update
!=
0
else
True
changed
=
False
if
self
.
last_update
!=
0
else
True
if
dict_
!=
self
:
if
dict_
!=
self
:
changed
=
True
changed
=
True
super
().
update
(
dict_
)
super
().
update
(
dict_
)
self
.
updated
()
self
.
updated
()
...
@@ -52,15 +53,15 @@ class Device:
...
@@ -52,15 +53,15 @@ class Device:
self
.
short_address
=
tools
.
reduce_addr
(
addr
)
self
.
short_address
=
tools
.
reduce_addr
(
addr
)
self
.
dev_type
=
dev_type
self
.
dev_type
=
dev_type
# device cache
# device cache
self
.
attributes
=
TimedDict
(
refresh_rate
=
REFRESH_RATE
)
self
.
attributes
=
TimedDict
(
refresh_rate
=
REFRESH_RATE
)
self
.
description
=
TimedDict
(
refresh_rate
=
REFRESH_RATE
*
3
)
self
.
description
=
TimedDict
(
refresh_rate
=
REFRESH_RATE
*
3
)
self
.
db
=
TimedDict
(
refresh_rate
=
REFRESH_RATE
*
3
)
self
.
db
=
TimedDict
(
refresh_rate
=
REFRESH_RATE
*
3
)
# Alive management
# Alive management
self
.
last_alive
=
now
()
self
.
last_alive
=
now
()
self
.
next_alive
=
0
self
.
next_alive
=
0
def
update_attributes
(
self
,
data
):
def
update_attributes
(
self
,
data
):
"""
rude update attributes. Return true if updated
"""
"""
rude update attributes. Return true if updated
"""
# really not the best comparaison, but we just need a flag
# really not the best comparaison, but we just need a flag
return
self
.
attributes
.
update
(
data
)
return
self
.
attributes
.
update
(
data
)
...
@@ -72,10 +73,10 @@ class Device:
...
@@ -72,10 +73,10 @@ class Device:
"""
"""
return
self
.
description
.
update
(
data
)
return
self
.
description
.
update
(
data
)
def
update_db
(
self
,
data
):
def
update_db
(
self
,
data
):
return
self
.
db
.
update
(
data
)
return
self
.
db
.
update
(
data
)
def
set_db
(
self
,
data
):
def
set_db
(
self
,
data
):
purge
=
[]
purge
=
[]
for
k
in
data
:
for
k
in
data
:
if
data
[
k
]
is
None
:
if
data
[
k
]
is
None
:
...
@@ -90,15 +91,15 @@ class Device:
...
@@ -90,15 +91,15 @@ class Device:
return
True
return
True
return
False
return
False
def
alive
(
self
,
value
):
def
alive
(
self
,
value
):
self
.
last_alive
=
int
(
time
.
time
())
self
.
last_alive
=
int
(
time
.
time
())
self
.
next_alive
=
self
.
last_alive
+
value
self
.
next_alive
=
self
.
last_alive
+
value
def
get_kv
(
self
,
key
):
def
get_kv
(
self
,
key
):
return
self
.
db
.
get
(
key
,
None
)
return
self
.
db
.
get
(
key
,
None
)
def
dump
(
self
):
def
dump
(
self
):
print
(
"
*** %s %s **
"
%
(
self
.
address
,
self
.
dev_type
))
print
(
"
*** %s %s **
"
%
(
self
.
address
,
self
.
dev_type
))
print
(
"
Description : %s
"
%
self
.
description
)
print
(
"
Description : %s
"
%
self
.
description
)
print
(
"
Attributes : %s
"
%
self
.
attributes
)
print
(
"
Attributes : %s
"
%
self
.
attributes
)
print
()
print
()
...
@@ -106,19 +107,21 @@ class Device:
...
@@ -106,19 +107,21 @@ class Device:
@property
@property
def
display_name
(
self
):
def
display_name
(
self
):
result
=
tools
.
reduce_addr
(
self
.
address
)
result
=
tools
.
reduce_addr
(
self
.
address
)
result
=
self
.
db
.
get
(
'
nickname
'
,
result
)
result
=
self
.
db
.
get
(
'
nickname
'
,
result
)
result
=
self
.
db
.
get
(
'
name
'
,
result
)
result
=
self
.
db
.
get
(
'
name
'
,
result
)
return
result
return
result
class
Devices
:
class
Devices
:
"""
Device List for monitoring
"""
"""
Device List for monitoring
"""
def
__init__
(
self
):
def
__init__
(
self
):
self
.
__devs
=
{}
self
.
__devs
=
{}
self
.
__list_cache
=
None
self
.
__list_cache
=
None
def
add
(
self
,
addr
,
dev_type
):
def
add
(
self
,
addr
,
dev_type
):
dev
=
Device
(
addr
,
dev_type
)
dev
=
Device
(
addr
,
dev_type
)
self
.
__devs
.
update
({
addr
:
dev
})
self
.
__devs
.
update
({
addr
:
dev
})
self
.
__list_cache
=
None
self
.
__list_cache
=
None
return
dev
return
dev
...
@@ -128,9 +131,9 @@ class Devices:
...
@@ -128,9 +131,9 @@ class Devices:
def
get
(
self
):
def
get
(
self
):
if
not
self
.
__list_cache
:
if
not
self
.
__list_cache
:
#print("Refresh cache")
#
print("Refresh cache")
res
=
list
(
self
.
__devs
.
values
())
res
=
list
(
self
.
__devs
.
values
())
res
.
sort
(
key
=
lambda
d
:
d
.
dev_type
)
res
.
sort
(
key
=
lambda
d
:
d
.
dev_type
)
self
.
__list_cache
=
res
self
.
__list_cache
=
res
return
self
.
__list_cache
return
self
.
__list_cache
...
@@ -143,7 +146,7 @@ class Devices:
...
@@ -143,7 +146,7 @@ class Devices:
def
get_with_group
(
self
,
addr
):
def
get_with_group
(
self
,
addr
):
r
=
[]
r
=
[]
for
d
in
self
.
get
():
for
d
in
self
.
get
():
if
addr
==
d
.
description
.
get
(
'
group_id
'
,
None
):
if
addr
==
d
.
description
.
get
(
'
group_id
'
,
None
):
r
.
append
(
d
)
r
.
append
(
d
)
return
r
return
r
...
@@ -164,7 +167,7 @@ class Devices:
...
@@ -164,7 +167,7 @@ class Devices:
def
get_with_key_value
(
self
,
key
,
value
):
def
get_with_key_value
(
self
,
key
,
value
):
r
=
[]
r
=
[]
for
d
in
self
.
get
():
for
d
in
self
.
get
():
if
(
key
in
d
.
db
)
and
(
d
.
db
[
key
]
==
value
):
if
(
key
in
d
.
db
)
and
(
d
.
db
[
key
]
==
value
):
r
.
append
(
d
)
r
.
append
(
d
)
return
r
return
r
...
@@ -176,7 +179,7 @@ class Devices:
...
@@ -176,7 +179,7 @@ class Devices:
return
None
return
None
def
get_dev_types
(
self
):
def
get_dev_types
(
self
):
"""
return the list of distinct dev_types
"""
"""
return the list of distinct dev_types
"""
ll
=
[]
ll
=
[]
for
dev
in
self
.
__devs
.
values
():
for
dev
in
self
.
__devs
.
values
():
if
dev
.
dev_type
not
in
ll
:
if
dev
.
dev_type
not
in
ll
:
...
@@ -199,7 +202,7 @@ class Devices:
...
@@ -199,7 +202,7 @@ class Devices:
return
key
in
self
.
__devs
return
key
in
self
.
__devs
def
auto_wash
(
self
):
def
auto_wash
(
self
):
now_
=
now
()
now_
=
now
()
result
=
[]
result
=
[]
for
dev
in
self
.
get
():
for
dev
in
self
.
get
():
if
dev
.
next_alive
<
now_
:
if
dev
.
next_alive
<
now_
:
...
@@ -209,22 +212,23 @@ class Devices:
...
@@ -209,22 +212,23 @@ class Devices:
def
dump
(
self
):
def
dump
(
self
):
for
d
in
self
.
get
():
for
d
in
self
.
get
():
print
(
"
%s %s
"
%
(
d
.
address
,
d
.
dev_type
))
print
(
"
%s %s
"
%
(
d
.
address
,
d
.
dev_type
))
class
Notification
(
Enum
):
class
Notification
(
Enum
):
new_device
=
0
new_device
=
0
drop_device
=
1
# sending drop_device notif is not implemented yet,
drop_device
=
1
# sending drop_device notif is not implemented yet,
attribute_change
=
2
attribute_change
=
2
description_change
=
3
description_change
=
3
metadata_change
=
4
metadata_change
=
4
class
Monitor
:
class
Monitor
:
"""
"""
use this class to monitor a xAAL network
use this class to monitor a xAAL network
"""
"""
def
__init__
(
self
,
device
,
filter_func
=
None
,
db_server
=
None
):
def
__init__
(
self
,
device
,
filter_func
=
None
,
db_server
=
None
):
self
.
dev
=
device
self
.
dev
=
device
self
.
engine
=
device
.
engine
self
.
engine
=
device
.
engine
self
.
db_server
=
db_server
self
.
db_server
=
db_server
...
@@ -240,8 +244,8 @@ class Monitor:
...
@@ -240,8 +244,8 @@ class Monitor:
self
.
engine
.
disable_msg_filter
()
self
.
engine
.
disable_msg_filter
()
# only send isAlive message every 2 expirations
# only send isAlive message every 2 expirations
self
.
send_is_alive
()
self
.
send_is_alive
()
self
.
engine
.
add_timer
(
self
.
refresh_alives
,
REFRESH_TIMER
)
self
.
engine
.
add_timer
(
self
.
refresh_alives
,
REFRESH_TIMER
)
# delete expired device every 10s
# delete expired device every 10s
self
.
engine
.
add_timer
(
self
.
auto_wash
,
AUTOWASH_TIMER
)
self
.
engine
.
add_timer
(
self
.
auto_wash
,
AUTOWASH_TIMER
)
# wait x seconds for the first isAlive answers before the initial crawl
# wait x seconds for the first isAlive answers before the initial crawl
self
.
refresh_timer
=
self
.
engine
.
add_timer
(
self
.
refresh_devices
,
BOOT_TIMER
)
self
.
refresh_timer
=
self
.
engine
.
add_timer
(
self
.
refresh_devices
,
BOOT_TIMER
)
...
@@ -252,7 +256,7 @@ class Monitor:
...
@@ -252,7 +256,7 @@ class Monitor:
return
return
if
msg
.
source
not
in
self
.
devices
:
if
msg
.
source
not
in
self
.
devices
:
dev
=
self
.
add_device
(
msg
)
dev
=
self
.
add_device
(
msg
)
self
.
notify
(
Notification
.
new_device
,
dev
)
self
.
notify
(
Notification
.
new_device
,
dev
)
dev
=
self
.
devices
.
get_with_addr
(
msg
.
source
)
dev
=
self
.
devices
.
get_with_addr
(
msg
.
source
)
if
not
dev
:
if
not
dev
:
...
@@ -266,11 +270,11 @@ class Monitor:
...
@@ -266,11 +270,11 @@ class Monitor:
elif
msg
.
is_attributes_change
()
or
msg
.
is_get_attribute_reply
():
elif
msg
.
is_attributes_change
()
or
msg
.
is_get_attribute_reply
():
if
dev
.
update_attributes
(
msg
.
body
):
if
dev
.
update_attributes
(
msg
.
body
):
self
.
notify
(
Notification
.
attribute_change
,
dev
)
self
.
notify
(
Notification
.
attribute_change
,
dev
)
elif
msg
.
is_get_description_reply
():
elif
msg
.
is_get_description_reply
():
if
dev
.
update_description
(
msg
.
body
):
if
dev
.
update_description
(
msg
.
body
):
self
.
notify
(
Notification
.
description_change
,
dev
)
self
.
notify
(
Notification
.
description_change
,
dev
)
elif
self
.
is_from_metadb
(
msg
):
elif
self
.
is_from_metadb
(
msg
):
addr
=
msg
.
body
.
get
(
'
device
'
)
addr
=
msg
.
body
.
get
(
'
device
'
)
...
@@ -282,7 +286,7 @@ class Monitor:
...
@@ -282,7 +286,7 @@ class Monitor:
if
self
.
is_update_metadb
(
msg
):
if
self
.
is_update_metadb
(
msg
):
changed
=
target
.
update_db
(
msg
.
body
[
'
map
'
])
changed
=
target
.
update_db
(
msg
.
body
[
'
map
'
])
if
changed
:
if
changed
:
self
.
notify
(
Notification
.
metadata_change
,
target
)
self
.
notify
(
Notification
.
metadata_change
,
target
)
def
subscribe
(
self
,
func
):
def
subscribe
(
self
,
func
):
self
.
subscribers
.
append
(
func
)
self
.
subscribers
.
append
(
func
)
...
@@ -292,11 +296,11 @@ class Monitor:
...
@@ -292,11 +296,11 @@ class Monitor:
def
notify
(
self
,
ev_type
,
device
):
def
notify
(
self
,
ev_type
,
device
):
for
s
in
self
.
subscribers
:
for
s
in
self
.
subscribers
:
#logger.warning(f"{s} {ev_type}")
#
logger.warning(f"{s} {ev_type}")
s
(
ev_type
,
device
)
s
(
ev_type
,
device
)
def
add_device
(
self
,
msg
):
def
add_device
(
self
,
msg
):
return
self
.
devices
.
add
(
msg
.
source
,
msg
.
dev_type
)
return
self
.
devices
.
add
(
msg
.
source
,
msg
.
dev_type
)
def
auto_wash
(
self
):
def
auto_wash
(
self
):
"""
call the Auto-wash on devices List
"""
"""
call the Auto-wash on devices List
"""
...
@@ -310,11 +314,11 @@ class Monitor:
...
@@ -310,11 +314,11 @@ class Monitor:
self
.
last_isalive
=
now
()
self
.
last_isalive
=
now
()
def
refresh_alives
(
self
):
def
refresh_alives
(
self
):
"""
every REFRESH we check if need to send a isAlive
"""
"""
every REFRESH we check if need to send a isAlive
"""
tmp
=
self
.
last_isalive
+
config
.
alive_timer
*
2
tmp
=
self
.
last_isalive
+
config
.
alive_timer
*
2
if
tmp
<
now
():
if
tmp
<
now
():
self
.
send_is_alive
()
self
.
send_is_alive
()
def
refresh_devices
(
self
):
def
refresh_devices
(
self
):
now_
=
now
()
now_
=
now
()
cnt
=
0
cnt
=
0
...
@@ -323,40 +327,57 @@ class Monitor:
...
@@ -323,40 +327,57 @@ class Monitor:
if
dev
.
description
.
next_update
<
now_
:
if
dev
.
description
.
next_update
<
now_
:
self
.
request_description
(
dev
.
address
)
self
.
request_description
(
dev
.
address
)
dev
.
description
.
next_update
=
now_
+
REFRESH_RATE
dev
.
description
.
next_update
=
now_
+
REFRESH_RATE
cnt
=
cnt
+
1
cnt
=
cnt
+
1
# metadata
# metadata
if
self
.
db_server
and
dev
.
db
.
next_update
<
now_
:
if
self
.
db_server
and
dev
.
db
.
next_update
<
now_
:
self
.
request_metadb
(
dev
.
address
)
self
.
request_metadb
(
dev
.
address
)
dev
.
db
.
next_update
=
now_
+
REFRESH_RATE
dev
.
db
.
next_update
=
now_
+
REFRESH_RATE
cnt
=
cnt
+
1
cnt
=
cnt
+
1
# attributes
# attributes
if
dev
.
attributes
.
next_update
<
now_
:
if
dev
.
attributes
.
next_update
<
now_
:
self
.
request_attributes
(
dev
.
address
)
self
.
request_attributes
(
dev
.
address
)
dev
.
attributes
.
next_update
=
now_
+
REFRESH_RATE
dev
.
attributes
.
next_update
=
now_
+
REFRESH_RATE
cnt
=
cnt
+
1
cnt
=
cnt
+
1
if
cnt
>
40
:
if
cnt
>
40
:
break
break
# switch to normal timer after boot
# switch to normal timer after boot
if
not
self
.
boot_finished
and
cnt
==
0
and
len
(
self
.
devices
)
!=
0
:
if
not
self
.
boot_finished
and
cnt
==
0
and
len
(
self
.
devices
)
!=
0
:
self
.
refresh_timer
.
period
=
REFRESH_TIMER
self
.
refresh_timer
.
period
=
REFRESH_TIMER
logger
.
debug
(
"
Switching to slow refresh timer
"
)
logger
.
debug
(
"
Switching to slow refresh timer
"
)
self
.
boot_finished
=
True
self
.
boot_finished
=
True
elif
cnt
!=
0
:
elif
cnt
!=
0
:
logger
.
debug
(
"
request queued: %d
"
%
cnt
)
logger
.
debug
(
"
request queued: %d
"
%
cnt
)
def
request_metadb
(
self
,
addr
):
def
request_metadb
(
self
,
addr
):
if
self
.
db_server
:
if
self
.
db_server
:
self
.
engine
.
send_request
(
self
.
dev
,
[
self
.
db_server
,],
'
get_keys_values
'
,
{
'
device
'
:
addr
})
self
.
engine
.
send_request
(
self
.
dev
,
[
self
.
db_server
,
],
'
get_keys_values
'
,
{
'
device
'
:
addr
},
)
def
request_attributes
(
self
,
addr
):
def
request_attributes
(
self
,
addr
):
self
.
engine
.
send_get_attributes
(
self
.
dev
,[
addr
,])
self
.
engine
.
send_get_attributes
(
self
.
dev
,
[
addr
,
],
)
def
request_description
(
self
,
addr
):
def
request_description
(
self
,
addr
):
self
.
engine
.
send_get_description
(
self
.
dev
,[
addr
,])
self
.
engine
.
send_get_description
(
self
.
dev
,
[
addr
,
],
)
def
is_from_metadb
(
self
,
msg
):
def
is_from_metadb
(
self
,
msg
):
if
(
msg
.
is_notify
()
or
msg
.
is_reply
())
and
msg
.
source
==
self
.
db_server
:
if
(
msg
.
is_notify
()
or
msg
.
is_reply
())
and
msg
.
source
==
self
.
db_server
:
return
True
return
True
return
False
return
False
...
@@ -372,4 +393,7 @@ class Monitor:
...
@@ -372,4 +393,7 @@ class Monitor:
def
debug_timers
(
self
):
def
debug_timers
(
self
):
for
dev
in
self
.
devices
:
for
dev
in
self
.
devices
:
print
(
"
%s
\t
%s
\t
%d
\t
%d
\t
%d
"
%
(
dev
.
address
,
dev
.
dev_type
,
dev
.
description
.
last_update
,
dev
.
db
.
last_update
,
dev
.
attributes
.
last_update
))
print
(
"
%s
\t
%s
\t
%d
\t
%d
\t
%d
"
%
(
dev
.
address
,
dev
.
dev_type
,
dev
.
description
.
last_update
,
dev
.
db
.
last_update
,
dev
.
attributes
.
last_update
)
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment