diff --git a/packages/daemon/daemon.go b/packages/daemon/daemon.go index 46f6822e9961861b0d41c039a4997606ddfd2fcc..ceeea026850e1e2c66cba58ef9f1b01364b7593a 100644 --- a/packages/daemon/daemon.go +++ b/packages/daemon/daemon.go @@ -5,43 +5,13 @@ import ( ) var ( - running bool - runOnce sync.Once - shutdownOnce sync.Once - wg sync.WaitGroup - installWG sync.WaitGroup - ShutdownSignal = make(chan int, 1) - backgroundWorkers = make([]func(), 0) - backgroundWorkerChan = make(chan func(), 10) + running bool + wg sync.WaitGroup + ShutdownSignal = make(chan int, 1) + backgroundWorkers = make([]func(), 0) + lock = sync.Mutex{} ) -var Events = daemonEvents{ - Run: &callbackEvent{ - callbacks: map[uintptr]Callback{}, - }, - Shutdown: &callbackEvent{ - callbacks: map[uintptr]Callback{}, - }, -} - -func init() { - shutdownOnce.Do(func() {}) - - go func() { - for { - backgroundWorker := <- backgroundWorkerChan - - backgroundWorkers = append(backgroundWorkers, backgroundWorker) - - installWG.Done() - - if IsRunning() { - runBackgroundWorker(backgroundWorker) - } - } - }() -} - func runBackgroundWorker(backgroundWorker func()) { wg.Add(1) @@ -52,45 +22,54 @@ func runBackgroundWorker(backgroundWorker func()) { }() } -func runBackgroundWorkers() { - for _, backgroundWorker := range backgroundWorkers { - runBackgroundWorker(backgroundWorker) - } -} - func BackgroundWorker(handler func()) { - installWG.Add(1) + lock.Lock() - backgroundWorkerChan <- handler + if IsRunning() { + runBackgroundWorker(handler) + } else { + backgroundWorkers = append(backgroundWorkers, handler) + } + + lock.Unlock() } func Run() { - runOnce.Do(func() { - installWG.Wait() + if !running { + lock.Lock() - ShutdownSignal = make(chan int, 1) - running = true + if !running { + ShutdownSignal = make(chan int, 1) - Events.Run.Trigger() + running = true - runBackgroundWorkers() + Events.Run.Trigger() - shutdownOnce = sync.Once{} - }) + for _, backgroundWorker := range backgroundWorkers { + runBackgroundWorker(backgroundWorker) + } + } + + lock.Unlock() + } wg.Wait() } func Shutdown() { - shutdownOnce.Do(func() { - close(ShutdownSignal) + if running { + lock.Lock() - running = false + if running { + close(ShutdownSignal) - Events.Shutdown.Trigger() + running = false - runOnce = sync.Once{} - }) + Events.Shutdown.Trigger() + } + + lock.Unlock() + } } func IsRunning() bool { diff --git a/packages/daemon/events.go b/packages/daemon/events.go index b8cef7a985b6aec434f9c1442f3cd5afe488264b..cebf3a8924f433e7b3488bbfc7b5374157c3de83 100644 --- a/packages/daemon/events.go +++ b/packages/daemon/events.go @@ -1,26 +1,13 @@ package daemon -import "reflect" - -type daemonEvents struct { - Run *callbackEvent - Shutdown *callbackEvent -} - -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger() { - for _, callback := range this.callbacks { - callback() - } +import ( + "github.com/iotaledger/goshimmer/packages/events" +) + +var Events = struct { + Run *events.Event + Shutdown *events.Event +}{ + Run: events.NewEvent(events.CallbackCaller), + Shutdown: events.NewEvent(events.CallbackCaller), } diff --git a/packages/events/closure.go b/packages/events/closure.go new file mode 100644 index 0000000000000000000000000000000000000000..109725b18a8c6257d59387b0093c23888cc498b6 --- /dev/null +++ b/packages/events/closure.go @@ -0,0 +1,18 @@ +package events + +import "reflect" + +type Closure struct { + Id uintptr + Fnc interface{} +} + +func NewClosure(f interface{}) *Closure { + closure := &Closure{ + Fnc: f, + } + + closure.Id = reflect.ValueOf(closure).Pointer() + + return closure +} \ No newline at end of file diff --git a/packages/events/event.go b/packages/events/event.go new file mode 100644 index 0000000000000000000000000000000000000000..7a19722e571c645e68dff0429ab54ffc3422401f --- /dev/null +++ b/packages/events/event.go @@ -0,0 +1,27 @@ +package events + +type Event struct { + triggerFunc func(handler interface{}, params ...interface{}) + callbacks map[uintptr]interface{} +} + +func (this *Event) Attach(closure *Closure) { + this.callbacks[closure.Id] = closure.Fnc +} + +func (this *Event) Detach(closure *Closure) { + delete(this.callbacks, closure.Id) +} + +func (this *Event) Trigger(params ...interface{}) { + for _, handler := range this.callbacks { + this.triggerFunc(handler, params...) + } +} + +func NewEvent(triggerFunc func(handler interface{}, params ...interface{})) *Event { + return &Event{ + triggerFunc: triggerFunc, + callbacks: make(map[uintptr]interface{}), + } +} diff --git a/packages/events/types.go b/packages/events/types.go new file mode 100644 index 0000000000000000000000000000000000000000..cbb2d79800f340233c896d5fbe4973b562f59fdd --- /dev/null +++ b/packages/events/types.go @@ -0,0 +1,3 @@ +package events + +func CallbackCaller(handler interface{}, params ...interface{}) { handler.(func())() } diff --git a/packages/node/events.go b/packages/node/events.go index 09c7327b31eb1d99bcf7de4b85ea2cece00b25e9..39784286cbe04f6fd96f95d642c250298ff88eef 100644 --- a/packages/node/events.go +++ b/packages/node/events.go @@ -1,28 +1,12 @@ package node import ( - "reflect" + "github.com/iotaledger/goshimmer/packages/events" ) type pluginEvents struct { - Configure *callbackEvent - Run *callbackEvent + Configure *events.Event + Run *events.Event } -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger(plugin *Plugin) { - for _, callback := range this.callbacks { - callback(plugin) - } -} +func pluginCaller(handler interface{}, params ...interface{}) { handler.(func(*Plugin))(params[0].(*Plugin)) } \ No newline at end of file diff --git a/packages/node/plugin.go b/packages/node/plugin.go index f494745f3de5061d06c863cc063463fc04125d9e..a723d517b81a0e512d55755fafc09476a9c9138a 100644 --- a/packages/node/plugin.go +++ b/packages/node/plugin.go @@ -1,6 +1,7 @@ package node import ( + "github.com/iotaledger/goshimmer/packages/events" "sync" ) @@ -8,27 +9,27 @@ type Plugin struct { Node *Node Name string Events pluginEvents - wg *sync.WaitGroup + wg *sync.WaitGroup } func NewPlugin(name string, callback Callback, callbacks ...Callback) *Plugin { plugin := &Plugin{ Name: name, Events: pluginEvents{ - Configure: &callbackEvent{make(map[uintptr]Callback)}, - Run: &callbackEvent{make(map[uintptr]Callback)}, + Configure: events.NewEvent(pluginCaller), + Run: events.NewEvent(pluginCaller), }, } if len(callbacks) >= 1 { - plugin.Events.Configure.Attach(callback) + plugin.Events.Configure.Attach(events.NewClosure(callback)) for _, callback = range callbacks[:len(callbacks)-1] { - plugin.Events.Configure.Attach(callback) + plugin.Events.Configure.Attach(events.NewClosure(callback)) } - plugin.Events.Run.Attach(callbacks[len(callbacks)-1]) + plugin.Events.Run.Attach(events.NewClosure(callbacks[len(callbacks)-1])) } else { - plugin.Events.Run.Attach(callback) + plugin.Events.Run.Attach(events.NewClosure(callback)) } return plugin diff --git a/packages/parameter/api.go b/packages/parameter/api.go deleted file mode 100644 index dc9df16dc445d4c7f1f3308a2b012c3fed5c68f1..0000000000000000000000000000000000000000 --- a/packages/parameter/api.go +++ /dev/null @@ -1,19 +0,0 @@ -package parameter - -var ( - // expose int methods - AddInt = addInt - GetInt = getInt - GetInts = getInts - - // expose string methods - AddString = addString - GetString = getString - GetStrings = getStrings - - // expose events - Events = moduleEvents{ - AddInt: &intParameterEvent{make(map[uintptr]IntParameterConsumer)}, - AddString: &stringParameterEvent{make(map[uintptr]StringParameterConsumer)}, - } -) diff --git a/packages/parameter/events.go b/packages/parameter/events.go index 7ca92df1de48b4fd4f39ebfab0a572a86e633a43..8a23da1cbbc3c1bb7665f6fa8a755cb4a211188a 100644 --- a/packages/parameter/events.go +++ b/packages/parameter/events.go @@ -1,52 +1,16 @@ package parameter -import "reflect" +import ( + "github.com/iotaledger/goshimmer/packages/events" +) -type moduleEvents struct { - AddInt *intParameterEvent - AddString *stringParameterEvent +var Events = struct { + AddInt *events.Event + AddString *events.Event +}{ + events.NewEvent(intParameterCaller), + events.NewEvent(stringParameterCaller), } -//region intParameterEvent ///////////////////////////////////////////////////////////////////////////////////////////// - -type intParameterEvent struct { - callbacks map[uintptr]IntParameterConsumer -} - -func (this *intParameterEvent) Attach(callback IntParameterConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *intParameterEvent) Detach(callback IntParameterConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *intParameterEvent) Trigger(param *IntParameter) { - for _, callback := range this.callbacks { - callback(param) - } -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -//region stringParameterEvent ////////////////////////////////////////////////////////////////////////////////////////// - -type stringParameterEvent struct { - callbacks map[uintptr]StringParameterConsumer -} - -func (this *stringParameterEvent) Attach(callback StringParameterConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *stringParameterEvent) Detach(callback StringParameterConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *stringParameterEvent) Trigger(param *StringParameter) { - for _, callback := range this.callbacks { - callback(param) - } -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// +func intParameterCaller(handler interface{}, params ...interface{}) { handler.(func(*IntParameter))(params[0].(*IntParameter)) } +func stringParameterCaller(handler interface{}, params ...interface{}) { handler.(func(*StringParameter))(params[0].(*StringParameter)) } diff --git a/packages/parameter/parameter.go b/packages/parameter/parameter.go index 07553c5cffabd8e2f823e0caa48297fab1caf63a..2de309abcc39bd8f41a8c3ab250062c48cb762bf 100644 --- a/packages/parameter/parameter.go +++ b/packages/parameter/parameter.go @@ -2,7 +2,7 @@ package parameter var intParameters = make(map[string]*IntParameter) -func addInt(name string, defaultValue int, description string) *IntParameter { +func AddInt(name string, defaultValue int, description string) *IntParameter { if intParameters[name] != nil { panic("duplicate parameter - \"" + name + "\" was defined already") } @@ -21,17 +21,17 @@ func addInt(name string, defaultValue int, description string) *IntParameter { return newParameter } -func getInt(name string) *IntParameter { +func GetInt(name string) *IntParameter { return intParameters[name] } -func getInts() map[string]*IntParameter { +func GetInts() map[string]*IntParameter { return intParameters } var stringParameters = make(map[string]*StringParameter) -func addString(name string, defaultValue string, description string) *StringParameter { +func AddString(name string, defaultValue string, description string) *StringParameter { if intParameters[name] != nil { panic("duplicate parameter - \"" + name + "\" was defined already") } @@ -50,10 +50,10 @@ func addString(name string, defaultValue string, description string) *StringPara return stringParameters[name] } -func getString(name string) *StringParameter { +func GetString(name string) *StringParameter { return stringParameters[name] } -func getStrings() map[string]*StringParameter { +func GetStrings() map[string]*StringParameter { return stringParameters } diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index 4463dce891cf10bcde4da7d991470a41a58ef446..243b45b96674e049cd273f1ca701812b7de05f86 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -7,12 +7,12 @@ import ( "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/analysis/types/addnode" "github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes" + "github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes" "github.com/iotaledger/goshimmer/plugins/analysis/types/ping" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/acceptedneighbors" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighbors" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/knownpeers" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" "net" "time" ) @@ -44,6 +44,9 @@ func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers { ConnectNodes: func(sourceId []byte, targetId []byte) { conn.Write((&connectnodes.Packet{ SourceId: sourceId, TargetId: targetId }).Marshal()) }, + DisconnectNodes: func(sourceId []byte, targetId []byte) { + conn.Write((&disconnectnodes.Packet{ SourceId: sourceId, TargetId: targetId }).Marshal()) + }, } } @@ -60,27 +63,39 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch eventDispatchers.AddNode(p.Identity.Identifier) } - onIncomingRequestAccepted := func(req *request.Request) { - eventDispatchers.ConnectNodes(req.Issuer.Identity.Identifier, accountability.OWN_ID.Identifier) + onAddAcceptedNeighbor := func(p *peer.Peer) { + eventDispatchers.ConnectNodes(p.Identity.Identifier, accountability.OWN_ID.Identifier) + } + + onRemoveAcceptedNeighbor := func(p *peer.Peer) { + eventDispatchers.DisconnectNodes(p.Identity.Identifier, accountability.OWN_ID.Identifier) + } + + onAddChosenNeighbor := func(p *peer.Peer) { + eventDispatchers.ConnectNodes(accountability.OWN_ID.Identifier, p.Identity.Identifier) } - onOutgoingRequestAccepted := func(res *response.Response) { - eventDispatchers.ConnectNodes(accountability.OWN_ID.Identifier, res.Issuer.Identity.Identifier) + onRemoveChosenNeighbor := func(p *peer.Peer) { + eventDispatchers.DisconnectNodes(accountability.OWN_ID.Identifier, p.Identity.Identifier) } // setup hooks ///////////////////////////////////////////////////////////////////////////////////////////////////// - protocol.Events.DiscoverPeer.Attach(onDiscoverPeer) - protocol.Events.IncomingRequestAccepted.Attach(onIncomingRequestAccepted) - protocol.Events.OutgoingRequestAccepted.Attach(onOutgoingRequestAccepted) + knownpeers.INSTANCE.Events.Add.Attach(onDiscoverPeer) + acceptedneighbors.INSTANCE.Events.Add.Attach(onAddAcceptedNeighbor) + acceptedneighbors.INSTANCE.Events.Remove.Attach(onRemoveAcceptedNeighbor) + chosenneighbors.INSTANCE.Events.Add.Attach(onAddChosenNeighbor) + chosenneighbors.INSTANCE.Events.Remove.Attach(onRemoveChosenNeighbor) // clean up hooks on close ///////////////////////////////////////////////////////////////////////////////////////// var onClose func() onClose = func() { - protocol.Events.DiscoverPeer.Detach(onDiscoverPeer) - protocol.Events.IncomingRequestAccepted.Detach(onIncomingRequestAccepted) - protocol.Events.OutgoingRequestAccepted.Detach(onOutgoingRequestAccepted) + knownpeers.INSTANCE.Events.Add.Detach(onDiscoverPeer) + acceptedneighbors.INSTANCE.Events.Add.Detach(onAddAcceptedNeighbor) + acceptedneighbors.INSTANCE.Events.Remove.Detach(onRemoveAcceptedNeighbor) + chosenneighbors.INSTANCE.Events.Add.Detach(onAddChosenNeighbor) + chosenneighbors.INSTANCE.Events.Remove.Detach(onRemoveChosenNeighbor) conn.Events.Close.Detach(onClose) } diff --git a/plugins/analysis/client/types.go b/plugins/analysis/client/types.go index c6e19fd4c91c060b2a6ce6071fcbb80fbc6cf127..8d24a99d4e991325c4207e8d3cb0f8d2711e6734 100644 --- a/plugins/analysis/client/types.go +++ b/plugins/analysis/client/types.go @@ -1,6 +1,7 @@ package client type EventDispatchers struct { - AddNode func(nodeId []byte) - ConnectNodes func(sourceId []byte, targetId []byte) + AddNode func(nodeId []byte) + ConnectNodes func(sourceId []byte, targetId []byte) + DisconnectNodes func(sourceId []byte, targetId []byte) } diff --git a/plugins/analysis/plugin.go b/plugins/analysis/plugin.go index 15c5d0b368db2dad52ae1cf43a723ed90733b595..e8446869097e194187ec04f35f4923356aa3641c 100644 --- a/plugins/analysis/plugin.go +++ b/plugins/analysis/plugin.go @@ -2,6 +2,7 @@ package analysis import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/analysis/client" "github.com/iotaledger/goshimmer/plugins/analysis/server" @@ -15,13 +16,9 @@ func configure(plugin *node.Plugin) { webinterface.Configure(plugin) server.Configure(plugin) - server.Events.AddNode.Attach(func(nodeId string) { - - }) - - daemon.Events.Shutdown.Attach(func() { + daemon.Events.Shutdown.Attach(events.NewClosure(func() { server.Shutdown(plugin) - }) + })) } } diff --git a/plugins/analysis/server/events.go b/plugins/analysis/server/events.go index c133e4a4967535ca4759a074e27f232157c4cbf5..4fd4714f6aa034f6ab0f280f442aca316ffea3ed 100644 --- a/plugins/analysis/server/events.go +++ b/plugins/analysis/server/events.go @@ -1,85 +1,27 @@ package server import ( - "reflect" + "github.com/iotaledger/goshimmer/packages/events" ) -var Events = &pluginEvents{ - AddNode: &nodeIdEvent{make(map[uintptr]StringConsumer)}, - RemoveNode: &nodeIdEvent{make(map[uintptr]StringConsumer)}, - ConnectNodes: &nodeIdsEvent{make(map[uintptr]StringStringConsumer)}, - DisconnectNodes: &nodeIdsEvent{make(map[uintptr]StringStringConsumer)}, - NodeOnline: &nodeIdEvent{make(map[uintptr]StringConsumer)}, - NodeOffline: &nodeIdEvent{make(map[uintptr]StringConsumer)}, - Error: &errorEvent{make(map[uintptr]ErrorConsumer)}, -} - -type pluginEvents struct { - AddNode *nodeIdEvent - RemoveNode *nodeIdEvent - ConnectNodes *nodeIdsEvent - DisconnectNodes *nodeIdsEvent - NodeOnline *nodeIdEvent - NodeOffline *nodeIdEvent - Error *errorEvent -} - -type nodeIdEvent struct { - callbacks map[uintptr]StringConsumer -} - -func (this *nodeIdEvent) Attach(callback StringConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *nodeIdEvent) Detach(callback StringConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *nodeIdEvent) Trigger(nodeId string) { - for _, callback := range this.callbacks { - callback(nodeId) - } -} - -type nodeIdsEvent struct { - callbacks map[uintptr]StringStringConsumer -} - -func (this *nodeIdsEvent) Attach(callback StringStringConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *nodeIdsEvent) Detach(callback StringStringConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *nodeIdsEvent) Trigger(sourceId string, targetId string) { - for _, callback := range this.callbacks { - callback(sourceId, targetId) - } -} - -type errorEvent struct { - callbacks map[uintptr]ErrorConsumer -} - -func (this *errorEvent) Attach(callback ErrorConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *errorEvent) Detach(callback ErrorConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *errorEvent) Trigger(err error) { - for _, callback := range this.callbacks { - callback(err) - } -} - -type ErrorConsumer = func(err error) - -type StringConsumer = func(str string) - -type StringStringConsumer = func(sourceId string, targetId string) +var Events = struct { + AddNode *events.Event + RemoveNode *events.Event + ConnectNodes *events.Event + DisconnectNodes *events.Event + NodeOnline *events.Event + NodeOffline *events.Event + Error *events.Event +}{ + events.NewEvent(stringCaller), + events.NewEvent(stringCaller), + events.NewEvent(stringStringCaller), + events.NewEvent(stringStringCaller), + events.NewEvent(stringCaller), + events.NewEvent(stringCaller), + events.NewEvent(errorCaller), +} + +func stringCaller(handler interface{}, params ...interface{}) { handler.(func(string))(params[0].(string)) } +func stringStringCaller(handler interface{}, params ...interface{}) { handler.(func(string, string))(params[0].(string), params[1].(string)) } +func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) } diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 58eb2555039b208b9d6a63cdc3a8bc8fc5c49adf..861ffe4fe7c3be369703a67480d36bd595da5102 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -63,7 +63,6 @@ func HandleConnection(conn *network.ManagedConnection) { } onDisconnect = func() { Events.NodeOffline.Trigger(connectedNodeId) - Events.RemoveNode.Trigger(connectedNodeId) conn.Events.ReceiveData.Detach(onReceiveData) conn.Events.Close.Detach(onDisconnect) @@ -120,6 +119,9 @@ func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *n case STATE_CONNECT_NODES: *receiveBuffer = make([]byte, connectnodes.MARSHALLED_TOTAL_SIZE) + case STATE_DISCONNECT_NODES: + *receiveBuffer = make([]byte, disconnectnodes.MARSHALLED_TOTAL_SIZE) + case STATE_REMOVE_NODE: *receiveBuffer = make([]byte, removenode.MARSHALLED_TOTAL_SIZE) } @@ -146,6 +148,9 @@ func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *n case STATE_CONNECT_NODES: processIncomingConnectNodesPacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) + case STATE_DISCONNECT_NODES: + processIncomingDisconnectNodesPacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) + case STATE_REMOVE_NODE: processIncomingAddNodePacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) } @@ -171,6 +176,11 @@ func parsePackageHeader(data []byte) (ConnectionState, []byte, error) { connectionState = STATE_CONNECT_NODES + case disconnectnodes.MARSHALLED_PACKET_HEADER: + receiveBuffer = make([]byte, disconnectnodes.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_DISCONNECT_NODES + case removenode.MARSHALLED_PACKET_HEADER: receiveBuffer = make([]byte, removenode.MARSHALLED_TOTAL_SIZE) @@ -269,3 +279,32 @@ func processIncomingConnectNodesPacket(connectionState *byte, receiveBuffer *[]b } } } + +func processIncomingDisconnectNodesPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) { + remainingCapacity := int(math.Min(float64(disconnectnodes.MARSHALLED_TOTAL_SIZE- *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < disconnectnodes.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if disconnectNodesPacket, err := disconnectnodes.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(err) + + conn.Close() + + return + } else { + sourceNodeId := hex.EncodeToString(disconnectNodesPacket.SourceId) + targetNodeId := hex.EncodeToString(disconnectNodesPacket.TargetId) + + Events.DisconnectNodes.Trigger(sourceNodeId, targetNodeId) + } + + *connectionState = STATE_CONSECUTIVE + + if *offset + len(data) > disconnectnodes.MARSHALLED_TOTAL_SIZE { + processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId) + } + } +} diff --git a/plugins/analysis/webinterface/httpserver/data_stream.go b/plugins/analysis/webinterface/httpserver/data_stream.go index f628eca4a54b249d363fc5eb31a8f38226fcc97f..296bbc3e90c18d7a15c71e950b091c312406c890 100644 --- a/plugins/analysis/webinterface/httpserver/data_stream.go +++ b/plugins/analysis/webinterface/httpserver/data_stream.go @@ -2,60 +2,55 @@ package httpserver import ( "fmt" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/recordedevents" "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/types" "golang.org/x/net/websocket" ) -var nodes = make(map[string]bool) -var links = make(map[string]map[string]bool) - func dataStream(ws *websocket.Conn) { - eventHandlers := &types.EventHandlers{ - AddNode: func(nodeId string) { - if _, exists := nodes[nodeId]; !exists { - nodes[nodeId] = true + func() { + eventHandlers := &types.EventHandlers{ + AddNode: func(nodeId string) { fmt.Fprint(ws, "A"+nodeId) }, + RemoveNode: func(nodeId string) { fmt.Fprint(ws, "a"+nodeId) }, + ConnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "C"+sourceId+targetId) }, + DisconnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "c"+sourceId+targetId) }, + NodeOnline: func(nodeId string) { fmt.Fprint(ws, "O"+nodeId) }, + NodeOffline: func(nodeId string) { fmt.Fprint(ws, "o"+nodeId) }, + } - fmt.Fprint(ws, "A"+nodeId) - } - }, - RemoveNode: func(nodeId string) { - if _, exists := nodes[nodeId]; exists { - delete(nodes, nodeId) + addNodeClosure := events.NewClosure(eventHandlers.AddNode) + removeNodeClosure := events.NewClosure(eventHandlers.RemoveNode) + connectNodesClosure := events.NewClosure(eventHandlers.ConnectNodes) + disconnectNodesClosure := events.NewClosure(eventHandlers.DisconnectNodes) + nodeOnlineClosure := events.NewClosure(eventHandlers.NodeOnline) + nodeOfflineClosure := events.NewClosure(eventHandlers.NodeOffline) - fmt.Fprint(ws, "a"+nodeId) - } - }, - ConnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "C"+sourceId+targetId) }, - DisconnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "c"+sourceId+targetId) }, - NodeOnline: func(nodeId string) { fmt.Fprint(ws, "O"+nodeId) }, - NodeOffline: func(nodeId string) { fmt.Fprint(ws, "o"+nodeId) }, - } - - server.Events.AddNode.Attach(eventHandlers.AddNode) - server.Events.RemoveNode.Attach(eventHandlers.RemoveNode) - server.Events.ConnectNodes.Attach(eventHandlers.ConnectNodes) - server.Events.DisconnectNodes.Attach(eventHandlers.DisconnectNodes) - server.Events.NodeOnline.Attach(eventHandlers.NodeOnline) - server.Events.NodeOffline.Attach(eventHandlers.NodeOffline) - - go recordedevents.Replay(eventHandlers, 0) - - buf := make([]byte, 1) + server.Events.AddNode.Attach(addNodeClosure) + server.Events.RemoveNode.Attach(removeNodeClosure) + server.Events.ConnectNodes.Attach(connectNodesClosure) + server.Events.DisconnectNodes.Attach(disconnectNodesClosure) + server.Events.NodeOnline.Attach(nodeOnlineClosure) + server.Events.NodeOffline.Attach(nodeOfflineClosure) + + go recordedevents.Replay(eventHandlers) + + buf := make([]byte, 1) readFromWebsocket: - for { - if _, err := ws.Read(buf); err != nil { - break readFromWebsocket - } + for { + if _, err := ws.Read(buf); err != nil { + break readFromWebsocket + } - fmt.Fprint(ws, "_") - } + fmt.Fprint(ws, "_") + } - server.Events.AddNode.Detach(eventHandlers.AddNode) - server.Events.RemoveNode.Detach(eventHandlers.RemoveNode) - server.Events.ConnectNodes.Detach(eventHandlers.ConnectNodes) - server.Events.DisconnectNodes.Detach(eventHandlers.DisconnectNodes) - server.Events.NodeOnline.Detach(eventHandlers.NodeOnline) - server.Events.NodeOffline.Detach(eventHandlers.NodeOffline) + server.Events.AddNode.Detach(addNodeClosure) + server.Events.RemoveNode.Detach(removeNodeClosure) + server.Events.ConnectNodes.Detach(connectNodesClosure) + server.Events.DisconnectNodes.Detach(disconnectNodesClosure) + server.Events.NodeOnline.Detach(nodeOnlineClosure) + server.Events.NodeOffline.Detach(nodeOfflineClosure) + }() } diff --git a/plugins/analysis/webinterface/httpserver/index.go b/plugins/analysis/webinterface/httpserver/index.go index 023ca380b2bc09fd44fdc0cab271d8bada9bd206..288ead40aeb2c5c75972cfd5ad7e3234dfecb255 100644 --- a/plugins/analysis/webinterface/httpserver/index.go +++ b/plugins/analysis/webinterface/httpserver/index.go @@ -20,8 +20,6 @@ func index(w http.ResponseWriter, r *http.Request) { var socket = new WebSocket(((window.location.protocol === "https:") ? "wss://" : "ws://") + window.location.host + "/datastream"); socket.onopen = function () { - console.log("Status: Connected\n"); - setInterval(function() { socket.send("_"); }, 1000); @@ -34,7 +32,7 @@ func index(w http.ResponseWriter, r *http.Request) { break; case "A": - //addNode(e.data.substr(1)); + addNode(e.data.substr(1)); break; case "a": @@ -42,8 +40,6 @@ func index(w http.ResponseWriter, r *http.Request) { break; case "C": - addNode(e.data.substr(1, 40)); - addNode(e.data.substr(41, 40)); connectNodes(e.data.substr(1, 40), e.data.substr(41, 40)); break; @@ -74,9 +70,6 @@ func index(w http.ResponseWriter, r *http.Request) { .enableNodeDrag(false) .onNodeHover(node => elem.style.cursor = node ? 'pointer' : null) .onNodeClick(removeNodeX) - //.linkDirectionalParticles(3) - //.linkDirectionalParticleWidth(0.8) - //.linkDirectionalParticleSpeed(0.01) .nodeColor(node => node.online ? 'rgba(0,255,0,1)' : 'rgba(255,255,255,1)') .graphData(data); diff --git a/plugins/analysis/webinterface/httpserver/plugin.go b/plugins/analysis/webinterface/httpserver/plugin.go index 33c950ff482810c4d5dd20a56cefa033dd23b750..44d844cae1886f3ba5cfd27390578854f09e4f28 100644 --- a/plugins/analysis/webinterface/httpserver/plugin.go +++ b/plugins/analysis/webinterface/httpserver/plugin.go @@ -2,6 +2,7 @@ package httpserver import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "golang.org/x/net/context" "golang.org/x/net/websocket" @@ -21,12 +22,12 @@ func Configure(plugin *node.Plugin) { router.Handle("/datastream", websocket.Handler(dataStream)) router.HandleFunc("/", index) - daemon.Events.Shutdown.Attach(func() { + daemon.Events.Shutdown.Attach(events.NewClosure(func() { ctx, cancel := context.WithTimeout(context.Background(), 0 * time.Second) defer cancel() httpServer.Shutdown(ctx) - }) + })) } func Run(plugin *node.Plugin) { diff --git a/plugins/analysis/webinterface/recordedevents/recorded_events.go b/plugins/analysis/webinterface/recordedevents/recorded_events.go index b465f4eab2bdbf7acd43b43724a92eae69f48c3a..f9c23b200379c3cb0e6ffa25b1c9896f79ae43e0 100644 --- a/plugins/analysis/webinterface/recordedevents/recorded_events.go +++ b/plugins/analysis/webinterface/recordedevents/recorded_events.go @@ -1,10 +1,11 @@ package recordedevents import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/types" - "time" + "sync" ) var recordedEvents = make([]types.EventHandlersConsumer, 0) @@ -12,44 +13,45 @@ var recordedEvents = make([]types.EventHandlersConsumer, 0) var nodes = make(map[string]bool) var links = make(map[string]map[string]bool) +var lock sync.Mutex + func Configure(plugin *node.Plugin) { - server.Events.AddNode.Attach(func(nodeId string) { - nodes[nodeId] = false - /* - recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { - handlers.AddNode(nodeId) - }) - */ - }) - - server.Events.RemoveNode.Attach(func(nodeId string) { + server.Events.AddNode.Attach(events.NewClosure(func(nodeId string) { + if _, exists := nodes[nodeId]; !exists { + lock.Lock() + defer lock.Unlock() + + if _, exists := nodes[nodeId]; !exists { + nodes[nodeId] = false + } + } + })) + + server.Events.RemoveNode.Attach(events.NewClosure(func(nodeId string) { + lock.Lock() + defer lock.Unlock() + delete(nodes, nodeId) - /* - recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { - handlers.AddNode(nodeId) - }) - */ - }) - - server.Events.NodeOnline.Attach(func(nodeId string) { + })) + + server.Events.NodeOnline.Attach(events.NewClosure(func(nodeId string) { + lock.Lock() + defer lock.Unlock() + nodes[nodeId] = true - /* - recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { - handlers.NodeOnline(nodeId) - }) - */ - }) + })) + + server.Events.NodeOffline.Attach(events.NewClosure(func(nodeId string) { + lock.Lock() + defer lock.Unlock() - server.Events.NodeOffline.Attach(func(nodeId string) { nodes[nodeId] = false - /* - recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { - handlers.NodeOffline(nodeId) - }) - */ - }) + })) + + server.Events.ConnectNodes.Attach(events.NewClosure(func(sourceId string, targetId string) { + lock.Lock() + defer lock.Unlock() - server.Events.ConnectNodes.Attach(func(sourceId string, targetId string) { connectionMap, connectionMapExists := links[sourceId] if !connectionMapExists { connectionMap = make(map[string]bool) @@ -57,27 +59,20 @@ func Configure(plugin *node.Plugin) { links[sourceId] = connectionMap } connectionMap[targetId] = true - /* - recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { - handlers.ConnectNodes(sourceId, targetId) - }) - */ - }) + })) + + server.Events.DisconnectNodes.Attach(events.NewClosure(func(sourceId string, targetId string) { + lock.Lock() + defer lock.Unlock() - server.Events.DisconnectNodes.Attach(func(sourceId string, targetId string) { connectionMap, connectionMapExists := links[sourceId] if connectionMapExists { delete(connectionMap, targetId) } - /* - recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { - handlers.ConnectNodes(sourceId, targetId) - }) - */ - }) + })) } -func Replay(handlers *types.EventHandlers, delay time.Duration) { +func Replay(handlers *types.EventHandlers) { for nodeId, online := range nodes { handlers.AddNode(nodeId) if online { @@ -92,13 +87,4 @@ func Replay(handlers *types.EventHandlers, delay time.Duration) { handlers.ConnectNodes(sourceId, targetId) } } - /* - for _, recordedEvent := range recordedEvents { - recordedEvent(handlers) - - if delay != time.Duration(0) { - time.Sleep(delay) - } - } - */ } diff --git a/plugins/autopeering/instances/acceptedneighbors/distance.go b/plugins/autopeering/instances/acceptedneighbors/distance.go new file mode 100644 index 0000000000000000000000000000000000000000..0dd0aac9926446118fdba415e764b57f5e708cdf --- /dev/null +++ b/plugins/autopeering/instances/acceptedneighbors/distance.go @@ -0,0 +1,32 @@ +package acceptedneighbors + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" + "hash/fnv" +) + +var DISTANCE = func(anchor *peer.Peer) func(p *peer.Peer) uint64 { + return func(p *peer.Peer) uint64 { + saltedIdentifier := make([]byte, len(anchor.Identity.Identifier) + len(saltmanager.PRIVATE_SALT.Bytes)) + copy(saltedIdentifier[0:], anchor.Identity.Identifier) + copy(saltedIdentifier[len(anchor.Identity.Identifier):], saltmanager.PRIVATE_SALT.Bytes) + + return hash(saltedIdentifier) ^ hash(p.Identity.Identifier) + } +} + +var OWN_DISTANCE func(p *peer.Peer) uint64 + +func configureOwnDistance() { + OWN_DISTANCE = DISTANCE(ownpeer.INSTANCE) +} + +func hash(data []byte) uint64 { + h := fnv.New64a() + h.Write(data) + + return h.Sum64() +} + diff --git a/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go b/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go new file mode 100644 index 0000000000000000000000000000000000000000..8dbcf68f9a2f16e383f9def8d49867023cd3b805 --- /dev/null +++ b/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go @@ -0,0 +1,43 @@ +package acceptedneighbors + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" + "sync" +) + +var FURTHEST_NEIGHBOR *peer.Peer + +var FURTHEST_NEIGHBOR_DISTANCE = uint64(0) + +var FurthestNeighborLock sync.RWMutex + +func configureFurthestNeighbor() { + INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + FurthestNeighborLock.Lock() + defer FurthestNeighborLock.Unlock() + + updateFurthestNeighbor(p) + }) + + INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + FurthestNeighborLock.Lock() + defer FurthestNeighborLock.Unlock() + + if p.Identity.StringIdentifier == FURTHEST_NEIGHBOR.Identity.StringIdentifier { + FURTHEST_NEIGHBOR_DISTANCE = uint64(0) + FURTHEST_NEIGHBOR = nil + + for _, furthestNeighborCandidate := range INSTANCE.Peers { + updateFurthestNeighbor(furthestNeighborCandidate) + } + } + }) +} + +func updateFurthestNeighbor(p *peer.Peer) { + distance := OWN_DISTANCE(p) + if distance > FURTHEST_NEIGHBOR_DISTANCE { + FURTHEST_NEIGHBOR = p + FURTHEST_NEIGHBOR_DISTANCE = distance + } +} diff --git a/plugins/autopeering/instances/acceptedneighbors/plugin.go b/plugins/autopeering/instances/acceptedneighbors/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..13cd3a858cb20feeed811ea6502f5177a43802ba --- /dev/null +++ b/plugins/autopeering/instances/acceptedneighbors/plugin.go @@ -0,0 +1,8 @@ +package acceptedneighbors + +import "github.com/iotaledger/goshimmer/packages/node" + +func Configure(plugin *node.Plugin) { + configureOwnDistance() + configureFurthestNeighbor() +} diff --git a/plugins/autopeering/instances/chosenneighborcandidates/instance.go b/plugins/autopeering/instances/chosenneighborcandidates/instance.go deleted file mode 100644 index d57b684c72f7ff54434eecac6a54b567c6beb358..0000000000000000000000000000000000000000 --- a/plugins/autopeering/instances/chosenneighborcandidates/instance.go +++ /dev/null @@ -1,36 +0,0 @@ -package chosenneighborcandidates - -import ( - "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" - "github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" - "hash/fnv" -) - -var INSTANCE peerlist.PeerList - -var DISTANCE = func(anchor *peer.Peer) func(p *peer.Peer) uint64 { - return func(p *peer.Peer) uint64 { - return hash(anchor.Identity.Identifier) ^ hash(p.Identity.Identifier) - } -} - -func Configure(plugin *node.Plugin) { - updateNeighborCandidates() - - neighborhood.Events.Update.Attach(updateNeighborCandidates) -} - -func updateNeighborCandidates() { - INSTANCE = neighborhood.LIST_INSTANCE.Sort(DISTANCE(outgoingrequest.INSTANCE.Issuer)) -} - -func hash(data []byte) uint64 { - h := fnv.New64a() - h.Write(data) - - return h.Sum64() -} - diff --git a/plugins/autopeering/instances/chosenneighbors/candidates.go b/plugins/autopeering/instances/chosenneighbors/candidates.go new file mode 100644 index 0000000000000000000000000000000000000000..e26edf16c4a0ce14f6e695a21116255a24aedf41 --- /dev/null +++ b/plugins/autopeering/instances/chosenneighbors/candidates.go @@ -0,0 +1,19 @@ +package chosenneighbors + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" +) + +var CANDIDATES peerlist.PeerList + +func configureCandidates() { + updateNeighborCandidates() + + neighborhood.Events.Update.Attach(updateNeighborCandidates) +} + +func updateNeighborCandidates() { + CANDIDATES = neighborhood.LIST_INSTANCE.Sort(DISTANCE(ownpeer.INSTANCE)) +} diff --git a/plugins/autopeering/instances/chosenneighbors/distance.go b/plugins/autopeering/instances/chosenneighbors/distance.go new file mode 100644 index 0000000000000000000000000000000000000000..cd06265a5dcbe181eb598486ba4b4ffaf6d3d870 --- /dev/null +++ b/plugins/autopeering/instances/chosenneighbors/distance.go @@ -0,0 +1,30 @@ +package chosenneighbors + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" + "hash/fnv" +) + +var DISTANCE = func(anchor *peer.Peer) func(p *peer.Peer) uint64 { + return func(p *peer.Peer) uint64 { + saltedIdentifier := make([]byte, len(anchor.Identity.Identifier) + len(anchor.Salt.Bytes)) + copy(saltedIdentifier[0:], anchor.Identity.Identifier) + copy(saltedIdentifier[len(anchor.Identity.Identifier):], anchor.Salt.Bytes) + + return hash(anchor.Identity.Identifier) ^ hash(p.Identity.Identifier) + } +} + +var OWN_DISTANCE func(p *peer.Peer) uint64 + +func configureOwnDistance() { + OWN_DISTANCE = DISTANCE(ownpeer.INSTANCE) +} + +func hash(data []byte) uint64 { + h := fnv.New64a() + h.Write(data) + + return h.Sum64() +} diff --git a/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go b/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go new file mode 100644 index 0000000000000000000000000000000000000000..107f9eaa772563139e2723c5a456fd6cc9b34ac0 --- /dev/null +++ b/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go @@ -0,0 +1,43 @@ +package chosenneighbors + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" + "sync" +) + +var FURTHEST_NEIGHBOR *peer.Peer + +var FURTHEST_NEIGHBOR_DISTANCE = uint64(0) + +var FurthestNeighborLock sync.RWMutex + +func configureFurthestNeighbor() { + INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + FurthestNeighborLock.Lock() + defer FurthestNeighborLock.Unlock() + + distance := OWN_DISTANCE(p) + if distance > FURTHEST_NEIGHBOR_DISTANCE { + FURTHEST_NEIGHBOR = p + FURTHEST_NEIGHBOR_DISTANCE = distance + } + }) + + INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + FurthestNeighborLock.Lock() + defer FurthestNeighborLock.Unlock() + + if p == FURTHEST_NEIGHBOR { + FURTHEST_NEIGHBOR_DISTANCE = uint64(0) + FURTHEST_NEIGHBOR = nil + + for _, furthestNeighborCandidate := range INSTANCE.Peers { + distance := OWN_DISTANCE(furthestNeighborCandidate) + if distance > FURTHEST_NEIGHBOR_DISTANCE { + FURTHEST_NEIGHBOR = furthestNeighborCandidate + FURTHEST_NEIGHBOR_DISTANCE = distance + } + } + } + }) +} \ No newline at end of file diff --git a/plugins/autopeering/instances/chosenneighbors/plugin.go b/plugins/autopeering/instances/chosenneighbors/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..d28bc57dd8a13352b15f4137396791661697d110 --- /dev/null +++ b/plugins/autopeering/instances/chosenneighbors/plugin.go @@ -0,0 +1,11 @@ +package chosenneighbors + +import ( + "github.com/iotaledger/goshimmer/packages/node" +) + +func Configure(plugin *node.Plugin) { + configureCandidates() + configureOwnDistance() + configureFurthestNeighbor() +} diff --git a/plugins/autopeering/instances/neighborhood/instance.go b/plugins/autopeering/instances/neighborhood/instance.go index 8bc69db00165e0a50d6f809495985252ade34640..0df5bb00a1d27c0b318ea9677fad05afa1fd099b 100644 --- a/plugins/autopeering/instances/neighborhood/instance.go +++ b/plugins/autopeering/instances/neighborhood/instance.go @@ -1,13 +1,14 @@ package neighborhood import ( + "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/timeutil" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/knownpeers" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerregister" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "time" ) @@ -30,8 +31,12 @@ var lastUpdate = time.Now() func Configure(plugin *node.Plugin) { updateNeighborHood() +} - go timeutil.Ticker(updateNeighborHood, 1 * time.Second) +func Run(plugin *node.Plugin) { + daemon.BackgroundWorker(func() { + timeutil.Ticker(updateNeighborHood, 1 * time.Second) + }) } func updateNeighborHood() { diff --git a/plugins/autopeering/instances/plugin.go b/plugins/autopeering/instances/plugin.go index 7b3d1d1b46f2fbedc948b55ac6de09cdaecf2b96..ee110e249ec615bbab0b86e52c485640b5646ac1 100644 --- a/plugins/autopeering/instances/plugin.go +++ b/plugins/autopeering/instances/plugin.go @@ -2,7 +2,8 @@ package instances import ( "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighborcandidates" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/acceptedneighbors" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighbors" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/entrynodes" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/knownpeers" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" @@ -16,5 +17,10 @@ func Configure(plugin *node.Plugin) { knownpeers.Configure(plugin) neighborhood.Configure(plugin) outgoingrequest.Configure(plugin) - chosenneighborcandidates.Configure(plugin) + chosenneighbors.Configure(plugin) + acceptedneighbors.Configure(plugin) +} + +func Run(plugin *node.Plugin) { + neighborhood.Run(plugin) } diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index b7cb7fca020a09bfcb78282dd29ef0d9eeadb9ae..8254f34fdb6094986bb3bc5d6e7b5bb23273a3a2 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -2,49 +2,58 @@ package autopeering import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/instances" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/acceptedneighbors" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighbors" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/knownpeers" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" "github.com/iotaledger/goshimmer/plugins/autopeering/server" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" ) +var PLUGIN = node.NewPlugin("Auto Peering", configure, run) + func configure(plugin *node.Plugin) { + saltmanager.Configure(plugin) instances.Configure(plugin) server.Configure(plugin) protocol.Configure(plugin) - daemon.Events.Shutdown.Attach(func() { + daemon.Events.Shutdown.Attach(events.NewClosure(func() { server.Shutdown(plugin) - }) + })) - protocol.Events.IncomingRequestAccepted.Attach(func(req *request.Request) { - if acceptedneighbors.INSTANCE.AddOrUpdate(req.Issuer) { - plugin.LogSuccess("new neighbor accepted: " + req.Issuer.Address.String() + " / " + req.Issuer.Identity.StringIdentifier) - } - }) - - protocol.Events.OutgoingRequestAccepted.Attach(func(res *response.Response) { - if chosenneighbors.INSTANCE.AddOrUpdate(res.Issuer) { - plugin.LogSuccess("new neighbor chosen: " + res.Issuer.Address.String() + " / " + res.Issuer.Identity.StringIdentifier) - } - }) - - protocol.Events.DiscoverPeer.Attach(func(p *peer.Peer) { - if knownpeers.INSTANCE.AddOrUpdate(p) { - plugin.LogInfo("new peer detected: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - } - }) + configureLogging(plugin) } func run(plugin *node.Plugin) { + instances.Run(plugin) server.Run(plugin) protocol.Run(plugin) } -var PLUGIN = node.NewPlugin("Auto Peering", configure, run) +func configureLogging(plugin *node.Plugin) { + acceptedneighbors.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + }) + acceptedneighbors.INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + }) + + chosenneighbors.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + }) + chosenneighbors.INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + }) + + knownpeers.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + plugin.LogInfo("peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + }) + knownpeers.INSTANCE.Events.Update.Attach(func(p *peer.Peer) { + plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + }) +} diff --git a/plugins/autopeering/protocol/accepted_neighbor_dropper.go b/plugins/autopeering/protocol/accepted_neighbor_dropper.go new file mode 100644 index 0000000000000000000000000000000000000000..360ee603c2fdcbde812d8c9df5a4f453ade36b45 --- /dev/null +++ b/plugins/autopeering/protocol/accepted_neighbor_dropper.go @@ -0,0 +1,39 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/packages/timeutil" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/acceptedneighbors" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/drop" + "time" +) + +func createAcceptedNeighborDropper(plugin *node.Plugin) func() { + return func() { + timeutil.Ticker(func() { + if len(acceptedneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT / 2 { + defer acceptedneighbors.INSTANCE.Lock()() + for len(acceptedneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT / 2 { + acceptedneighbors.FurthestNeighborLock.RLock() + furthestNeighbor := acceptedneighbors.FURTHEST_NEIGHBOR + acceptedneighbors.FurthestNeighborLock.RUnlock() + + if furthestNeighbor != nil { + dropMessage := &drop.Drop{Issuer: ownpeer.INSTANCE} + dropMessage.Sign() + + acceptedneighbors.INSTANCE.Remove(furthestNeighbor.Identity.StringIdentifier, false) + go func() { + if _, err := furthestNeighbor.Send(dropMessage.Marshal(), types.PROTOCOL_TYPE_UDP, false); err != nil { + plugin.LogDebug("error when sending drop message to" + acceptedneighbors.FURTHEST_NEIGHBOR.String()) + } + }() + } + } + } + }, 1 * time.Second) + } +} diff --git a/plugins/autopeering/protocol/chosen_neighbor_dropper.go b/plugins/autopeering/protocol/chosen_neighbor_dropper.go new file mode 100644 index 0000000000000000000000000000000000000000..1e070d17970bd34a4425515c6f75b30a1d90c909 --- /dev/null +++ b/plugins/autopeering/protocol/chosen_neighbor_dropper.go @@ -0,0 +1,39 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/packages/timeutil" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighbors" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/drop" + "time" +) + +func createChosenNeighborDropper(plugin *node.Plugin) func() { + return func() { + timeutil.Ticker(func() { + if len(chosenneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT / 2 { + defer chosenneighbors.INSTANCE.Lock()() + for len(chosenneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT / 2 { + chosenneighbors.FurthestNeighborLock.RLock() + furthestNeighbor := chosenneighbors.FURTHEST_NEIGHBOR + chosenneighbors.FurthestNeighborLock.RUnlock() + + if furthestNeighbor != nil { + dropMessage := &drop.Drop{Issuer: ownpeer.INSTANCE} + dropMessage.Sign() + + chosenneighbors.INSTANCE.Remove(furthestNeighbor.Identity.StringIdentifier, false) + go func() { + if _, err := furthestNeighbor.Send(dropMessage.Marshal(), types.PROTOCOL_TYPE_UDP, false); err != nil { + plugin.LogDebug("error when sending drop message to" + chosenneighbors.FURTHEST_NEIGHBOR.String()) + } + }() + } + } + } + }, 1 * time.Second) + } +} diff --git a/plugins/autopeering/protocol/constants/constants.go b/plugins/autopeering/protocol/constants/constants.go index ca9bdc23d8311b7deb81e06c2caa3f68f9c0e729..a95747d811ce8c70fb3ed20428025ef514dc3dd2 100644 --- a/plugins/autopeering/protocol/constants/constants.go +++ b/plugins/autopeering/protocol/constants/constants.go @@ -3,9 +3,9 @@ package constants import "time" const ( - NEIGHBOR_COUNT = 4 + NEIGHBOR_COUNT = 8 - FIND_NEIGHBOR_INTERVAL = 60 * time.Second + FIND_NEIGHBOR_INTERVAL = 10 * time.Second // How often does the outgoing ping processor check if new pings should be sent. PING_PROCESS_INTERVAL = 1 * time.Second diff --git a/plugins/autopeering/protocol/events.go b/plugins/autopeering/protocol/events.go deleted file mode 100644 index 31b3c1f10b5a9db57fb9da3025d239848333434e..0000000000000000000000000000000000000000 --- a/plugins/autopeering/protocol/events.go +++ /dev/null @@ -1,84 +0,0 @@ -package protocol - -import ( - "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" - "reflect" -) - -var Events = protocolEvents{ - DiscoverPeer: &peerEvent{make(map[uintptr]PeerConsumer)}, - IncomingRequestAccepted: &requestEvent{make(map[uintptr]RequestConsumer)}, - IncomingRequestRejected: &requestEvent{make(map[uintptr]RequestConsumer)}, - OutgoingRequestAccepted: &responseEvent{make(map[uintptr]ResponseConsumer)}, - OutgoingRequestRejected: &responseEvent{make(map[uintptr]ResponseConsumer)}, -} - -type protocolEvents struct { - DiscoverPeer *peerEvent - IncomingRequestAccepted *requestEvent - IncomingRequestRejected *requestEvent - OutgoingRequestAccepted *responseEvent - OutgoingRequestRejected *responseEvent -} - -type peerEvent struct { - callbacks map[uintptr]PeerConsumer -} - -func (this *peerEvent) Attach(callback PeerConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *peerEvent) Detach(callback PeerConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *peerEvent) Trigger(p *peer.Peer) { - for _, callback := range this.callbacks { - callback(p) - } -} - -type requestEvent struct { - callbacks map[uintptr]RequestConsumer -} - -func (this *requestEvent) Attach(callback RequestConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *requestEvent) Detach(callback RequestConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *requestEvent) Trigger(req *request.Request) { - for _, callback := range this.callbacks { - callback(req) - } -} - -type responseEvent struct { - callbacks map[uintptr]ResponseConsumer -} - -func (this *responseEvent) Attach(callback ResponseConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *responseEvent) Detach(callback ResponseConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *responseEvent) Trigger(res *response.Response) { - for _, callback := range this.callbacks { - callback(res) - } -} - -type PeerConsumer = func(p *peer.Peer) - -type RequestConsumer = func(req *request.Request) - -type ResponseConsumer = func(res *response.Response) diff --git a/plugins/autopeering/protocol/incoming_drop_processor.go b/plugins/autopeering/protocol/incoming_drop_processor.go new file mode 100644 index 0000000000000000000000000000000000000000..2c40c45eb149c6b3d5c05c6308fe9ffa67afddd3 --- /dev/null +++ b/plugins/autopeering/protocol/incoming_drop_processor.go @@ -0,0 +1,17 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/acceptedneighbors" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighbors" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/drop" +) + +func createIncomingDropProcessor(plugin *node.Plugin) func(drop *drop.Drop) { + return func(drop *drop.Drop) { + plugin.LogDebug("received drop message from " + drop.Issuer.String()) + + chosenneighbors.INSTANCE.Remove(drop.Issuer.Identity.StringIdentifier) + acceptedneighbors.INSTANCE.Remove(drop.Issuer.Identity.StringIdentifier) + } +} diff --git a/plugins/autopeering/protocol/incoming_ping_processor.go b/plugins/autopeering/protocol/incoming_ping_processor.go index 38d615333f0c087f952cdff1582946a0d371b215..f9beb33e33fa10e3b6e7793dedc9f5e5c5c49cd8 100644 --- a/plugins/autopeering/protocol/incoming_ping_processor.go +++ b/plugins/autopeering/protocol/incoming_ping_processor.go @@ -2,6 +2,7 @@ package protocol import ( "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/knownpeers" "github.com/iotaledger/goshimmer/plugins/autopeering/types/ping" ) @@ -9,9 +10,9 @@ func createIncomingPingProcessor(plugin *node.Plugin) func(ping *ping.Ping) { return func(ping *ping.Ping) { plugin.LogDebug("received ping from " + ping.Issuer.String()) - Events.DiscoverPeer.Trigger(ping.Issuer) + knownpeers.INSTANCE.AddOrUpdate(ping.Issuer) for _, neighbor := range ping.Neighbors { - Events.DiscoverPeer.Trigger(neighbor) + knownpeers.INSTANCE.AddOrUpdate(neighbor) } } } diff --git a/plugins/autopeering/protocol/incoming_request_processor.go b/plugins/autopeering/protocol/incoming_request_processor.go index 5d5f2bbef43694692f6a12e64a463e150225771e..53e27cb5b59e6bb1dd35f33537d1bdca72fe131c 100644 --- a/plugins/autopeering/protocol/incoming_request_processor.go +++ b/plugins/autopeering/protocol/incoming_request_processor.go @@ -3,12 +3,13 @@ package protocol import ( "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/acceptedneighbors" - "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighborcandidates" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/knownpeers" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" + "math/rand" ) func createIncomingRequestProcessor(plugin *node.Plugin) func(req *request.Request) { @@ -20,17 +21,13 @@ func createIncomingRequestProcessor(plugin *node.Plugin) func(req *request.Reque func processIncomingRequest(plugin *node.Plugin, req *request.Request) { plugin.LogDebug("received peering request from " + req.Issuer.String()) - Events.DiscoverPeer.Trigger(req.Issuer) + knownpeers.INSTANCE.AddOrUpdate(req.Issuer) - if requestingNodeIsCloser(req) { - acceptedneighbors.INSTANCE.Lock.Lock() - defer acceptedneighbors.INSTANCE.Lock.Unlock() + if requestShouldBeAccepted(req) { + defer acceptedneighbors.INSTANCE.Lock()() - if requestingNodeIsCloser(req) { - acceptedneighbors.INSTANCE.AddOrUpdate(req.Issuer) - if len(acceptedneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT / 2 { - // drop further away node - } + if requestShouldBeAccepted(req) { + acceptedneighbors.INSTANCE.AddOrUpdate(req.Issuer, false) acceptRequest(plugin, req) @@ -41,11 +38,10 @@ func processIncomingRequest(plugin *node.Plugin, req *request.Request) { rejectRequest(plugin, req) } -func requestingNodeIsCloser(req *request.Request) bool { - //distanceFn := chosenneighborcandidates.DISTANCE(ownpeer.INSTANCE) - - return len(acceptedneighbors.INSTANCE.Peers) <= constants.NEIGHBOR_COUNT / 2 || - acceptedneighbors.INSTANCE.Contains(req.Issuer.Identity.StringIdentifier) +func requestShouldBeAccepted(req *request.Request) bool { + return len(acceptedneighbors.INSTANCE.Peers) < constants.NEIGHBOR_COUNT / 2 || + acceptedneighbors.INSTANCE.Contains(req.Issuer.Identity.StringIdentifier) || + acceptedneighbors.OWN_DISTANCE(req.Issuer) < acceptedneighbors.FURTHEST_NEIGHBOR_DISTANCE } func acceptRequest(plugin *node.Plugin, req *request.Request) { @@ -55,7 +51,7 @@ func acceptRequest(plugin *node.Plugin, req *request.Request) { plugin.LogDebug("sent positive peering response to " + req.Issuer.String()) - Events.IncomingRequestAccepted.Trigger(req) + acceptedneighbors.INSTANCE.AddOrUpdate(req.Issuer, false) } func rejectRequest(plugin *node.Plugin, req *request.Request) { @@ -64,12 +60,15 @@ func rejectRequest(plugin *node.Plugin, req *request.Request) { } plugin.LogDebug("sent negative peering response to " + req.Issuer.String()) - - Events.IncomingRequestRejected.Trigger(req) } func generateProposedPeeringCandidates(req *request.Request) peerlist.PeerList { - return neighborhood.LIST_INSTANCE.Filter(func(p *peer.Peer) bool { + proposedPeers := neighborhood.LIST_INSTANCE.Filter(func(p *peer.Peer) bool { return p.Identity.PublicKey != nil - }).Sort(chosenneighborcandidates.DISTANCE(req.Issuer)) + }) + rand.Shuffle(len(proposedPeers), func(i, j int) { + proposedPeers[i], proposedPeers[j] = proposedPeers[j], proposedPeers[i] + }) + + return proposedPeers } \ No newline at end of file diff --git a/plugins/autopeering/protocol/incoming_response_processor.go b/plugins/autopeering/protocol/incoming_response_processor.go index 9325c655547b844f6803c0f87e0ae2409a92d644..dd732bcb41a639c2f6b0de857f6d3e0b58d4056f 100644 --- a/plugins/autopeering/protocol/incoming_response_processor.go +++ b/plugins/autopeering/protocol/incoming_response_processor.go @@ -2,32 +2,44 @@ package protocol import ( "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighbors" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/knownpeers" "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" - "strconv" ) func createIncomingResponseProcessor(plugin *node.Plugin) func(peeringResponse *response.Response) { return func(peeringResponse *response.Response) { - Events.DiscoverPeer.Trigger(peeringResponse.Issuer) - for _, peer := range peeringResponse.Peers { - Events.DiscoverPeer.Trigger(peer) - } + go processIncomingResponse(plugin, peeringResponse) + } +} - if peeringResponse.Issuer.Conn == nil { - plugin.LogDebug("received UDP peering response from " + peeringResponse.Issuer.String()) - } else { - plugin.LogDebug("received TCP peering response from " + peeringResponse.Issuer.String()) +func processIncomingResponse(plugin *node.Plugin, peeringResponse *response.Response) { + plugin.LogDebug("received peering response from " + peeringResponse.Issuer.String()) - peeringResponse.Issuer.Conn.Close() - } + peeringResponse.Issuer.Conn.Close() + + knownpeers.INSTANCE.AddOrUpdate(peeringResponse.Issuer) + for _, peer := range peeringResponse.Peers { + knownpeers.INSTANCE.AddOrUpdate(peer) + } + + if peeringResponse.Type == response.TYPE_ACCEPT { + defer chosenneighbors.INSTANCE.Lock()() + + chosenneighbors.INSTANCE.AddOrUpdate(peeringResponse.Issuer, false) - switch peeringResponse.Type { - case response.TYPE_ACCEPT: - Events.OutgoingRequestAccepted.Trigger(peeringResponse) - case response.TYPE_REJECT: - Events.OutgoingRequestRejected.Trigger(peeringResponse) - default: - plugin.LogDebug("invalid response type in peering response of " + peeringResponse.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringResponse.Issuer.PeeringPort))) + /* + if len(chosenneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT / 2 { + dropMessage := &drop.Drop{Issuer:ownpeer.INSTANCE} + dropMessage.Sign() + + chosenneighbors.FurthestNeighborLock.RLock() + if _, err := chosenneighbors.FURTHEST_NEIGHBOR.Send(dropMessage.Marshal(), types.PROTOCOL_TYPE_UDP, false); err != nil { + plugin.LogDebug("error when sending drop message to" + chosenneighbors.FURTHEST_NEIGHBOR.String()) + } + chosenneighbors.INSTANCE.Remove(chosenneighbors.FURTHEST_NEIGHBOR.Identity.StringIdentifier, false) + chosenneighbors.FurthestNeighborLock.RUnlock() } + */ } -} +} \ No newline at end of file diff --git a/plugins/autopeering/protocol/outgoing_request_processor.go b/plugins/autopeering/protocol/outgoing_request_processor.go index 7844125b7de0bbd3efb5bb82199368520613d485..617bfaa91f1a614acb29ad0f03407d2a7d8c5128 100644 --- a/plugins/autopeering/protocol/outgoing_request_processor.go +++ b/plugins/autopeering/protocol/outgoing_request_processor.go @@ -5,13 +5,12 @@ import ( "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/acceptedneighbors" - "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighborcandidates" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/chosenneighbors" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "time" ) @@ -40,24 +39,28 @@ func createOutgoingRequestProcessor(plugin *node.Plugin) func() { } func sendOutgoingRequests(plugin *node.Plugin) { - for _, chosenNeighborCandidate := range chosenneighborcandidates.INSTANCE { - go func(peer *peer.Peer) { - nodeId := peer.Identity.StringIdentifier - - if !acceptedneighbors.INSTANCE.Contains(nodeId) && - !chosenneighbors.INSTANCE.Contains(nodeId) && - accountability.OWN_ID.StringIdentifier != nodeId { - - if dialed, err := peer.Send(outgoingrequest.INSTANCE.Marshal(), types.PROTOCOL_TYPE_TCP, true); err != nil { - plugin.LogDebug(err.Error()) - } else { - plugin.LogDebug("sent peering request to " + peer.String()) - - if dialed { - tcp.HandleConnection(peer.Conn) - } + for _, chosenNeighborCandidate := range chosenneighbors.CANDIDATES.Clone() { + time.Sleep(5 * time.Second) + + if candidateShouldBeContacted(chosenNeighborCandidate) { + if dialed, err := chosenNeighborCandidate.Send(outgoingrequest.INSTANCE.Marshal(), types.PROTOCOL_TYPE_TCP, true); err != nil { + plugin.LogDebug(err.Error()) + } else { + plugin.LogDebug("sent peering request to " + chosenNeighborCandidate.String()) + + if dialed { + tcp.HandleConnection(chosenNeighborCandidate.Conn) } } - }(chosenNeighborCandidate) + } } } + +func candidateShouldBeContacted(candidate *peer.Peer) bool { + nodeId := candidate.Identity.StringIdentifier + + return (!acceptedneighbors.INSTANCE.Contains(nodeId) &&!chosenneighbors.INSTANCE.Contains(nodeId) && + accountability.OWN_ID.StringIdentifier != nodeId) && ( + len(chosenneighbors.INSTANCE.Peers) < constants.NEIGHBOR_COUNT / 2 || + chosenneighbors.OWN_DISTANCE(candidate) < chosenneighbors.FURTHEST_NEIGHBOR_DISTANCE) +} diff --git a/plugins/autopeering/protocol/plugin.go b/plugins/autopeering/protocol/plugin.go index 0e6ed5a33db2e5acbeb7d2c712eb30d346a30d37..e875ad2761a2f5f2699283d365a54eb6ccb5462e 100644 --- a/plugins/autopeering/protocol/plugin.go +++ b/plugins/autopeering/protocol/plugin.go @@ -10,6 +10,7 @@ import ( func Configure(plugin *node.Plugin) { errorHandler := createErrorHandler(plugin) + udp.Events.ReceiveDrop.Attach(createIncomingDropProcessor(plugin)) udp.Events.ReceivePing.Attach(createIncomingPingProcessor(plugin)) udp.Events.Error.Attach(errorHandler) @@ -19,6 +20,8 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { + daemon.BackgroundWorker(createChosenNeighborDropper(plugin)) + daemon.BackgroundWorker(createAcceptedNeighborDropper(plugin)) daemon.BackgroundWorker(createOutgoingRequestProcessor(plugin)) daemon.BackgroundWorker(createOutgoingPingProcessor(plugin)) } diff --git a/plugins/autopeering/saltmanager/constants.go b/plugins/autopeering/saltmanager/constants.go index 12c19170ebe4b67a79172d0d69fcced9e867355a..20bd690699594d36bef7a91e0f02d125b8a5e6f0 100644 --- a/plugins/autopeering/saltmanager/constants.go +++ b/plugins/autopeering/saltmanager/constants.go @@ -3,8 +3,8 @@ package saltmanager import "time" const ( - PUBLIC_SALT_LIFETIME = 50 * time.Second - PRIVATE_SALT_LIFETIME = 50 * time.Second + PUBLIC_SALT_LIFETIME = 1800 * time.Second + PRIVATE_SALT_LIFETIME = 1800 * time.Second ) var ( diff --git a/plugins/autopeering/saltmanager/saltmanager.go b/plugins/autopeering/saltmanager/saltmanager.go index acb88f066b8303be093d2072d358cafe1c2036c9..340e901bd982dc052a3436ab19ae7ac6e65b8daf 100644 --- a/plugins/autopeering/saltmanager/saltmanager.go +++ b/plugins/autopeering/saltmanager/saltmanager.go @@ -3,16 +3,22 @@ package saltmanager import ( "github.com/dgraph-io/badger" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/settings" "github.com/iotaledger/goshimmer/plugins/autopeering/types/salt" "time" ) var ( - PRIVATE_SALT = createSalt(PRIVATE_SALT_SETTINGS_KEY, PRIVATE_SALT_LIFETIME, Events.UpdatePrivateSalt.Trigger) - PUBLIC_SALT = createSalt(PUBLIC_SALT_SETTINGS_KEY, PUBLIC_SALT_LIFETIME, Events.UpdatePublicSalt.Trigger) + PRIVATE_SALT *salt.Salt + PUBLIC_SALT *salt.Salt ) +func Configure(plugin *node.Plugin) { + PRIVATE_SALT = createSalt(PRIVATE_SALT_SETTINGS_KEY, PRIVATE_SALT_LIFETIME, Events.UpdatePrivateSalt.Trigger) + PUBLIC_SALT = createSalt(PUBLIC_SALT_SETTINGS_KEY, PUBLIC_SALT_LIFETIME, Events.UpdatePublicSalt.Trigger) +} + func generateNewSalt(key []byte, lifetime time.Duration) *salt.Salt { newSalt := salt.New(lifetime) @@ -61,14 +67,14 @@ func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan if saltToUpdate.ExpirationTime.Before(now) { updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) } else { - go func() { + daemon.BackgroundWorker(func() { select { - case <-daemon.ShutdownSignal: - return case <-time.After(saltToUpdate.ExpirationTime.Sub(now)): updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) + case <-daemon.ShutdownSignal: + return } - }() + }) } } diff --git a/plugins/autopeering/server/udp/events.go b/plugins/autopeering/server/udp/events.go index 68f8d4131190fc9cba28662e3bf92275f26a0827..449a0265ba7b2548c8766e6503b0f26997174969 100644 --- a/plugins/autopeering/server/udp/events.go +++ b/plugins/autopeering/server/udp/events.go @@ -1,6 +1,7 @@ package udp import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/types/drop" "github.com/iotaledger/goshimmer/plugins/autopeering/types/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" @@ -9,6 +10,7 @@ import ( ) var Events = &pluginEvents{ + ReceiveDrop: &dropEvent{make(map[uintptr]DropConsumer)}, ReceivePing: &pingEvent{make(map[uintptr]PingConsumer)}, ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, ReceiveResponse: &responseEvent{make(map[uintptr]ConnectionPeeringResponseConsumer)}, @@ -16,12 +18,31 @@ var Events = &pluginEvents{ } type pluginEvents struct { + ReceiveDrop *dropEvent ReceivePing *pingEvent ReceiveRequest *requestEvent ReceiveResponse *responseEvent Error *ipErrorEvent } +type dropEvent struct { + callbacks map[uintptr]DropConsumer +} + +func (this *dropEvent) Attach(callback DropConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *dropEvent) Detach(callback DropConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *dropEvent) Trigger(drop *drop.Drop) { + for _, callback := range this.callbacks { + callback(drop) + } +} + type pingEvent struct { callbacks map[uintptr]PingConsumer } diff --git a/plugins/autopeering/server/udp/server.go b/plugins/autopeering/server/udp/server.go index 704b1150eeb93c58a15ae22fc0a533a8d578d62d..b050d205f1619f50bf290ec079f6d1cd55a5170b 100644 --- a/plugins/autopeering/server/udp/server.go +++ b/plugins/autopeering/server/udp/server.go @@ -5,6 +5,7 @@ import ( "github.com/iotaledger/goshimmer/packages/network/udp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/drop" "github.com/iotaledger/goshimmer/plugins/autopeering/types/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" @@ -81,6 +82,14 @@ func processReceivedData(addr *net.UDPAddr, data []byte) { Events.ReceivePing.Trigger(ping) } + case drop.MARSHALLED_PACKET_HEADER: + if drop, err := drop.Unmarshal(data); err != nil { + Events.Error.Trigger(addr.IP, err) + } else { + drop.Issuer.Address = addr.IP + + Events.ReceiveDrop.Trigger(drop) + } default: Events.Error.Trigger(addr.IP, errors.New("invalid UDP peering packet from " + addr.IP.String())) } diff --git a/plugins/autopeering/server/udp/types.go b/plugins/autopeering/server/udp/types.go index 985e1d02da2a04aac232b50cf2eb224b692fb4d2..2c18f4f81c864fbfcc0a4f8ab9377f51fd015786 100644 --- a/plugins/autopeering/server/udp/types.go +++ b/plugins/autopeering/server/udp/types.go @@ -1,12 +1,15 @@ package udp import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/types/drop" "github.com/iotaledger/goshimmer/plugins/autopeering/types/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" "net" ) +type DropConsumer = func(d *drop.Drop) + type PingConsumer = func(p *ping.Ping) type ConnectionPeeringRequestConsumer = func(request *request.Request) diff --git a/plugins/autopeering/types/drop/constants.go b/plugins/autopeering/types/drop/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..09c7b3c1b250d2f4212e469166e7bc15d17b66d9 --- /dev/null +++ b/plugins/autopeering/types/drop/constants.go @@ -0,0 +1,23 @@ +package drop + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" +) + +const ( + MARSHALLED_PACKET_HEADER = 0x05 + + PACKET_HEADER_START = 0 + MARSHALLED_ISSUER_START = PACKET_HEADER_END + MARSHALLED_SIGNATURE_START = MARSHALLED_ISSUER_END + + PACKET_HEADER_END = PACKET_HEADER_START + PACKET_HEADER_SIZE + MARSHALLED_ISSUER_END = MARSHALLED_ISSUER_START + MARSHALLED_ISSUER_SIZE + MARSHALLED_SIGNATURE_END = MARSHALLED_SIGNATURE_START + MARSHALLED_SIGNATURE_SIZE + + PACKET_HEADER_SIZE = 1 + MARSHALLED_ISSUER_SIZE = peer.MARSHALLED_TOTAL_SIZE + MARSHALLED_SIGNATURE_SIZE = 65 + + MARSHALLED_TOTAL_SIZE = MARSHALLED_SIGNATURE_END +) diff --git a/plugins/autopeering/types/drop/drop.go b/plugins/autopeering/types/drop/drop.go new file mode 100644 index 0000000000000000000000000000000000000000..374be520b70150a05751c381da9240f7f6f66013 --- /dev/null +++ b/plugins/autopeering/types/drop/drop.go @@ -0,0 +1,59 @@ +package drop + +import ( + "bytes" + "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" +) + +type Drop struct { + Issuer *peer.Peer + Signature [MARSHALLED_SIGNATURE_SIZE]byte +} + +func Unmarshal(data []byte) (*Drop, error) { + if data[0] != MARSHALLED_PACKET_HEADER || len(data) != MARSHALLED_TOTAL_SIZE { + return nil, ErrMalformedDropMessage + } + + ping := &Drop{} + + if unmarshalledPeer, err := peer.Unmarshal(data[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END]); err != nil { + return nil, err + } else { + ping.Issuer = unmarshalledPeer + } + if err := saltmanager.CheckSalt(ping.Issuer.Salt); err != nil { + return nil, err + } + + if issuer, err := identity.FromSignedData(data[:MARSHALLED_SIGNATURE_START], data[MARSHALLED_SIGNATURE_START:]); err != nil { + return nil, err + } else { + if !bytes.Equal(issuer.Identifier, ping.Issuer.Identity.Identifier) { + return nil, ErrInvalidSignature + } + } + copy(ping.Signature[:], data[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END]) + + return ping, nil +} + +func (ping *Drop) Marshal() []byte { + result := make([]byte, MARSHALLED_TOTAL_SIZE) + + result[PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + copy(result[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END], ping.Issuer.Marshal()) + copy(result[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END], ping.Signature[:MARSHALLED_SIGNATURE_SIZE]) + + return result +} + +func (this *Drop) Sign() { + if signature, err := this.Issuer.Identity.Sign(this.Marshal()[:MARSHALLED_SIGNATURE_START]); err != nil { + panic(err) + } else { + copy(this.Signature[:], signature) + } +} diff --git a/plugins/autopeering/types/drop/errors.go b/plugins/autopeering/types/drop/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..4c7db5bb411b5f19d46c67ad493b818fa8345dd7 --- /dev/null +++ b/plugins/autopeering/types/drop/errors.go @@ -0,0 +1,8 @@ +package drop + +import "github.com/pkg/errors" + +var ( + ErrInvalidSignature = errors.New("invalid signature in drop message") + ErrMalformedDropMessage = errors.New("malformed drop message") +) diff --git a/plugins/autopeering/types/peerlist/peer_list.go b/plugins/autopeering/types/peerlist/peer_list.go index a83dd135731b6439618a38c4066823e5be008a3e..f31af2f4c9502d1d3cf3cdb684cdf964854f0180 100644 --- a/plugins/autopeering/types/peerlist/peer_list.go +++ b/plugins/autopeering/types/peerlist/peer_list.go @@ -7,6 +7,15 @@ import ( type PeerList []*peer.Peer +func (this PeerList) Clone() PeerList { + result := make(PeerList, len(this)) + for i, entry := range this { + result[i] = entry + } + + return result +} + func (this PeerList) Filter(predicate func(p *peer.Peer) bool) PeerList { peerList := make(PeerList, len(this)) diff --git a/plugins/autopeering/types/peerregister/events.go b/plugins/autopeering/types/peerregister/events.go new file mode 100644 index 0000000000000000000000000000000000000000..29daca8234d4945a365f202d0320793cf579255f --- /dev/null +++ b/plugins/autopeering/types/peerregister/events.go @@ -0,0 +1,32 @@ +package peerregister + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" + "reflect" +) + +type peerRegisterEvents struct { + Add *peerEvent + Update *peerEvent + Remove *peerEvent +} + +type peerEvent struct { + callbacks map[uintptr]PeerConsumer +} + +func (this *peerEvent) Attach(callback PeerConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *peerEvent) Detach(callback PeerConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *peerEvent) Trigger(p *peer.Peer) { + for _, callback := range this.callbacks { + callback(p) + } +} + +type PeerConsumer = func(p *peer.Peer) diff --git a/plugins/autopeering/types/peerregister/peer_register.go b/plugins/autopeering/types/peerregister/peer_register.go index 68fec0d3aca5ba6dc30848817e3d57f583031ae5..bae902d5882f52b5d03224d78ce0722017b58980 100644 --- a/plugins/autopeering/types/peerregister/peer_register.go +++ b/plugins/autopeering/types/peerregister/peer_register.go @@ -4,24 +4,34 @@ import ( "bytes" "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "sync" ) type PeerRegister struct { - Peers map[string]*peer.Peer - Lock sync.RWMutex + Peers map[string]*peer.Peer + Events peerRegisterEvents + lock sync.RWMutex } func New() *PeerRegister { return &PeerRegister{ Peers: make(map[string]*peer.Peer), + Events: peerRegisterEvents{ + Add: &peerEvent{make(map[uintptr]PeerConsumer)}, + Update: &peerEvent{make(map[uintptr]PeerConsumer)}, + Remove: &peerEvent{make(map[uintptr]PeerConsumer)}, + }, } } // returns true if a new entry was added -func (this *PeerRegister) AddOrUpdate(peer *peer.Peer) bool { +func (this *PeerRegister) AddOrUpdate(peer *peer.Peer, lock... bool) bool { + if len(lock) == 0 || lock[0] { + defer this.Lock()() + } + if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { return false } @@ -31,18 +41,37 @@ func (this *PeerRegister) AddOrUpdate(peer *peer.Peer) bool { existingPeer.GossipPort = peer.GossipPort existingPeer.PeeringPort = peer.PeeringPort - // trigger update peer + this.Events.Update.Trigger(existingPeer) return false } else { this.Peers[peer.Identity.StringIdentifier] = peer - // trigger add peer + this.Events.Add.Trigger(peer) return true } } +// by calling defer peerRegister.Lock()() we can auto-lock AND unlock (note: two parentheses) +func (this *PeerRegister) Lock() func() { + this.lock.Lock() + + return this.lock.Unlock +} + +func (this *PeerRegister) Remove(key string, lock... bool) { + if len(lock) == 0 || lock[0] { + defer this.Lock()() + } + + if peerEntry, exists := this.Peers[key]; exists { + delete(this.Peers, key) + + this.Events.Remove.Trigger(peerEntry) + } +} + func (this *PeerRegister) Contains(key string) bool { if _, exists := this.Peers[key]; exists { return true diff --git a/plugins/cli/plugin.go b/plugins/cli/plugin.go index 3eacb9b140bbf72a214a8d0eda6cf06007e37236..ff13c2d8dd09f7b6075d7e4fa62190c3a9873a83 100644 --- a/plugins/cli/plugin.go +++ b/plugins/cli/plugin.go @@ -2,6 +2,7 @@ package cli import ( "flag" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/parameter" "strings" @@ -28,8 +29,8 @@ func init() { onAddStringParameter(param) } - parameter.Events.AddInt.Attach(onAddIntParameter) - parameter.Events.AddString.Attach(onAddStringParameter) + parameter.Events.AddInt.Attach(events.NewClosure(onAddIntParameter)) + parameter.Events.AddString.Attach(events.NewClosure(onAddStringParameter)) flag.Usage = printUsage diff --git a/plugins/statusscreen/statusscreen.go b/plugins/statusscreen/statusscreen.go index df4e4f717fba2a1490c6859f919861bd0565bc91..7f7c7b30141cfbb60734c3ebf32122498e6c2c77 100644 --- a/plugins/statusscreen/statusscreen.go +++ b/plugins/statusscreen/statusscreen.go @@ -3,6 +3,7 @@ package statusscreen import ( "github.com/gdamore/tcell" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/rivo/tview" "time" @@ -18,13 +19,13 @@ func configure(plugin *node.Plugin) { plugin.Node.AddLogger(DEFAULT_LOGGER) - daemon.Events.Shutdown.Attach(func() { + daemon.Events.Shutdown.Attach(events.NewClosure(func() { node.DEFAULT_LOGGER.Enabled = true if app != nil { app.Stop() } - }) + })) } func run(plugin *node.Plugin) {