Skip to content
Snippets Groups Projects
Commit 73f685d7 authored by RUANO RINCON Santiago's avatar RUANO RINCON Santiago
Browse files

Add count occurrencies function

parent 184554a3
Branches tmp
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/iotaledger/goshimmer/packages/tangle/payload"
gossipPkg "github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/hive.go/marshalutil"
"github.com/iotaledger/hive.go/stringify"
)
......@@ -51,12 +52,13 @@ type Payload struct {
}
// NewMasterPoWPayload creates a new test echo request payload
func NewMasterPoWPayload(cpuUsage float64, nodeDst string, powdCast uint8) *Payload {
//func NewMasterPoWPayload(cpuUsage float64, nodeDst string, powdCast uint8) *Payload {
func NewMasterPoWPayload(cpuUsage float64, nodeDst *gossipPkg.Neighbor, powdCast uint8) *Payload {
return &Payload{
subType: uint32(1234),
cpuUsage: cpuUsage,
nodeDst: nodeDst,
nodeDstLen: uint32(len([]byte(nodeDst))),
nodeDst: nodeDst.Peer.ID().String(),
nodeDstLen: uint32(len([]byte(nodeDst.Peer.ID().String()))),
powdCast: powdCast,
}
......
......@@ -31,13 +31,16 @@ const (
// TODO: use it or remove it
func neighborAdd(neighbor *gossipPkg.Neighbor) {
log.Debugf("neighbor to be added: %s", neighbor.Peer.ID().String())
currentDifficulty[neighbor.Peer.ID().String()] = 1
log.Infof("neighbor to be added: %s", neighbor.Peer.PublicKey().String())
//currentDifficulty[neighbor.Peer.PublicKey().String()] = 1
//neighbor or neighbor.Peer?
currentDifficulty[neighbor] = 1
}
func neighborRemove(neighbor *gossipPkg.Neighbor) {
log.Debugf("neighbor to be removed: %s", neighbor.Peer.ID().String())
delete(currentDifficulty, neighbor.Peer.ID().String())
log.Infof("neighbor to be removed: %s", neighbor.Peer.PublicKey().String())
//delete(currentDifficulty, neighbor.Peer.PublicKey().String())
delete(currentDifficulty, neighbor)
}
var (
......@@ -46,27 +49,27 @@ var (
once sync.Once
log *logger.Logger
//TODO: this should be actually something like
//map[pubKey]int
currentDifficulty = make(map[string]int)
//TODO: this should be actually something like
//map[pubKey]int
currentDifficulty = make(map[*gossipPkg.Neighbor]int)
// Map that keeps count of the amount of messages sent by each neighbor
senderOccurrences = make(map[string]int)
// Map that keeps count of the amount of messages sent by each neighbor
senderOccurrences = make(map[string]int)
localhost *peer.Local
)
// TODO: why this is a var? is it needed it to move it to a configure function?
var (
//function that is triggered when a new neighbor is added
//function that is triggered when a new neighbor is added
onNeighborAdded = events.NewClosure(func(neighbor *gossipPkg.Neighbor) {
neighborAdd(neighbor)
neighborAdd(neighbor)
})
// TODO
onNeighborRemoved = events.NewClosure(func(neighbor *gossipPkg.Neighbor) {
neighborRemove(neighbor)
})
// TODO
onNeighborRemoved = events.NewClosure(func(neighbor *gossipPkg.Neighbor) {
neighborRemove(neighbor)
})
)
// Plugin gets the plugin instance.
......@@ -85,94 +88,120 @@ func configure(_ *node.Plugin) {
log.Infof("starting the master PoW plugin")
localhost = local.GetInstance()
localhost = local.GetInstance()
}
func increaseOccurrencyCounts(msg *tangle.Message) {
messagePayload := msg.Payload()
/* TODO: Filter specific types of messages
*/
if messagePayload.Type() != Type {
func increaseOccurrencyCounts(msg *tangle.Message) {
/*
// TODO: Filter specific types of messages
if msg.Payload().Type() != Type {
return
}
*/
log.Debugf("issuer: %s", msg.IssuerPublicKey())
msgIssuerPublicKey := msg.IssuerPublicKey().String()
log.Infof("issuer: %s", msg.issuerPublicKey)
for n, _ := range currentDifficulty {
// check if the sender is in our neighbor map
if n.Peer.PublicKey().String() == msgIssuerPublicKey {
senderOccurrences[msgIssuerPublicKey]++
//log.Infof("%s: %d", msgIssuerPublicKey, senderOccurrences[msgIssuerPublicKey])
}
}
}
func configureEvents() {
messagelayer.Tangle().Solidifier.Events.MessageSolid.Attach(events.NewClosure(func(messageID tangle.MessageID) {
messagelayer.Tangle().Storage.Message(messageID).Consume(func(msg *tangle.Message) {
increaseOccurrencyCounts(msg)
func countOccurrencies() {
for issuer, occurrencies := range senderOccurrences{
log.Infof("%s: %d", issuer, occurrencies)
}
}
// Move the rest to a separate function
messagePayload := msg.Payload()
if messagePayload.Type() != Type {
return
}
msgPayload, _, err := FromBytes(msg.Payload().Bytes())
if err != nil {
log.Info("could not cast payload to masterpow object")
return
}
/*
messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(func(cachedMsgEvent *tangle.CachedMessageEvent) { // When receiving a message solid event do
defer cachedMsgEvent.MessageMetadata.Release()
cachedMsgEvent.Message.Consume(func(message *tangle.Message)
*/
func configureEvents() {
messagelayer.Tangle().Solidifier.Events.MessageSolid.Attach(events.NewClosure(func(messageID tangle.MessageID) {
messagelayer.Tangle().Storage.Message(messageID).Consume(func(msg *tangle.Message) {
increaseOccurrencyCounts(msg)
//log.Infof("issuer: %s", msg.IssuerPublicKey())
// Move the rest to a separate function
messagePayload := msg.Payload()
if messagePayload.Type() != Type {
return
}
log.Infof("Master pow message received for nodeDst: %s", msgPayload.nodeDst)
msgPayload, _, err := FromBytes(msg.Payload().Bytes())
if err != nil {
log.Info("could not cast payload to masterpow object")
return
}
if msgPayload.nodeDst == localhost.ID().String() {
pow.Tune(int(msgPayload.PoWdCast()))
log.Info("PoWCast received")
}
log.Infof("Master pow message received for nodeDst: %s", msgPayload.nodeDst)
})
}))
if msgPayload.nodeDst == localhost.PublicKey().String() {
pow.Tune(int(msgPayload.PoWdCast()))
log.Info("PoWCast received")
}
gossip.Manager().NeighborsEvents(gossipPkg.NeighborsGroupAuto).NeighborAdded.Attach(onNeighborAdded)
gossip.Manager().NeighborsEvents(gossipPkg.NeighborsGroupAuto).NeighborRemoved.Attach(onNeighborRemoved)
}
})
}))
// broadcastMasterPoWPayload broadcasts an echo request via communication layer.
func broadcastMasterPoWPayload() (doneSignal chan struct{}) {
doneSignal = make(chan struct{}, 1)
go func() {
defer close(doneSignal)
gossip.Manager().NeighborsEvents(gossipPkg.NeighborsGroupAuto).NeighborAdded.Attach(onNeighborAdded)
gossip.Manager().NeighborsEvents(gossipPkg.NeighborsGroupAuto).NeighborRemoved.Attach(onNeighborRemoved)
}
for neighbor, count := range currentDifficulty {
// broadcastMasterPoWPayload broadcasts an echo request via communication layer.
func broadcastMasterPoWPayload() (doneSignal chan struct{}) {
doneSignal = make(chan struct{}, 1)
go func() {
defer close(doneSignal)
syncMasterPoWPayload := NewMasterPoWPayload(metrics.CPUUsage(), neighbor, uint8(count))
msg, err := messagelayer.Tangle().IssuePayload(syncMasterPoWPayload)
if err != nil {
log.Warnf("error issuing master PoW message: %w", err)
return
}
log.Debugf("issued master PoW message %s", msg.ID())
currentDifficulty[neighbor] = (currentDifficulty[neighbor] + 5) % 22
}
}()
for neighbor, count := range currentDifficulty {
return
}
syncMasterPoWPayload := NewMasterPoWPayload(metrics.CPUUsage(), neighbor, uint8(count))
msg, err := messagelayer.Tangle().IssuePayload(syncMasterPoWPayload)
if err != nil {
log.Warnf("error issuing master PoW message: %w", err)
return
}
log.Debugf("issued master PoW message %s", msg.ID())
currentDifficulty[neighbor] = (currentDifficulty[neighbor] + 5) % 22
}
}()
func run(_ *node.Plugin) {
if err := daemon.BackgroundWorker("MasterPoW", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(42 * time.Second)
defer ticker.Stop()
for {
select {
case <-shutdownSignal:
return
case <-ticker.C:
doneSignal := broadcastMasterPoWPayload()
return
}
func run(_ *node.Plugin) {
if err := daemon.BackgroundWorker("MasterPoW", func(shutdownSignal <-chan struct{}) {
tickerBroadcast := time.NewTicker(42 * time.Second)
defer tickerBroadcast.Stop()
tickerCount := time.NewTicker(60 * time.Second)
for {
select {
case <-shutdownSignal:
return
case <-doneSignal:
// continue with the next beacon
case <-tickerBroadcast.C:
doneSignal := broadcastMasterPoWPayload()
select {
case <-shutdownSignal:
return
case <-doneSignal:
// continue with the next beacon
}
case <-tickerCount.C:
countOccurrencies()
}
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment