From 54b87796af4c1604a1d57b4dc87460b85cf9450d Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Sat, 4 May 2019 21:55:53 +0200 Subject: [PATCH] Fix: fixed problematic event handler implementation (closure wrapper) --- packages/events/types.go | 2 + packages/network/events.go | 64 ++-------------- packages/network/managed_connection.go | 11 ++- packages/network/tcp/server.go | 9 ++- packages/network/tcp/server_events.go | 76 ++----------------- packages/network/udp/events.go | 71 ++--------------- packages/network/udp/server.go | 16 ++-- plugins/analysis/client/plugin.go | 27 +++---- plugins/analysis/server/plugin.go | 26 +++---- .../acceptedneighbors/furthest_neighbor.go | 9 ++- .../chosenneighbors/furthest_neighbor.go | 9 ++- .../instances/outgoingrequest/instance.go | 5 +- plugins/autopeering/plugin.go | 24 +++--- .../protocol/outgoing_ping_processor.go | 5 +- plugins/autopeering/saltmanager/events.go | 34 ++------- .../autopeering/saltmanager/saltmanager.go | 6 +- plugins/autopeering/saltmanager/types.go | 5 -- plugins/autopeering/server/tcp/server.go | 31 ++++---- plugins/autopeering/server/udp/server.go | 15 ++-- plugins/autopeering/types/peer/peer.go | 5 +- .../autopeering/types/peerregister/events.go | 28 ++----- .../types/peerregister/peer_register.go | 7 +- 22 files changed, 141 insertions(+), 344 deletions(-) delete mode 100644 plugins/autopeering/saltmanager/types.go diff --git a/packages/events/types.go b/packages/events/types.go index cbb2d798..8160d213 100644 --- a/packages/events/types.go +++ b/packages/events/types.go @@ -1,3 +1,5 @@ package events func CallbackCaller(handler interface{}, params ...interface{}) { handler.(func())() } + +func ErrorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) } diff --git a/packages/network/events.go b/packages/network/events.go index fff2621d..14785389 100644 --- a/packages/network/events.go +++ b/packages/network/events.go @@ -1,63 +1,13 @@ package network -import "reflect" +import ( + "github.com/iotaledger/goshimmer/packages/events" +) type BufferedConnectionEvents struct { - ReceiveData *dataEvent - Close *callbackEvent - Error *errorEvent + ReceiveData *events.Event + Close *events.Event + Error *events.Event } -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger() { - for _, callback := range this.callbacks { - callback() - } -} - -type errorEvent struct { - callbacks map[uintptr]ErrorConsumer -} - -func (this *errorEvent) Attach(callback ErrorConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *errorEvent) Detach(callback ErrorConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *errorEvent) Trigger(err error) { - for _, callback := range this.callbacks { - callback(err) - } -} - -type dataEvent struct { - callbacks map[uintptr]DataConsumer -} - -func (this *dataEvent) Attach(callback DataConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *dataEvent) Detach(callback DataConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *dataEvent) Trigger(data []byte) { - for _, callback := range this.callbacks { - callback(data) - } -} +func dataCaller(handler interface{}, params ...interface{}) { handler.(func([]byte))(params[0].([]byte)) } \ No newline at end of file diff --git a/packages/network/managed_connection.go b/packages/network/managed_connection.go index 2adcb281..cb70a9b8 100644 --- a/packages/network/managed_connection.go +++ b/packages/network/managed_connection.go @@ -1,6 +1,7 @@ package network import ( + "github.com/iotaledger/goshimmer/packages/events" "io" "net" "sync" @@ -19,9 +20,9 @@ func NewManagedConnection(conn net.Conn) *ManagedConnection { bufferedConnection := &ManagedConnection{ Conn: conn, Events: BufferedConnectionEvents{ - ReceiveData: &dataEvent{make(map[uintptr]DataConsumer)}, - Close: &callbackEvent{make(map[uintptr]Callback)}, - Error: &errorEvent{make(map[uintptr]ErrorConsumer)}, + ReceiveData: events.NewEvent(dataCaller), + Close: events.NewEvent(events.CallbackCaller), + Error: events.NewEvent(events.ErrorCaller), }, } @@ -68,7 +69,9 @@ func (this *ManagedConnection) Close() error { this.Events.Error.Trigger(err) } - this.closeOnce.Do(this.Events.Close.Trigger) + this.closeOnce.Do(func() { + this.Events.Close.Trigger() + }) return err } diff --git a/packages/network/tcp/server.go b/packages/network/tcp/server.go index cd4069be..3f8a3d0c 100644 --- a/packages/network/tcp/server.go +++ b/packages/network/tcp/server.go @@ -1,6 +1,7 @@ package tcp import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "net" "strconv" @@ -51,10 +52,10 @@ func (this *Server) Listen(port int) *Server { func NewServer() *Server { return &Server{ Events: serverEvents{ - Start: &callbackEvent{make(map[uintptr]Callback)}, - Shutdown: &callbackEvent{make(map[uintptr]Callback)}, - Connect: &peerConsumerEvent{make(map[uintptr]PeerConsumer)}, - Error: &errorConsumerEvent{make(map[uintptr]ErrorConsumer)}, + Start: events.NewEvent(events.CallbackCaller), + Shutdown: events.NewEvent(events.CallbackCaller), + Connect: events.NewEvent(managedConnectionCaller), + Error: events.NewEvent(events.ErrorCaller), }, } } diff --git a/packages/network/tcp/server_events.go b/packages/network/tcp/server_events.go index 85561a9f..d3b6a57d 100644 --- a/packages/network/tcp/server_events.go +++ b/packages/network/tcp/server_events.go @@ -1,79 +1,15 @@ package tcp import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" - "reflect" ) type serverEvents struct { - Start *callbackEvent - Shutdown *callbackEvent - Connect *peerConsumerEvent - Error *errorConsumerEvent + Start *events.Event + Shutdown *events.Event + Connect *events.Event + Error *events.Event } -// region callbackEvent ///////////////////////////////////////////////////////////////////////////////////////////////// - -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger() { - for _, callback := range this.callbacks { - callback() - } -} - -// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region errorConsumerEvent //////////////////////////////////////////////////////////////////////////////////////////// - -type errorConsumerEvent struct { - callbacks map[uintptr]ErrorConsumer -} - -func (this *errorConsumerEvent) Attach(callback ErrorConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *errorConsumerEvent) Detach(callback ErrorConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *errorConsumerEvent) Trigger(err error) { - for _, callback := range this.callbacks { - callback(err) - } -} - -// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region peerConsumerEvent ///////////////////////////////////////////////////////////////////////////////////////////// - -type peerConsumerEvent struct { - callbacks map[uintptr]PeerConsumer -} - -func (this *peerConsumerEvent) Attach(callback PeerConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *peerConsumerEvent) Detach(callback PeerConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *peerConsumerEvent) Trigger(conn *network.ManagedConnection) { - for _, callback := range this.callbacks { - callback(conn) - } -} - -// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// +func managedConnectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) } diff --git a/packages/network/udp/events.go b/packages/network/udp/events.go index 0ce86386..f9016299 100644 --- a/packages/network/udp/events.go +++ b/packages/network/udp/events.go @@ -1,72 +1,15 @@ package udp import ( + "github.com/iotaledger/goshimmer/packages/events" "net" - "reflect" ) -//region peerConsumerEvent ///////////////////////////////////////////////////////////////////////////////////////////// - -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger() { - for _, callback := range this.callbacks { - callback() - } -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -//region dataConsumerEvent ///////////////////////////////////////////////////////////////////////////////////////////// - -type dataConsumerEvent struct { - callbacks map[uintptr]AddressDataConsumer -} - -func (this *dataConsumerEvent) Attach(callback AddressDataConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *dataConsumerEvent) Detach(callback AddressDataConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *dataConsumerEvent) Trigger(addr *net.UDPAddr, data []byte) { - for _, callback := range this.callbacks { - callback(addr, data) - } -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -//region errorConsumerEvent //////////////////////////////////////////////////////////////////////////////////////////// - -type errorConsumerEvent struct { - callbacks map[uintptr]ErrorConsumer -} - -func (this *errorConsumerEvent) Attach(callback ErrorConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *errorConsumerEvent) Detach(callback ErrorConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *errorConsumerEvent) Trigger(err error) { - for _, callback := range this.callbacks { - callback(err) - } +type serverEvents struct { + Start *events.Event + Shutdown *events.Event + ReceiveData *events.Event + Error *events.Event } -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// +func dataCaller(handler interface{}, params ...interface{}) { handler.(func(*net.UDPAddr, []byte))(params[0].(*net.UDPAddr), params[1].([]byte)) } diff --git a/packages/network/udp/server.go b/packages/network/udp/server.go index 82caae03..0e3a6afe 100644 --- a/packages/network/udp/server.go +++ b/packages/network/udp/server.go @@ -1,17 +1,11 @@ package udp import ( + "github.com/iotaledger/goshimmer/packages/events" "net" "strconv" ) -type serverEvents struct { - Start *callbackEvent - Shutdown *callbackEvent - ReceiveData *dataConsumerEvent - Error *errorConsumerEvent -} - type Server struct { Socket net.PacketConn ReceiveBufferSize int @@ -55,10 +49,10 @@ func NewServer(receiveBufferSize int) *Server { return &Server{ ReceiveBufferSize: receiveBufferSize, Events: serverEvents{ - Start: &callbackEvent{make(map[uintptr]Callback)}, - Shutdown: &callbackEvent{make(map[uintptr]Callback)}, - ReceiveData: &dataConsumerEvent{make(map[uintptr]AddressDataConsumer)}, - Error: &errorConsumerEvent{make(map[uintptr]ErrorConsumer)}, + Start: events.NewEvent(events.CallbackCaller), + Shutdown: events.NewEvent(events.CallbackCaller), + ReceiveData: events.NewEvent(dataCaller), + Error: events.NewEvent(events.ErrorCaller), }, } } diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index 243b45b9..5233feed 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -3,6 +3,7 @@ package client import ( "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/analysis/types/addnode" @@ -59,25 +60,25 @@ func reportCurrentStatus(eventDispatchers *EventDispatchers) { func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatchers) { // define hooks //////////////////////////////////////////////////////////////////////////////////////////////////// - onDiscoverPeer := func(p *peer.Peer) { + onDiscoverPeer := events.NewClosure(func(p *peer.Peer) { eventDispatchers.AddNode(p.Identity.Identifier) - } + }) - onAddAcceptedNeighbor := func(p *peer.Peer) { + onAddAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.ConnectNodes(p.Identity.Identifier, accountability.OWN_ID.Identifier) - } + }) - onRemoveAcceptedNeighbor := func(p *peer.Peer) { + onRemoveAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.DisconnectNodes(p.Identity.Identifier, accountability.OWN_ID.Identifier) - } + }) - onAddChosenNeighbor := func(p *peer.Peer) { + onAddChosenNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.ConnectNodes(accountability.OWN_ID.Identifier, p.Identity.Identifier) - } + }) - onRemoveChosenNeighbor := func(p *peer.Peer) { + onRemoveChosenNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.DisconnectNodes(accountability.OWN_ID.Identifier, p.Identity.Identifier) - } + }) // setup hooks ///////////////////////////////////////////////////////////////////////////////////////////////////// @@ -89,8 +90,8 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch // clean up hooks on close ///////////////////////////////////////////////////////////////////////////////////////// - var onClose func() - onClose = func() { + var onClose *events.Closure + onClose = events.NewClosure(func() { knownpeers.INSTANCE.Events.Add.Detach(onDiscoverPeer) acceptedneighbors.INSTANCE.Events.Add.Detach(onAddAcceptedNeighbor) acceptedneighbors.INSTANCE.Events.Remove.Detach(onRemoveAcceptedNeighbor) @@ -98,7 +99,7 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch chosenneighbors.INSTANCE.Events.Remove.Detach(onRemoveChosenNeighbor) conn.Events.Close.Detach(onClose) - } + }) conn.Events.Close.Attach(onClose) } diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 861ffe4f..88beeba8 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -3,6 +3,7 @@ package server import ( "encoding/hex" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" @@ -21,16 +22,16 @@ var server *tcp.Server func Configure(plugin *node.Plugin) { server = tcp.NewServer() - server.Events.Connect.Attach(HandleConnection) - server.Events.Error.Attach(func(err error) { + server.Events.Connect.Attach(events.NewClosure(HandleConnection)) + server.Events.Error.Attach(events.NewClosure(func(err error) { plugin.LogFailure("error in server: " + err.Error()) - }) - server.Events.Start.Attach(func() { + })) + server.Events.Start.Attach(events.NewClosure(func() { plugin.LogSuccess("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ... done") - }) - server.Events.Shutdown.Attach(func() { + })) + server.Events.Shutdown.Attach(events.NewClosure(func() { plugin.LogSuccess("Stopping Server ... done") - }) + })) } func Run(plugin *node.Plugin) { @@ -55,18 +56,17 @@ func HandleConnection(conn *network.ManagedConnection) { var offset int var connectedNodeId string - var onReceiveData func(data []byte) - var onDisconnect func() + var onDisconnect *events.Closure - onReceiveData = func(data []byte) { + onReceiveData := events.NewClosure(func(data []byte) { processIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset, &connectedNodeId) - } - onDisconnect = func() { + }) + onDisconnect = events.NewClosure(func() { Events.NodeOffline.Trigger(connectedNodeId) conn.Events.ReceiveData.Detach(onReceiveData) conn.Events.Close.Detach(onDisconnect) - } + }) conn.Events.ReceiveData.Attach(onReceiveData) conn.Events.Close.Attach(onDisconnect) diff --git a/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go b/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go index 8dbcf68f..462e9d08 100644 --- a/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go +++ b/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go @@ -1,6 +1,7 @@ package acceptedneighbors import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "sync" ) @@ -12,14 +13,14 @@ var FURTHEST_NEIGHBOR_DISTANCE = uint64(0) var FurthestNeighborLock sync.RWMutex func configureFurthestNeighbor() { - INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() updateFurthestNeighbor(p) - }) + })) - INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() @@ -31,7 +32,7 @@ func configureFurthestNeighbor() { updateFurthestNeighbor(furthestNeighborCandidate) } } - }) + })) } func updateFurthestNeighbor(p *peer.Peer) { diff --git a/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go b/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go index 107f9eaa..7767ca0b 100644 --- a/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go +++ b/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go @@ -1,6 +1,7 @@ package chosenneighbors import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "sync" ) @@ -12,7 +13,7 @@ var FURTHEST_NEIGHBOR_DISTANCE = uint64(0) var FurthestNeighborLock sync.RWMutex func configureFurthestNeighbor() { - INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() @@ -21,9 +22,9 @@ func configureFurthestNeighbor() { FURTHEST_NEIGHBOR = p FURTHEST_NEIGHBOR_DISTANCE = distance } - }) + })) - INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() @@ -39,5 +40,5 @@ func configureFurthestNeighbor() { } } } - }) + })) } \ No newline at end of file diff --git a/plugins/autopeering/instances/outgoingrequest/instance.go b/plugins/autopeering/instances/outgoingrequest/instance.go index c91a9dbc..1ffc8b96 100644 --- a/plugins/autopeering/instances/outgoingrequest/instance.go +++ b/plugins/autopeering/instances/outgoingrequest/instance.go @@ -1,6 +1,7 @@ package outgoingrequest import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" @@ -16,7 +17,7 @@ func Configure(plugin *node.Plugin) { } INSTANCE.Sign() - saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + saltmanager.Events.UpdatePublicSalt.Attach(events.NewClosure(func(salt *salt.Salt) { INSTANCE.Sign() - }) + })) } diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 8254f34f..81f80fe5 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -36,24 +36,24 @@ func run(plugin *node.Plugin) { } func configureLogging(plugin *node.Plugin) { - acceptedneighbors.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) - acceptedneighbors.INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + })) + acceptedneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) + })) - chosenneighbors.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + chosenneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) - chosenneighbors.INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + })) + chosenneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) + })) - knownpeers.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + knownpeers.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogInfo("peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) - knownpeers.INSTANCE.Events.Update.Attach(func(p *peer.Peer) { + })) + knownpeers.INSTANCE.Events.Update.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) + })) } diff --git a/plugins/autopeering/protocol/outgoing_ping_processor.go b/plugins/autopeering/protocol/outgoing_ping_processor.go index 580791ac..e94a78cf 100644 --- a/plugins/autopeering/protocol/outgoing_ping_processor.go +++ b/plugins/autopeering/protocol/outgoing_ping_processor.go @@ -3,6 +3,7 @@ package protocol import ( "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" @@ -30,9 +31,9 @@ func createOutgoingPingProcessor(plugin *node.Plugin) func() { } outgoingPing.Sign() - saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + saltmanager.Events.UpdatePublicSalt.Attach(events.NewClosure(func(salt *salt.Salt) { outgoingPing.Sign() - }) + })) pingPeers(plugin, outgoingPing) diff --git a/plugins/autopeering/saltmanager/events.go b/plugins/autopeering/saltmanager/events.go index b9493017..18315773 100644 --- a/plugins/autopeering/saltmanager/events.go +++ b/plugins/autopeering/saltmanager/events.go @@ -1,34 +1,16 @@ package saltmanager import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/salt" - "reflect" ) -type packageEvents struct { - UpdatePublicSalt *saltEvent - UpdatePrivateSalt *saltEvent +var Events = struct { + UpdatePublicSalt *events.Event + UpdatePrivateSalt *events.Event +}{ + UpdatePublicSalt: events.NewEvent(saltCaller), + UpdatePrivateSalt: events.NewEvent(saltCaller), } -type saltEvent struct { - callbacks map[uintptr]SaltConsumer -} - -func (this *saltEvent) Attach(callback SaltConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *saltEvent) Detach(callback SaltConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *saltEvent) Trigger(salt *salt.Salt) { - for _, callback := range this.callbacks { - callback(salt) - } -} - -var Events = packageEvents{ - UpdatePublicSalt: &saltEvent{make(map[uintptr]SaltConsumer)}, - UpdatePrivateSalt: &saltEvent{make(map[uintptr]SaltConsumer)}, -} +func saltCaller(handler interface{}, params ...interface{}) { handler.(func(*salt.Salt))(params[0].(*salt.Salt)) } diff --git a/plugins/autopeering/saltmanager/saltmanager.go b/plugins/autopeering/saltmanager/saltmanager.go index 340e901b..a18a1ebb 100644 --- a/plugins/autopeering/saltmanager/saltmanager.go +++ b/plugins/autopeering/saltmanager/saltmanager.go @@ -46,7 +46,7 @@ func getSalt(key []byte, lifetime time.Duration) *salt.Salt { } } -func updatePublicSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, updateCallback func(salt *salt.Salt)) { +func updatePublicSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, updateCallback func(params ...interface{})) { newSalt := salt.New(lifeSpan) saltToUpdate.Bytes = newSalt.Bytes @@ -61,7 +61,7 @@ func updatePublicSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time scheduleUpdateForSalt(saltToUpdate, settingsKey, lifeSpan, updateCallback) } -func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, callback SaltConsumer) { +func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, callback func(params ...interface{})) { now := time.Now() if saltToUpdate.ExpirationTime.Before(now) { @@ -78,7 +78,7 @@ func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan } } -func createSalt(settingsKey []byte, lifeSpan time.Duration, updateCallback SaltConsumer) *salt.Salt { +func createSalt(settingsKey []byte, lifeSpan time.Duration, updateCallback func(params ...interface{})) *salt.Salt { newSalt := getSalt(settingsKey, lifeSpan) scheduleUpdateForSalt(newSalt, settingsKey, lifeSpan, updateCallback) diff --git a/plugins/autopeering/saltmanager/types.go b/plugins/autopeering/saltmanager/types.go deleted file mode 100644 index 4a273e65..00000000 --- a/plugins/autopeering/saltmanager/types.go +++ /dev/null @@ -1,5 +0,0 @@ -package saltmanager - -import "github.com/iotaledger/goshimmer/plugins/autopeering/types/salt" - -type SaltConsumer = func(salt *salt.Salt) diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 7740316e..0c25eadd 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -2,6 +2,7 @@ package tcp import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" @@ -18,20 +19,20 @@ import ( var server = tcp.NewServer() func ConfigureServer(plugin *node.Plugin) { - server.Events.Connect.Attach(HandleConnection) - server.Events.Error.Attach(func(err error) { + server.Events.Connect.Attach(events.NewClosure(HandleConnection)) + server.Events.Error.Attach(events.NewClosure(func(err error) { plugin.LogFailure("error in tcp server: " + err.Error()) - }) - server.Events.Start.Attach(func() { + })) + server.Events.Start.Attach(events.NewClosure(func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } else { plugin.LogSuccess("Starting TCP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } - }) - server.Events.Shutdown.Attach(func() { + })) + server.Events.Shutdown.Attach(events.NewClosure(func() { plugin.LogSuccess("Stopping TCP Server ... done") - }) + })) } func RunServer(plugin *node.Plugin) { @@ -59,9 +60,9 @@ func HandleConnection(conn *network.ManagedConnection) { var receiveBuffer []byte var offset int - conn.Events.ReceiveData.Attach(func(data []byte) { + conn.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { ProcessIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset) - }) + })) go conn.Read(make([]byte, int(math.Max(ping.MARSHALLED_TOTAL_SIZE, math.Max(request.MARSHALLED_TOTAL_SIZE, response.MARSHALLED_TOTAL_SIZE))))) } @@ -141,9 +142,9 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, req.Issuer.Conn = conn req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - req.Issuer.Conn.Events.Close.Attach(func() { + req.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { req.Issuer.Conn = nil - }) + })) Events.ReceiveRequest.Trigger(req) } @@ -174,9 +175,9 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, res.Issuer.Conn = conn res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - res.Issuer.Conn.Events.Close.Attach(func() { + res.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { res.Issuer.Conn = nil - }) + })) Events.ReceiveResponse.Trigger(res) } @@ -207,9 +208,9 @@ func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, con ping.Issuer.Conn = conn ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - ping.Issuer.Conn.Events.Close.Attach(func() { + ping.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { ping.Issuer.Conn = nil - }) + })) Events.ReceivePing.Trigger(ping) } diff --git a/plugins/autopeering/server/udp/server.go b/plugins/autopeering/server/udp/server.go index b050d205..a6ed7c51 100644 --- a/plugins/autopeering/server/udp/server.go +++ b/plugins/autopeering/server/udp/server.go @@ -2,6 +2,7 @@ package udp import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network/udp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" @@ -22,20 +23,20 @@ func ConfigureServer(plugin *node.Plugin) { plugin.LogFailure(err.Error()) }) - udpServer.Events.ReceiveData.Attach(processReceivedData) - udpServer.Events.Error.Attach(func(err error) { + udpServer.Events.ReceiveData.Attach(events.NewClosure(processReceivedData)) + udpServer.Events.Error.Attach(events.NewClosure(func(err error) { plugin.LogFailure("error in udp server: " + err.Error()) - }) - udpServer.Events.Start.Attach(func() { + })) + udpServer.Events.Start.Attach(events.NewClosure(func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogSuccess("Starting UDP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } else { plugin.LogSuccess("Starting UDP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } - }) - udpServer.Events.Shutdown.Attach(func() { + })) + udpServer.Events.Shutdown.Attach(events.NewClosure(func() { plugin.LogSuccess("Stopping UDP Server ... done") - }) + })) } func RunServer(plugin *node.Plugin) { diff --git a/plugins/autopeering/types/peer/peer.go b/plugins/autopeering/types/peer/peer.go index 4604559a..088ea1af 100644 --- a/plugins/autopeering/types/peer/peer.go +++ b/plugins/autopeering/types/peer/peer.go @@ -2,6 +2,7 @@ package peer import ( "encoding/binary" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/plugins/autopeering/types/salt" @@ -80,9 +81,9 @@ func (peer *Peer) ConnectTCP() (*network.ManagedConnection, bool, error) { } else { peer.Conn = network.NewManagedConnection(conn) - peer.Conn.Events.Close.Attach(func() { + peer.Conn.Events.Close.Attach(events.NewClosure(func() { peer.Conn = nil - }) + })) return peer.Conn, true, nil } diff --git a/plugins/autopeering/types/peerregister/events.go b/plugins/autopeering/types/peerregister/events.go index 29daca82..1bf6ca25 100644 --- a/plugins/autopeering/types/peerregister/events.go +++ b/plugins/autopeering/types/peerregister/events.go @@ -1,32 +1,14 @@ package peerregister import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "reflect" ) type peerRegisterEvents struct { - Add *peerEvent - Update *peerEvent - Remove *peerEvent + Add *events.Event + Update *events.Event + Remove *events.Event } -type peerEvent struct { - callbacks map[uintptr]PeerConsumer -} - -func (this *peerEvent) Attach(callback PeerConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *peerEvent) Detach(callback PeerConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *peerEvent) Trigger(p *peer.Peer) { - for _, callback := range this.callbacks { - callback(p) - } -} - -type PeerConsumer = func(p *peer.Peer) +func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*peer.Peer))(params[0].(*peer.Peer)) } diff --git a/plugins/autopeering/types/peerregister/peer_register.go b/plugins/autopeering/types/peerregister/peer_register.go index bae902d5..e2ba3823 100644 --- a/plugins/autopeering/types/peerregister/peer_register.go +++ b/plugins/autopeering/types/peerregister/peer_register.go @@ -3,6 +3,7 @@ package peerregister import ( "bytes" "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" @@ -19,9 +20,9 @@ func New() *PeerRegister { return &PeerRegister{ Peers: make(map[string]*peer.Peer), Events: peerRegisterEvents{ - Add: &peerEvent{make(map[uintptr]PeerConsumer)}, - Update: &peerEvent{make(map[uintptr]PeerConsumer)}, - Remove: &peerEvent{make(map[uintptr]PeerConsumer)}, + Add: events.NewEvent(peerCaller), + Update: events.NewEvent(peerCaller), + Remove: events.NewEvent(peerCaller), }, } } -- GitLab