diff --git a/packages/events/types.go b/packages/events/types.go index cbb2d79800f340233c896d5fbe4973b562f59fdd..8160d21325b1b894b7b0f14783103cdba0de5583 100644 --- a/packages/events/types.go +++ b/packages/events/types.go @@ -1,3 +1,5 @@ package events func CallbackCaller(handler interface{}, params ...interface{}) { handler.(func())() } + +func ErrorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) } diff --git a/packages/network/events.go b/packages/network/events.go index fff2621dbc824828fdce18253e63771033dd43ec..14785389435ba77af1587639ee86340a7322805d 100644 --- a/packages/network/events.go +++ b/packages/network/events.go @@ -1,63 +1,13 @@ package network -import "reflect" +import ( + "github.com/iotaledger/goshimmer/packages/events" +) type BufferedConnectionEvents struct { - ReceiveData *dataEvent - Close *callbackEvent - Error *errorEvent + ReceiveData *events.Event + Close *events.Event + Error *events.Event } -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger() { - for _, callback := range this.callbacks { - callback() - } -} - -type errorEvent struct { - callbacks map[uintptr]ErrorConsumer -} - -func (this *errorEvent) Attach(callback ErrorConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *errorEvent) Detach(callback ErrorConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *errorEvent) Trigger(err error) { - for _, callback := range this.callbacks { - callback(err) - } -} - -type dataEvent struct { - callbacks map[uintptr]DataConsumer -} - -func (this *dataEvent) Attach(callback DataConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *dataEvent) Detach(callback DataConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *dataEvent) Trigger(data []byte) { - for _, callback := range this.callbacks { - callback(data) - } -} +func dataCaller(handler interface{}, params ...interface{}) { handler.(func([]byte))(params[0].([]byte)) } \ No newline at end of file diff --git a/packages/network/managed_connection.go b/packages/network/managed_connection.go index 2adcb2811a132abffb91937d99223442408c9be6..cb70a9b82d8b355740ecae19bba270ebb2430d6c 100644 --- a/packages/network/managed_connection.go +++ b/packages/network/managed_connection.go @@ -1,6 +1,7 @@ package network import ( + "github.com/iotaledger/goshimmer/packages/events" "io" "net" "sync" @@ -19,9 +20,9 @@ func NewManagedConnection(conn net.Conn) *ManagedConnection { bufferedConnection := &ManagedConnection{ Conn: conn, Events: BufferedConnectionEvents{ - ReceiveData: &dataEvent{make(map[uintptr]DataConsumer)}, - Close: &callbackEvent{make(map[uintptr]Callback)}, - Error: &errorEvent{make(map[uintptr]ErrorConsumer)}, + ReceiveData: events.NewEvent(dataCaller), + Close: events.NewEvent(events.CallbackCaller), + Error: events.NewEvent(events.ErrorCaller), }, } @@ -68,7 +69,9 @@ func (this *ManagedConnection) Close() error { this.Events.Error.Trigger(err) } - this.closeOnce.Do(this.Events.Close.Trigger) + this.closeOnce.Do(func() { + this.Events.Close.Trigger() + }) return err } diff --git a/packages/network/tcp/server.go b/packages/network/tcp/server.go index cd4069be847de2bb11cf2cfcc718b3f4707890ca..3f8a3d0c177d408523207a349722af3f6df71765 100644 --- a/packages/network/tcp/server.go +++ b/packages/network/tcp/server.go @@ -1,6 +1,7 @@ package tcp import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "net" "strconv" @@ -51,10 +52,10 @@ func (this *Server) Listen(port int) *Server { func NewServer() *Server { return &Server{ Events: serverEvents{ - Start: &callbackEvent{make(map[uintptr]Callback)}, - Shutdown: &callbackEvent{make(map[uintptr]Callback)}, - Connect: &peerConsumerEvent{make(map[uintptr]PeerConsumer)}, - Error: &errorConsumerEvent{make(map[uintptr]ErrorConsumer)}, + Start: events.NewEvent(events.CallbackCaller), + Shutdown: events.NewEvent(events.CallbackCaller), + Connect: events.NewEvent(managedConnectionCaller), + Error: events.NewEvent(events.ErrorCaller), }, } } diff --git a/packages/network/tcp/server_events.go b/packages/network/tcp/server_events.go index 85561a9f80d0b4ac1640704db22fbcb5bc4fe316..d3b6a57d6d710a11054327acc41c22670d284c5c 100644 --- a/packages/network/tcp/server_events.go +++ b/packages/network/tcp/server_events.go @@ -1,79 +1,15 @@ package tcp import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" - "reflect" ) type serverEvents struct { - Start *callbackEvent - Shutdown *callbackEvent - Connect *peerConsumerEvent - Error *errorConsumerEvent + Start *events.Event + Shutdown *events.Event + Connect *events.Event + Error *events.Event } -// region callbackEvent ///////////////////////////////////////////////////////////////////////////////////////////////// - -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger() { - for _, callback := range this.callbacks { - callback() - } -} - -// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region errorConsumerEvent //////////////////////////////////////////////////////////////////////////////////////////// - -type errorConsumerEvent struct { - callbacks map[uintptr]ErrorConsumer -} - -func (this *errorConsumerEvent) Attach(callback ErrorConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *errorConsumerEvent) Detach(callback ErrorConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *errorConsumerEvent) Trigger(err error) { - for _, callback := range this.callbacks { - callback(err) - } -} - -// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region peerConsumerEvent ///////////////////////////////////////////////////////////////////////////////////////////// - -type peerConsumerEvent struct { - callbacks map[uintptr]PeerConsumer -} - -func (this *peerConsumerEvent) Attach(callback PeerConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *peerConsumerEvent) Detach(callback PeerConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *peerConsumerEvent) Trigger(conn *network.ManagedConnection) { - for _, callback := range this.callbacks { - callback(conn) - } -} - -// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// +func managedConnectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) } diff --git a/packages/network/udp/events.go b/packages/network/udp/events.go index 0ce863860fa74063d4df934b8d5f67fe8ca2feb4..f90162998664b01977ca0c787b10b781d4bbfc15 100644 --- a/packages/network/udp/events.go +++ b/packages/network/udp/events.go @@ -1,72 +1,15 @@ package udp import ( + "github.com/iotaledger/goshimmer/packages/events" "net" - "reflect" ) -//region peerConsumerEvent ///////////////////////////////////////////////////////////////////////////////////////////// - -type callbackEvent struct { - callbacks map[uintptr]Callback -} - -func (this *callbackEvent) Attach(callback Callback) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *callbackEvent) Detach(callback Callback) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *callbackEvent) Trigger() { - for _, callback := range this.callbacks { - callback() - } -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -//region dataConsumerEvent ///////////////////////////////////////////////////////////////////////////////////////////// - -type dataConsumerEvent struct { - callbacks map[uintptr]AddressDataConsumer -} - -func (this *dataConsumerEvent) Attach(callback AddressDataConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *dataConsumerEvent) Detach(callback AddressDataConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *dataConsumerEvent) Trigger(addr *net.UDPAddr, data []byte) { - for _, callback := range this.callbacks { - callback(addr, data) - } -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// - -//region errorConsumerEvent //////////////////////////////////////////////////////////////////////////////////////////// - -type errorConsumerEvent struct { - callbacks map[uintptr]ErrorConsumer -} - -func (this *errorConsumerEvent) Attach(callback ErrorConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *errorConsumerEvent) Detach(callback ErrorConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *errorConsumerEvent) Trigger(err error) { - for _, callback := range this.callbacks { - callback(err) - } +type serverEvents struct { + Start *events.Event + Shutdown *events.Event + ReceiveData *events.Event + Error *events.Event } -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// +func dataCaller(handler interface{}, params ...interface{}) { handler.(func(*net.UDPAddr, []byte))(params[0].(*net.UDPAddr), params[1].([]byte)) } diff --git a/packages/network/udp/server.go b/packages/network/udp/server.go index 82caae03453644aa1cb625d189b220fddb424b64..0e3a6afecba8fdf99c0941a461ebfb3e38b216da 100644 --- a/packages/network/udp/server.go +++ b/packages/network/udp/server.go @@ -1,17 +1,11 @@ package udp import ( + "github.com/iotaledger/goshimmer/packages/events" "net" "strconv" ) -type serverEvents struct { - Start *callbackEvent - Shutdown *callbackEvent - ReceiveData *dataConsumerEvent - Error *errorConsumerEvent -} - type Server struct { Socket net.PacketConn ReceiveBufferSize int @@ -55,10 +49,10 @@ func NewServer(receiveBufferSize int) *Server { return &Server{ ReceiveBufferSize: receiveBufferSize, Events: serverEvents{ - Start: &callbackEvent{make(map[uintptr]Callback)}, - Shutdown: &callbackEvent{make(map[uintptr]Callback)}, - ReceiveData: &dataConsumerEvent{make(map[uintptr]AddressDataConsumer)}, - Error: &errorConsumerEvent{make(map[uintptr]ErrorConsumer)}, + Start: events.NewEvent(events.CallbackCaller), + Shutdown: events.NewEvent(events.CallbackCaller), + ReceiveData: events.NewEvent(dataCaller), + Error: events.NewEvent(events.ErrorCaller), }, } } diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index 243b45b96674e049cd273f1ca701812b7de05f86..5233feedf15d02b9c0686516265a983648cefa9f 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -3,6 +3,7 @@ package client import ( "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/analysis/types/addnode" @@ -59,25 +60,25 @@ func reportCurrentStatus(eventDispatchers *EventDispatchers) { func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatchers) { // define hooks //////////////////////////////////////////////////////////////////////////////////////////////////// - onDiscoverPeer := func(p *peer.Peer) { + onDiscoverPeer := events.NewClosure(func(p *peer.Peer) { eventDispatchers.AddNode(p.Identity.Identifier) - } + }) - onAddAcceptedNeighbor := func(p *peer.Peer) { + onAddAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.ConnectNodes(p.Identity.Identifier, accountability.OWN_ID.Identifier) - } + }) - onRemoveAcceptedNeighbor := func(p *peer.Peer) { + onRemoveAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.DisconnectNodes(p.Identity.Identifier, accountability.OWN_ID.Identifier) - } + }) - onAddChosenNeighbor := func(p *peer.Peer) { + onAddChosenNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.ConnectNodes(accountability.OWN_ID.Identifier, p.Identity.Identifier) - } + }) - onRemoveChosenNeighbor := func(p *peer.Peer) { + onRemoveChosenNeighbor := events.NewClosure(func(p *peer.Peer) { eventDispatchers.DisconnectNodes(accountability.OWN_ID.Identifier, p.Identity.Identifier) - } + }) // setup hooks ///////////////////////////////////////////////////////////////////////////////////////////////////// @@ -89,8 +90,8 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch // clean up hooks on close ///////////////////////////////////////////////////////////////////////////////////////// - var onClose func() - onClose = func() { + var onClose *events.Closure + onClose = events.NewClosure(func() { knownpeers.INSTANCE.Events.Add.Detach(onDiscoverPeer) acceptedneighbors.INSTANCE.Events.Add.Detach(onAddAcceptedNeighbor) acceptedneighbors.INSTANCE.Events.Remove.Detach(onRemoveAcceptedNeighbor) @@ -98,7 +99,7 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch chosenneighbors.INSTANCE.Events.Remove.Detach(onRemoveChosenNeighbor) conn.Events.Close.Detach(onClose) - } + }) conn.Events.Close.Attach(onClose) } diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 861ffe4fe7c3be369703a67480d36bd595da5102..88beeba8b905cd45872349695995250155251c77 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -3,6 +3,7 @@ package server import ( "encoding/hex" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" @@ -21,16 +22,16 @@ var server *tcp.Server func Configure(plugin *node.Plugin) { server = tcp.NewServer() - server.Events.Connect.Attach(HandleConnection) - server.Events.Error.Attach(func(err error) { + server.Events.Connect.Attach(events.NewClosure(HandleConnection)) + server.Events.Error.Attach(events.NewClosure(func(err error) { plugin.LogFailure("error in server: " + err.Error()) - }) - server.Events.Start.Attach(func() { + })) + server.Events.Start.Attach(events.NewClosure(func() { plugin.LogSuccess("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ... done") - }) - server.Events.Shutdown.Attach(func() { + })) + server.Events.Shutdown.Attach(events.NewClosure(func() { plugin.LogSuccess("Stopping Server ... done") - }) + })) } func Run(plugin *node.Plugin) { @@ -55,18 +56,17 @@ func HandleConnection(conn *network.ManagedConnection) { var offset int var connectedNodeId string - var onReceiveData func(data []byte) - var onDisconnect func() + var onDisconnect *events.Closure - onReceiveData = func(data []byte) { + onReceiveData := events.NewClosure(func(data []byte) { processIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset, &connectedNodeId) - } - onDisconnect = func() { + }) + onDisconnect = events.NewClosure(func() { Events.NodeOffline.Trigger(connectedNodeId) conn.Events.ReceiveData.Detach(onReceiveData) conn.Events.Close.Detach(onDisconnect) - } + }) conn.Events.ReceiveData.Attach(onReceiveData) conn.Events.Close.Attach(onDisconnect) diff --git a/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go b/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go index 8dbcf68f9a2f16e383f9def8d49867023cd3b805..462e9d087619262b6f90e99b3d3b8a1054beead0 100644 --- a/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go +++ b/plugins/autopeering/instances/acceptedneighbors/furthest_neighbor.go @@ -1,6 +1,7 @@ package acceptedneighbors import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "sync" ) @@ -12,14 +13,14 @@ var FURTHEST_NEIGHBOR_DISTANCE = uint64(0) var FurthestNeighborLock sync.RWMutex func configureFurthestNeighbor() { - INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() updateFurthestNeighbor(p) - }) + })) - INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() @@ -31,7 +32,7 @@ func configureFurthestNeighbor() { updateFurthestNeighbor(furthestNeighborCandidate) } } - }) + })) } func updateFurthestNeighbor(p *peer.Peer) { diff --git a/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go b/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go index 107f9eaa772563139e2723c5a456fd6cc9b34ac0..7767ca0b5c480c5ebfba2e5da9e9a97d58d43873 100644 --- a/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go +++ b/plugins/autopeering/instances/chosenneighbors/furthest_neighbor.go @@ -1,6 +1,7 @@ package chosenneighbors import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "sync" ) @@ -12,7 +13,7 @@ var FURTHEST_NEIGHBOR_DISTANCE = uint64(0) var FurthestNeighborLock sync.RWMutex func configureFurthestNeighbor() { - INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() @@ -21,9 +22,9 @@ func configureFurthestNeighbor() { FURTHEST_NEIGHBOR = p FURTHEST_NEIGHBOR_DISTANCE = distance } - }) + })) - INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { FurthestNeighborLock.Lock() defer FurthestNeighborLock.Unlock() @@ -39,5 +40,5 @@ func configureFurthestNeighbor() { } } } - }) + })) } \ No newline at end of file diff --git a/plugins/autopeering/instances/outgoingrequest/instance.go b/plugins/autopeering/instances/outgoingrequest/instance.go index c91a9dbc04d4afaef65a40c36298d23e9b73e34d..1ffc8b96de6567113fd5523b4a037951d5b615d6 100644 --- a/plugins/autopeering/instances/outgoingrequest/instance.go +++ b/plugins/autopeering/instances/outgoingrequest/instance.go @@ -1,6 +1,7 @@ package outgoingrequest import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" @@ -16,7 +17,7 @@ func Configure(plugin *node.Plugin) { } INSTANCE.Sign() - saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + saltmanager.Events.UpdatePublicSalt.Attach(events.NewClosure(func(salt *salt.Salt) { INSTANCE.Sign() - }) + })) } diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 8254f34fdb6094986bb3bc5d6e7b5bb23273a3a2..81f80fe58127575761e30929a93ae53ae36812ec 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -36,24 +36,24 @@ func run(plugin *node.Plugin) { } func configureLogging(plugin *node.Plugin) { - acceptedneighbors.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) - acceptedneighbors.INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + })) + acceptedneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) + })) - chosenneighbors.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + chosenneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) - chosenneighbors.INSTANCE.Events.Remove.Attach(func(p *peer.Peer) { + })) + chosenneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) + })) - knownpeers.INSTANCE.Events.Add.Attach(func(p *peer.Peer) { + knownpeers.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogInfo("peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) - knownpeers.INSTANCE.Events.Update.Attach(func(p *peer.Peer) { + })) + knownpeers.INSTANCE.Events.Update.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier) - }) + })) } diff --git a/plugins/autopeering/protocol/outgoing_ping_processor.go b/plugins/autopeering/protocol/outgoing_ping_processor.go index 580791ac6a84ebf30ccec47253b73f95b653532f..e94a78cf9bc883aaeeb02ee2f594a64749766cb6 100644 --- a/plugins/autopeering/protocol/outgoing_ping_processor.go +++ b/plugins/autopeering/protocol/outgoing_ping_processor.go @@ -3,6 +3,7 @@ package protocol import ( "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/ownpeer" @@ -30,9 +31,9 @@ func createOutgoingPingProcessor(plugin *node.Plugin) func() { } outgoingPing.Sign() - saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + saltmanager.Events.UpdatePublicSalt.Attach(events.NewClosure(func(salt *salt.Salt) { outgoingPing.Sign() - }) + })) pingPeers(plugin, outgoingPing) diff --git a/plugins/autopeering/saltmanager/events.go b/plugins/autopeering/saltmanager/events.go index b9493017916e911dae12da1d1c60bc7adc9bad89..18315773eaf443e8ebaea0a74db7c43a84f8d9aa 100644 --- a/plugins/autopeering/saltmanager/events.go +++ b/plugins/autopeering/saltmanager/events.go @@ -1,34 +1,16 @@ package saltmanager import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/salt" - "reflect" ) -type packageEvents struct { - UpdatePublicSalt *saltEvent - UpdatePrivateSalt *saltEvent +var Events = struct { + UpdatePublicSalt *events.Event + UpdatePrivateSalt *events.Event +}{ + UpdatePublicSalt: events.NewEvent(saltCaller), + UpdatePrivateSalt: events.NewEvent(saltCaller), } -type saltEvent struct { - callbacks map[uintptr]SaltConsumer -} - -func (this *saltEvent) Attach(callback SaltConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *saltEvent) Detach(callback SaltConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *saltEvent) Trigger(salt *salt.Salt) { - for _, callback := range this.callbacks { - callback(salt) - } -} - -var Events = packageEvents{ - UpdatePublicSalt: &saltEvent{make(map[uintptr]SaltConsumer)}, - UpdatePrivateSalt: &saltEvent{make(map[uintptr]SaltConsumer)}, -} +func saltCaller(handler interface{}, params ...interface{}) { handler.(func(*salt.Salt))(params[0].(*salt.Salt)) } diff --git a/plugins/autopeering/saltmanager/saltmanager.go b/plugins/autopeering/saltmanager/saltmanager.go index 340e901bd982dc052a3436ab19ae7ac6e65b8daf..a18a1ebb24a9b3d03a13b07d81ab57ba2e694823 100644 --- a/plugins/autopeering/saltmanager/saltmanager.go +++ b/plugins/autopeering/saltmanager/saltmanager.go @@ -46,7 +46,7 @@ func getSalt(key []byte, lifetime time.Duration) *salt.Salt { } } -func updatePublicSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, updateCallback func(salt *salt.Salt)) { +func updatePublicSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, updateCallback func(params ...interface{})) { newSalt := salt.New(lifeSpan) saltToUpdate.Bytes = newSalt.Bytes @@ -61,7 +61,7 @@ func updatePublicSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time scheduleUpdateForSalt(saltToUpdate, settingsKey, lifeSpan, updateCallback) } -func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, callback SaltConsumer) { +func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan time.Duration, callback func(params ...interface{})) { now := time.Now() if saltToUpdate.ExpirationTime.Before(now) { @@ -78,7 +78,7 @@ func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan } } -func createSalt(settingsKey []byte, lifeSpan time.Duration, updateCallback SaltConsumer) *salt.Salt { +func createSalt(settingsKey []byte, lifeSpan time.Duration, updateCallback func(params ...interface{})) *salt.Salt { newSalt := getSalt(settingsKey, lifeSpan) scheduleUpdateForSalt(newSalt, settingsKey, lifeSpan, updateCallback) diff --git a/plugins/autopeering/saltmanager/types.go b/plugins/autopeering/saltmanager/types.go deleted file mode 100644 index 4a273e65685b26ada412b29272d8052ed88c6459..0000000000000000000000000000000000000000 --- a/plugins/autopeering/saltmanager/types.go +++ /dev/null @@ -1,5 +0,0 @@ -package saltmanager - -import "github.com/iotaledger/goshimmer/plugins/autopeering/types/salt" - -type SaltConsumer = func(salt *salt.Salt) diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 7740316e94d43610e1228bd66aa682b859d2fc57..0c25eaddfd6ad1e6bb6d654032768349455a9af4 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -2,6 +2,7 @@ package tcp import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" @@ -18,20 +19,20 @@ import ( var server = tcp.NewServer() func ConfigureServer(plugin *node.Plugin) { - server.Events.Connect.Attach(HandleConnection) - server.Events.Error.Attach(func(err error) { + server.Events.Connect.Attach(events.NewClosure(HandleConnection)) + server.Events.Error.Attach(events.NewClosure(func(err error) { plugin.LogFailure("error in tcp server: " + err.Error()) - }) - server.Events.Start.Attach(func() { + })) + server.Events.Start.Attach(events.NewClosure(func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } else { plugin.LogSuccess("Starting TCP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } - }) - server.Events.Shutdown.Attach(func() { + })) + server.Events.Shutdown.Attach(events.NewClosure(func() { plugin.LogSuccess("Stopping TCP Server ... done") - }) + })) } func RunServer(plugin *node.Plugin) { @@ -59,9 +60,9 @@ func HandleConnection(conn *network.ManagedConnection) { var receiveBuffer []byte var offset int - conn.Events.ReceiveData.Attach(func(data []byte) { + conn.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { ProcessIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset) - }) + })) go conn.Read(make([]byte, int(math.Max(ping.MARSHALLED_TOTAL_SIZE, math.Max(request.MARSHALLED_TOTAL_SIZE, response.MARSHALLED_TOTAL_SIZE))))) } @@ -141,9 +142,9 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, req.Issuer.Conn = conn req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - req.Issuer.Conn.Events.Close.Attach(func() { + req.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { req.Issuer.Conn = nil - }) + })) Events.ReceiveRequest.Trigger(req) } @@ -174,9 +175,9 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, res.Issuer.Conn = conn res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - res.Issuer.Conn.Events.Close.Attach(func() { + res.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { res.Issuer.Conn = nil - }) + })) Events.ReceiveResponse.Trigger(res) } @@ -207,9 +208,9 @@ func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, con ping.Issuer.Conn = conn ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - ping.Issuer.Conn.Events.Close.Attach(func() { + ping.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { ping.Issuer.Conn = nil - }) + })) Events.ReceivePing.Trigger(ping) } diff --git a/plugins/autopeering/server/udp/server.go b/plugins/autopeering/server/udp/server.go index b050d205f1619f50bf290ec079f6d1cd55a5170b..a6ed7c51fe6859cdd7fed9fe4e35b26ac48f41e8 100644 --- a/plugins/autopeering/server/udp/server.go +++ b/plugins/autopeering/server/udp/server.go @@ -2,6 +2,7 @@ package udp import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network/udp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" @@ -22,20 +23,20 @@ func ConfigureServer(plugin *node.Plugin) { plugin.LogFailure(err.Error()) }) - udpServer.Events.ReceiveData.Attach(processReceivedData) - udpServer.Events.Error.Attach(func(err error) { + udpServer.Events.ReceiveData.Attach(events.NewClosure(processReceivedData)) + udpServer.Events.Error.Attach(events.NewClosure(func(err error) { plugin.LogFailure("error in udp server: " + err.Error()) - }) - udpServer.Events.Start.Attach(func() { + })) + udpServer.Events.Start.Attach(events.NewClosure(func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogSuccess("Starting UDP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } else { plugin.LogSuccess("Starting UDP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.PORT.Value) + ") ... done") } - }) - udpServer.Events.Shutdown.Attach(func() { + })) + udpServer.Events.Shutdown.Attach(events.NewClosure(func() { plugin.LogSuccess("Stopping UDP Server ... done") - }) + })) } func RunServer(plugin *node.Plugin) { diff --git a/plugins/autopeering/types/peer/peer.go b/plugins/autopeering/types/peer/peer.go index 4604559a0c537a2705834dee44585b16ae7ab94e..088ea1afa1fb24680e02df4814ed209d972f69dd 100644 --- a/plugins/autopeering/types/peer/peer.go +++ b/plugins/autopeering/types/peer/peer.go @@ -2,6 +2,7 @@ package peer import ( "encoding/binary" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/plugins/autopeering/types/salt" @@ -80,9 +81,9 @@ func (peer *Peer) ConnectTCP() (*network.ManagedConnection, bool, error) { } else { peer.Conn = network.NewManagedConnection(conn) - peer.Conn.Events.Close.Attach(func() { + peer.Conn.Events.Close.Attach(events.NewClosure(func() { peer.Conn = nil - }) + })) return peer.Conn, true, nil } diff --git a/plugins/autopeering/types/peerregister/events.go b/plugins/autopeering/types/peerregister/events.go index 29daca8234d4945a365f202d0320793cf579255f..1bf6ca25f97e105d899bfd2683104a6ae0a63937 100644 --- a/plugins/autopeering/types/peerregister/events.go +++ b/plugins/autopeering/types/peerregister/events.go @@ -1,32 +1,14 @@ package peerregister import ( + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "reflect" ) type peerRegisterEvents struct { - Add *peerEvent - Update *peerEvent - Remove *peerEvent + Add *events.Event + Update *events.Event + Remove *events.Event } -type peerEvent struct { - callbacks map[uintptr]PeerConsumer -} - -func (this *peerEvent) Attach(callback PeerConsumer) { - this.callbacks[reflect.ValueOf(callback).Pointer()] = callback -} - -func (this *peerEvent) Detach(callback PeerConsumer) { - delete(this.callbacks, reflect.ValueOf(callback).Pointer()) -} - -func (this *peerEvent) Trigger(p *peer.Peer) { - for _, callback := range this.callbacks { - callback(p) - } -} - -type PeerConsumer = func(p *peer.Peer) +func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*peer.Peer))(params[0].(*peer.Peer)) } diff --git a/plugins/autopeering/types/peerregister/peer_register.go b/plugins/autopeering/types/peerregister/peer_register.go index bae902d5882f52b5d03224d78ce0722017b58980..e2ba382315fff7b35b2a4674f772ede140c2c832 100644 --- a/plugins/autopeering/types/peerregister/peer_register.go +++ b/plugins/autopeering/types/peerregister/peer_register.go @@ -3,6 +3,7 @@ package peerregister import ( "bytes" "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" @@ -19,9 +20,9 @@ func New() *PeerRegister { return &PeerRegister{ Peers: make(map[string]*peer.Peer), Events: peerRegisterEvents{ - Add: &peerEvent{make(map[uintptr]PeerConsumer)}, - Update: &peerEvent{make(map[uintptr]PeerConsumer)}, - Remove: &peerEvent{make(map[uintptr]PeerConsumer)}, + Add: events.NewEvent(peerCaller), + Update: events.NewEvent(peerCaller), + Remove: events.NewEvent(peerCaller), }, } }