diff --git a/packages/accountability/accountability.go b/packages/accountability/accountability.go index e581796192e6fd2d5ed4f3462a2f678863cb9886..0a24e9b8b362a3e6bbcb370206d298e0e6b2cd34 100644 --- a/packages/accountability/accountability.go +++ b/packages/accountability/accountability.go @@ -6,7 +6,7 @@ import ( "github.com/iotaledger/goshimmer/packages/identity" ) -var OWN_ID *identity.Identity +var OWN_ID = getIdentity() func generateNewIdentity() *identity.Identity { newIdentity := identity.GenerateRandomIdentity() @@ -43,7 +43,3 @@ func getIdentity() *identity.Identity { return identity.NewIdentity(publicKey, privateKey) } - -func init() { - OWN_ID = getIdentity() -} \ No newline at end of file diff --git a/packages/network/events.go b/packages/network/events.go new file mode 100644 index 0000000000000000000000000000000000000000..188ddea1a4e12f7dde7a7d2b3792cbc9f9f65a56 --- /dev/null +++ b/packages/network/events.go @@ -0,0 +1,63 @@ +package network + +import "reflect" + +type BufferedConnectionEvents struct { + ReceiveData *dataEvent + Close *callbackEvent + Error *errorEvent +} + +type callbackEvent struct { + callbacks map[uintptr]Callback +} + +func (this *callbackEvent) Attach(callback Callback) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *callbackEvent) Detach(callback Callback) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *callbackEvent) Trigger() { + for _, callback := range this.callbacks { + callback() + } +} + +type errorEvent struct { + callbacks map[uintptr]ErrorConsumer +} + +func (this *errorEvent) Attach(callback ErrorConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *errorEvent) Detach(callback ErrorConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *errorEvent) Trigger(err error) { + for _, callback := range this.callbacks { + callback(err) + } +} + +type dataEvent struct { + callbacks map[uintptr]DataConsumer +} + +func (this *dataEvent) Attach(callback DataConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *dataEvent) Detach(callback ErrorConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *dataEvent) Trigger(data []byte) { + for _, callback := range this.callbacks { + callback(data) + } +} diff --git a/packages/network/interfaces.go b/packages/network/interfaces.go deleted file mode 100644 index 7590e112572aab36cbd741a2ed6877a19e18b116..0000000000000000000000000000000000000000 --- a/packages/network/interfaces.go +++ /dev/null @@ -1,27 +0,0 @@ -package network - -import ( - "net" - "time" -) - -type Connection interface { - GetProtocol() string - GetConnection() net.Conn - Write(data []byte) - OnReceiveData(callback DataConsumer) Connection - OnDisconnect(callback Callback) Connection - OnError(callback ErrorConsumer) Connection - TriggerReceiveData(data []byte) Connection - TriggerDisconnect() Connection - TriggerError(err error) Connection - SetTimeout(duration time.Duration) Connection - HandleConnection() -} - -type Callback func() - -type ErrorConsumer func(err error) - -type DataConsumer func(data []byte) - diff --git a/packages/network/managed_connection.go b/packages/network/managed_connection.go new file mode 100644 index 0000000000000000000000000000000000000000..2adcb2811a132abffb91937d99223442408c9be6 --- /dev/null +++ b/packages/network/managed_connection.go @@ -0,0 +1,154 @@ +package network + +import ( + "io" + "net" + "sync" + "time" +) + +type ManagedConnection struct { + Conn net.Conn + Events BufferedConnectionEvents + readTimeout time.Duration + writeTimeout time.Duration + closeOnce sync.Once +} + +func NewManagedConnection(conn net.Conn) *ManagedConnection { + bufferedConnection := &ManagedConnection{ + Conn: conn, + Events: BufferedConnectionEvents{ + ReceiveData: &dataEvent{make(map[uintptr]DataConsumer)}, + Close: &callbackEvent{make(map[uintptr]Callback)}, + Error: &errorEvent{make(map[uintptr]ErrorConsumer)}, + }, + } + + return bufferedConnection +} + +func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) { + defer this.Close() + + totalReadBytes := 0 + for { + if err := this.setReadTimeoutBasedDeadline(); err != nil { + return totalReadBytes, err + } + + byteCount, err := this.Conn.Read(receiveBuffer) + if err != nil { + if err != io.EOF { + this.Events.Error.Trigger(err) + } + + return totalReadBytes, err + } + totalReadBytes += byteCount + + receivedData := make([]byte, byteCount) + copy(receivedData, receiveBuffer) + + this.Events.ReceiveData.Trigger(receivedData) + } +} + +func (this *ManagedConnection) Write(data []byte) (n int, err error) { + if err := this.setWriteTimeoutBasedDeadline(); err != nil { + return 0, err + } + + return this.Conn.Write(data) +} + +func (this *ManagedConnection) Close() error { + err := this.Conn.Close() + if err != nil { + this.Events.Error.Trigger(err) + } + + this.closeOnce.Do(this.Events.Close.Trigger) + + return err +} + +func (this *ManagedConnection) LocalAddr() net.Addr { + return this.Conn.LocalAddr() +} + +func (this *ManagedConnection) RemoteAddr() net.Addr { + return this.Conn.RemoteAddr() +} + +func (this *ManagedConnection) SetDeadline(t time.Time) error { + return this.Conn.SetDeadline(t) +} + +func (this *ManagedConnection) SetReadDeadline(t time.Time) error { + return this.Conn.SetReadDeadline(t) +} + +func (this *ManagedConnection) SetWriteDeadline(t time.Time) error { + return this.Conn.SetWriteDeadline(t) +} + +func (this *ManagedConnection) SetTimeout(d time.Duration) error { + if err := this.SetReadTimeout(d); err != nil { + return err + } + + if err := this.SetWriteTimeout(d); err != nil { + return err + } + + return nil +} + +func (this *ManagedConnection) SetReadTimeout(d time.Duration) error { + this.readTimeout = d + + if err := this.setReadTimeoutBasedDeadline(); err != nil { + return err + } + + return nil +} + +func (this *ManagedConnection) SetWriteTimeout(d time.Duration) error { + this.writeTimeout = d + + if err := this.setWriteTimeoutBasedDeadline(); err != nil { + return err + } + + return nil +} + +func (this *ManagedConnection) setReadTimeoutBasedDeadline() error { + if this.readTimeout != 0 { + if err := this.Conn.SetReadDeadline(time.Now().Add(this.readTimeout)); err != nil { + return err + } + } else { + if err := this.Conn.SetReadDeadline(time.Time{}); err != nil { + return err + } + } + + return nil +} + +func (this *ManagedConnection) setWriteTimeoutBasedDeadline() error { + if this.writeTimeout != 0 { + if err := this.Conn.SetWriteDeadline(time.Now().Add(this.writeTimeout)); err != nil { + return err + } + } else { + if err := this.Conn.SetWriteDeadline(time.Time{}); err != nil { + return err + } + } + + return nil +} diff --git a/packages/network/peer.go b/packages/network/peer.go deleted file mode 100644 index 7035a804371be58d3ce1da13aa7361cddcd3062d..0000000000000000000000000000000000000000 --- a/packages/network/peer.go +++ /dev/null @@ -1,118 +0,0 @@ -package network - -import ( - "io" - "net" - "time" -) - -type peerImplementation struct { - timeout time.Duration - protocol string - conn net.Conn - receiveDataHandlers []DataConsumer - disconnectHandlers []Callback - errorHandlers []ErrorConsumer -} - -func NewPeer(protocol string, conn net.Conn) Connection { - this := &peerImplementation{ - protocol: protocol, - conn: conn, - receiveDataHandlers: make([]DataConsumer, 0), - disconnectHandlers: make([]Callback, 0), - errorHandlers: make([]ErrorConsumer, 0), - } - - return this -} - -func (this *peerImplementation) SetTimeout(duration time.Duration) Connection { - this.timeout = duration - - //this.conn.SetDeadline(time.Now().Add(this.timeout)) - - return this -} - -func (this *peerImplementation) GetProtocol() string { - return this.protocol -} - -func (this *peerImplementation) GetConnection() net.Conn { - return this.conn -} - -func (this *peerImplementation) Write(data []byte) { - //this.conn.SetDeadline(time.Now().Add(this.timeout)) - - if _, err := this.conn.Write(data); err != nil { - this.TriggerError(err) - } -} - -func (this *peerImplementation) OnReceiveData(callback DataConsumer) Connection { - this.receiveDataHandlers = append(this.receiveDataHandlers, callback) - - return this -} - -func (this *peerImplementation) OnDisconnect(callback Callback) Connection { - this.disconnectHandlers = append(this.disconnectHandlers, callback) - - return this -} - -func (this *peerImplementation) OnError(callback ErrorConsumer) Connection { - this.errorHandlers = append(this.errorHandlers, callback) - - return this -} - -func (this *peerImplementation) TriggerReceiveData(data []byte) Connection { - for _, receiveDataHandler := range this.receiveDataHandlers { - receiveDataHandler(data) - } - - return this -} - -func (this *peerImplementation) TriggerDisconnect() Connection { - for _, disconnectHandler := range this.disconnectHandlers { - disconnectHandler() - } - - return this -} - -func (this *peerImplementation) TriggerError(err error) Connection { - for _, errorHandler := range this.errorHandlers { - errorHandler(err) - } - - return this -} - -func (this *peerImplementation) HandleConnection() { - defer this.conn.Close() - defer this.TriggerDisconnect() - - receiveBuffer := make([]byte, READ_BUFFER_SIZE) - for { - //this.conn.SetDeadline(time.Now().Add(this.timeout)) - - byteCount, err := this.conn.Read(receiveBuffer) - if err != nil { - if err != io.EOF { - this.TriggerError(err) - } - - return - } - - receivedData := make([]byte, byteCount) - copy(receivedData, receiveBuffer) - - this.TriggerReceiveData(receivedData) - } -} diff --git a/packages/network/tcp/server.go b/packages/network/tcp/server.go index 51ec4bbab23d0409699941acee3061f254e6afc6..cd4069be847de2bb11cf2cfcc718b3f4707890ca 100644 --- a/packages/network/tcp/server.go +++ b/packages/network/tcp/server.go @@ -39,7 +39,7 @@ func (this *Server) Listen(port int) *Server { this.Events.Error.Trigger(err) } } else { - peer := network.NewPeer("tcp", socket) + peer := network.NewManagedConnection(socket) go this.Events.Connect.Trigger(peer) } diff --git a/packages/network/tcp/server_events.go b/packages/network/tcp/server_events.go index 27a3af2666bb2f8358bbf0bd08200c33c5ab7f0a..85561a9f80d0b4ac1640704db22fbcb5bc4fe316 100644 --- a/packages/network/tcp/server_events.go +++ b/packages/network/tcp/server_events.go @@ -70,9 +70,9 @@ func (this *peerConsumerEvent) Detach(callback PeerConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } -func (this *peerConsumerEvent) Trigger(peer network.Connection) { +func (this *peerConsumerEvent) Trigger(conn *network.ManagedConnection) { for _, callback := range this.callbacks { - callback(peer) + callback(conn) } } diff --git a/packages/network/tcp/types.go b/packages/network/tcp/types.go index 6eb9d5d16eae568e8849a5d4adba75f6016ccb88..44856584f1590f95f495fe2b6a64a585fe26d008 100644 --- a/packages/network/tcp/types.go +++ b/packages/network/tcp/types.go @@ -6,4 +6,4 @@ type Callback = func() type ErrorConsumer = func(e error) -type PeerConsumer = func(peer network.Connection) +type PeerConsumer = func(conn *network.ManagedConnection) diff --git a/packages/network/types.go b/packages/network/types.go new file mode 100644 index 0000000000000000000000000000000000000000..2b985b7d83101a7992c097f25f2c97552ebe3ef1 --- /dev/null +++ b/packages/network/types.go @@ -0,0 +1,8 @@ +package network + +type Callback func() + +type ErrorConsumer func(err error) + +type DataConsumer func(data []byte) + diff --git a/packages/network/udp/server.go b/packages/network/udp/server.go index 08f265ceac5d9d00f192033947458ece3a037670..82caae03453644aa1cb625d189b220fddb424b64 100644 --- a/packages/network/udp/server.go +++ b/packages/network/udp/server.go @@ -2,6 +2,7 @@ package udp import ( "net" + "strconv" ) type serverEvents struct { @@ -12,8 +13,9 @@ type serverEvents struct { } type Server struct { - Socket *net.UDPConn - Events serverEvents + Socket net.PacketConn + ReceiveBufferSize int + Events serverEvents } func (this *Server) Shutdown() { @@ -26,10 +28,7 @@ func (this *Server) Shutdown() { } func (this *Server) Listen(address string, port int) { - if socket, err := net.ListenUDP("udp", &net.UDPAddr{ - Port: port, - IP: net.ParseIP(address), - }); err != nil { + if socket, err := net.ListenPacket("udp", address + ":" + strconv.Itoa(port)); err != nil { this.Events.Error.Trigger(err) return @@ -40,20 +39,21 @@ func (this *Server) Listen(address string, port int) { this.Events.Start.Trigger() defer this.Events.Shutdown.Trigger() - buf := make([]byte, 1500) + buf := make([]byte, this.ReceiveBufferSize) for this.Socket != nil { - if bytesRead, addr, err := this.Socket.ReadFromUDP(buf); err != nil { + if bytesRead, addr, err := this.Socket.ReadFrom(buf); err != nil { if this.Socket != nil { this.Events.Error.Trigger(err) } } else { - this.Events.ReceiveData.Trigger(addr, buf[:bytesRead]) + this.Events.ReceiveData.Trigger(addr.(*net.UDPAddr), buf[:bytesRead]) } } } -func NewServer() *Server { +func NewServer(receiveBufferSize int) *Server { return &Server{ + ReceiveBufferSize: receiveBufferSize, Events: serverEvents{ Start: &callbackEvent{make(map[uintptr]Callback)}, Shutdown: &callbackEvent{make(map[uintptr]Callback)}, diff --git a/packages/node/constants.go b/packages/node/constants.go index 4b4ea4fae61c166f5c9a5f21be64b7717b443b31..52ad490ffebffbf795c62773d1d121bbc67f7911 100644 --- a/packages/node/constants.go +++ b/packages/node/constants.go @@ -5,4 +5,5 @@ const ( LOG_LEVEL_WARNING = 1 LOG_LEVEL_SUCCESS = 2 LOG_LEVEL_INFO = 3 + LOG_LEVEL_DEBUG = 4 ) diff --git a/packages/node/logger.go b/packages/node/logger.go index a2665c1eac539d1274f829302053e5eb0763dbf4..6a206f888e9af0528c27b7e61c77093e4edd5dfa 100644 --- a/packages/node/logger.go +++ b/packages/node/logger.go @@ -8,6 +8,7 @@ type Logger struct { LogSuccess func(pluginName string, message string) LogWarning func(pluginName string, message string) LogFailure func(pluginName string, message string) + LogDebug func(pluginName string, message string) } func pluginPrefix(pluginName string) string { @@ -35,4 +36,7 @@ var DEFAULT_LOGGER = &Logger{ LogFailure: func(pluginName string, message string) { fmt.Println("[ FAIL ] " + pluginPrefix(pluginName) + message) }, + LogDebug: func(pluginName string, message string) { + fmt.Println("[ NOTE ] " + pluginPrefix(pluginName) + message) + }, } diff --git a/packages/node/node.go b/packages/node/node.go index bbbf7f85992ea6053fc146f611ab89259dae5111..e5ff4299b95d37fb09310d2c9d3e0f5343d03732 100644 --- a/packages/node/node.go +++ b/packages/node/node.go @@ -73,6 +73,16 @@ func (node *Node) LogInfo(pluginName string, message string) { } } +func (node *Node) LogDebug(pluginName string, message string) { + if node.logLevel >= LOG_LEVEL_DEBUG { + for _, logger := range node.loggers { + if logger.Enabled { + logger.LogDebug(pluginName, message) + } + } + } +} + func (node *Node) LogWarning(pluginName string, message string) { if node.logLevel >= LOG_LEVEL_WARNING { for _, logger := range node.loggers { @@ -98,9 +108,6 @@ func (node *Node) Load(plugins ...*Plugin) { if len(plugins) >= 1 { for _, plugin := range plugins { - fmt.Println(*DISABLE_PLUGINS.Value) - fmt.Println(disabledPlugins) - fmt.Println(strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))) if _, exists := disabledPlugins[strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))]; !exists { plugin.wg = node.wg plugin.Node = node diff --git a/packages/node/parameters.go b/packages/node/parameters.go index 9cf6276e1da7fc65a661ad745b1ade5c037c76dd..23d890d9b3b91a20b3fe45d2ffcabfe0a873757e 100644 --- a/packages/node/parameters.go +++ b/packages/node/parameters.go @@ -3,6 +3,6 @@ package node import "github.com/iotaledger/goshimmer/packages/parameter" var ( - LOG_LEVEL = parameter.AddInt("NODE/LOG_LEVEL", LOG_LEVEL_SUCCESS, "controls the log types that are shown") + LOG_LEVEL = parameter.AddInt("NODE/LOG_LEVEL", LOG_LEVEL_INFO, "controls the log types that are shown") DISABLE_PLUGINS = parameter.AddString("NODE/DISABLE_PLUGINS", "", "a list of plugins that shall be disabled") ) diff --git a/packages/node/plugin.go b/packages/node/plugin.go index 5ebf45df451238185072ce1b817b6d2337478d38..f494745f3de5061d06c863cc063463fc04125d9e 100644 --- a/packages/node/plugin.go +++ b/packages/node/plugin.go @@ -49,3 +49,7 @@ func (plugin *Plugin) LogWarning(message string) { func (plugin *Plugin) LogFailure(message string) { plugin.Node.LogFailure(plugin.Name, message) } + +func (plugin *Plugin) LogDebug(message string) { + plugin.Node.LogDebug(plugin.Name, message) +} diff --git a/plugins/autopeering/parameters/parameters.go b/plugins/autopeering/parameters/parameters.go index 63014eda145c558ab2740ea7e46bbcac863beb05..f4cceaa347ab3d8d0ba408c8bd089d4c0075fa3b 100644 --- a/plugins/autopeering/parameters/parameters.go +++ b/plugins/autopeering/parameters/parameters.go @@ -4,6 +4,7 @@ import "github.com/iotaledger/goshimmer/packages/parameter" var ( ADDRESS = parameter.AddString("AUTOPEERING/ADDRESS", "0.0.0.0", "address to bind for incoming peering requests") - UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests") ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://82.165.29.179:14626", "list of trusted entry nodes for auto peering") + TCP_PORT = parameter.AddInt("AUTOPEERING/TCP_PORT", 14626, "tcp port for incoming peering requests") + UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests") ) diff --git a/plugins/autopeering/peermanager/accepted_neighbors.go b/plugins/autopeering/peermanager/accepted_neighbors.go new file mode 100644 index 0000000000000000000000000000000000000000..ff1e87ed1cb2fbff2855cea5a936548eae996196 --- /dev/null +++ b/plugins/autopeering/peermanager/accepted_neighbors.go @@ -0,0 +1,5 @@ +package peermanager + +import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + +var ACCEPTED_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)} diff --git a/plugins/autopeering/peermanager/chosen_neighbors.go b/plugins/autopeering/peermanager/chosen_neighbors.go new file mode 100644 index 0000000000000000000000000000000000000000..595d857104197067fc90db5bdc351d8efe2bcc98 --- /dev/null +++ b/plugins/autopeering/peermanager/chosen_neighbors.go @@ -0,0 +1,5 @@ +package peermanager + +import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + +var CHOSEN_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)} diff --git a/plugins/autopeering/peermanager/constants.go b/plugins/autopeering/peermanager/constants.go index 80fa7543ab834406b25ad6b1356253ec6b7e113b..df72cfb2ee2ccdd4dce8b7d96a83e4a3f79b4de3 100644 --- a/plugins/autopeering/peermanager/constants.go +++ b/plugins/autopeering/peermanager/constants.go @@ -1,12 +1,9 @@ package peermanager import ( - "github.com/iotaledger/goshimmer/packages/identity" "time" ) const ( FIND_NEIGHBOR_INTERVAL = 5 * time.Second ) - -var UNKNOWN_IDENTITY = identity.GenerateRandomIdentity() diff --git a/plugins/autopeering/peermanager/entry_nodes.go b/plugins/autopeering/peermanager/entry_nodes.go index 799192128b0019887dbafdfb9255258ff15edb13..e5d6b27abba4f65f64b411b80456e04439f3afb4 100644 --- a/plugins/autopeering/peermanager/entry_nodes.go +++ b/plugins/autopeering/peermanager/entry_nodes.go @@ -3,21 +3,24 @@ package peermanager import ( "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" "net" "strconv" "strings" ) -func getEntryNodes() []*protocol.Peer { - result := make([]*protocol.Peer, 0) +var ENTRY_NODES = parseEntryNodes() + +func parseEntryNodes() []*peer.Peer { + result := make([]*peer.Peer, 0) for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) { if entryNodeDefinition == "" { continue } - entryNode := &protocol.Peer{ - Identity: UNKNOWN_IDENTITY, + entryNode := &peer.Peer{ + Identity: nil, } protocolBits := strings.Split(entryNodeDefinition, "://") @@ -26,9 +29,9 @@ func getEntryNodes() []*protocol.Peer { } switch protocolBits[0] { case "tcp": - entryNode.PeeringProtocolType = protocol.TCP_PROTOCOL + entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_TCP case "udp": - entryNode.PeeringProtocolType = protocol.UDP_PROTOCOL + entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_UDP } addressBits := strings.Split(protocolBits[1], ":") @@ -70,5 +73,3 @@ func getEntryNodes() []*protocol.Peer { return result } - -var ENTRY_NODES = getEntryNodes() \ No newline at end of file diff --git a/plugins/autopeering/peermanager/known_peers.go b/plugins/autopeering/peermanager/known_peers.go new file mode 100644 index 0000000000000000000000000000000000000000..d8aeba0ad6335f5dc88b7839c204d303bec71e1b --- /dev/null +++ b/plugins/autopeering/peermanager/known_peers.go @@ -0,0 +1,5 @@ +package peermanager + +import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + +var KNOWN_PEERS = &PeerList{make(map[string]*peer.Peer)} diff --git a/plugins/autopeering/peermanager/peer_list.go b/plugins/autopeering/peermanager/peer_list.go index f7a4a73313e6f7f596ef815a35b4de7b45f3f48f..c8add98070a69cb6a09c5b28f8b42ef7165e3d66 100644 --- a/plugins/autopeering/peermanager/peer_list.go +++ b/plugins/autopeering/peermanager/peer_list.go @@ -3,41 +3,36 @@ package peermanager import ( "bytes" "github.com/iotaledger/goshimmer/packages/accountability" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "time" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" ) type PeerList struct { - Peers map[string]*protocol.Peer + Peers map[string]*peer.Peer } -func (this *PeerList) Update(peer *protocol.Peer) { - if peer.Identity == UNKNOWN_IDENTITY || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { - return +func (this *PeerList) Update(peer *peer.Peer) bool { + if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { + return false } - now := time.Now() - if existingPeer, exists := this.Peers[peer.Identity.StringIdentifier]; exists { existingPeer.Address = peer.Address existingPeer.GossipPort = peer.GossipPort existingPeer.PeeringPort = peer.PeeringPort - existingPeer.LastSeen = now - existingPeer.LastContact = now // trigger update peer - } else { - peer.FirstSeen = now - peer.LastSeen = now - peer.LastContact = now + return false + } else { this.Peers[peer.Identity.StringIdentifier] = peer // trigger add peer + + return true } } -func (this *PeerList) Add(peer *protocol.Peer) { +func (this *PeerList) Add(peer *peer.Peer) { this.Peers[peer.Identity.StringIdentifier] = peer } diff --git a/plugins/autopeering/peermanager/peer_manager.go b/plugins/autopeering/peermanager/peer_manager.go index 62253cf5cd25d298f9a69102c88859a1925186e7..b9979c53b9dda4c7ea13cb0dce6b782ff98788b0 100644 --- a/plugins/autopeering/peermanager/peer_manager.go +++ b/plugins/autopeering/peermanager/peer_manager.go @@ -1,187 +1,240 @@ package peermanager import ( - "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "github.com/iotaledger/goshimmer/plugins/autopeering/salt" - "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" - "github.com/iotaledger/goshimmer/plugins/autopeering/server" - "math" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/udp" "net" "strconv" "time" ) -var PEERING_REQUEST *protocol.PeeringRequest - -var KNOWN_PEERS = &PeerList{make(map[string]*protocol.Peer)} - -var CHOSEN_NEIGHBORS = &PeerList{make(map[string]*protocol.Peer)} - -var ACCEPTED_NEIGHBORS = &PeerList{make(map[string]*protocol.Peer)} - -func configurePeeringRequest() { - PEERING_REQUEST = &protocol.PeeringRequest{ - Issuer: &protocol.Peer{ - Identity: accountability.OWN_ID, - PeeringProtocolType: protocol.TCP_PROTOCOL, - PeeringPort: uint16(*parameters.UDP_PORT.Value), - GossipProtocolType: protocol.TCP_PROTOCOL, - GossipPort: uint16(*parameters.UDP_PORT.Value), - Address: net.IPv4(0, 0, 0, 0), - }, - Salt: saltmanager.PUBLIC_SALT, - } - PEERING_REQUEST.Sign() - - saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { - PEERING_REQUEST.Sign() - }) -} - func Configure(plugin *node.Plugin) { configurePeeringRequest() - server.Events.ReceivePeeringRequest.Attach(func(ip net.IP, peeringRequest *protocol.PeeringRequest) { - peer := peeringRequest.Issuer - peer.Address = ip - - KNOWN_PEERS.Update(peer) - - plugin.LogInfo("received peering request from " + peeringRequest.Issuer.Identity.StringIdentifier) + // setup processing of peering requests + udp.Events.ReceiveRequest.Attach(func(peeringRequest *request.Request) { + processPeeringRequest(plugin, peeringRequest, nil) + }) + tcp.Events.ReceiveRequest.Attach(func(conn *network.ManagedConnection, peeringRequest *request.Request) { + processPeeringRequest(plugin, peeringRequest, conn) }) - server.Events.ReceiveTCPPeeringRequest.Attach(func(conn network.Connection, request *protocol.PeeringRequest) { - peer := request.Issuer - peer.Address = conn.GetConnection().RemoteAddr().(*net.TCPAddr).IP - - KNOWN_PEERS.Update(peer) - plugin.LogInfo("received peering request from " + request.Issuer.Identity.StringIdentifier) + // setup processing of peering responses + udp.Events.ReceiveResponse.Attach(func(peeringResponse *response.Response) { + processPeeringResponse(plugin, peeringResponse, nil) + }) + tcp.Events.ReceiveResponse.Attach(func(conn *network.ManagedConnection, peeringResponse *response.Response) { + processPeeringResponse(plugin, peeringResponse, conn) + }) - sendPeeringResponse(conn.GetConnection()) + udp.Events.Error.Attach(func(ip net.IP, err error) { + plugin.LogDebug("error when communicating with " + ip.String() + ": " + err.Error()) }) - server.Events.Error.Attach(func(ip net.IP, err error) { - plugin.LogFailure("invalid peering request from " + ip.String()) + tcp.Events.Error.Attach(func(ip net.IP, err error) { + plugin.LogDebug("invalid peering request from " + ip.String() + ": " + err.Error()) }) } func Run(plugin *node.Plugin) { + // setup background worker that contacts "chosen neighbors" daemon.BackgroundWorker(func() { - chooseNeighbors(plugin) + plugin.LogInfo("Starting Peer Manager ...") + plugin.LogSuccess("Starting Peer Manager ... done") + + sendPeeringRequests(plugin) ticker := time.NewTicker(FIND_NEIGHBOR_INTERVAL) for { select { - case <- daemon.ShutdownSignal: + case <-daemon.ShutdownSignal: return - case <- ticker.C: - chooseNeighbors(plugin) + case <-ticker.C: + sendPeeringRequests(plugin) } } }) } -func Shutdown(plugin *node.Plugin) {} +func Shutdown(plugin *node.Plugin) { + plugin.LogInfo("Stopping Peer Manager ...") + plugin.LogSuccess("Stopping Peer Manager ... done") +} -func generateProposedNodeCandidates() []*protocol.Peer { - peers := make([]*protocol.Peer, 0) - for _, peer := range KNOWN_PEERS.Peers { - peers = append(peers, peer) +func processPeeringRequest(plugin *node.Plugin, peeringRequest *request.Request, conn net.Conn) { + if KNOWN_PEERS.Update(peeringRequest.Issuer) { + plugin.LogInfo("new peer detected: " + peeringRequest.Issuer.Address.String() + " / " + peeringRequest.Issuer.Identity.StringIdentifier) } - return peers + if conn == nil { + plugin.LogDebug("received UDP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier) + + var protocolString string + switch peeringRequest.Issuer.PeeringProtocolType { + case protocol.PROTOCOL_TYPE_TCP: + protocolString = "tcp" + case protocol.PROTOCOL_TYPE_UDP: + protocolString = "udp" + default: + plugin.LogFailure("unsupported peering protocol in request from " + peeringRequest.Issuer.Address.String()) + + return + } + + var err error + conn, err = net.Dial(protocolString, peeringRequest.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringRequest.Issuer.PeeringPort))) + if err != nil { + plugin.LogDebug("error when connecting to " + peeringRequest.Issuer.Address.String() + " during peering process: " + err.Error()) + + return + } + } else { + plugin.LogDebug("received TCP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier) + } + + sendFittingPeeringResponse(conn) +} + +func processPeeringResponse(plugin *node.Plugin, peeringResponse *response.Response, conn *network.ManagedConnection) { + if KNOWN_PEERS.Update(peeringResponse.Issuer) { + plugin.LogInfo("new peer detected: " + peeringResponse.Issuer.Address.String() + " / " + peeringResponse.Issuer.Identity.StringIdentifier) + } + for _, peer := range peeringResponse.Peers { + if KNOWN_PEERS.Update(peer) { + plugin.LogInfo("new peer detected: " + peer.Address.String() + " / " + peer.Identity.StringIdentifier) + } + } + + if conn == nil { + plugin.LogDebug("received UDP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier) + } else { + plugin.LogDebug("received TCP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier) + + conn.Close() + } + + switch peeringResponse.Type { + case response.TYPE_ACCEPT: + CHOSEN_NEIGHBORS.Update(peeringResponse.Issuer) + case response.TYPE_REJECT: + default: + plugin.LogDebug("invalid response type in peering response of " + peeringResponse.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringResponse.Issuer.PeeringPort))) + } } -func rejectPeeringRequest(conn net.Conn) { - conn.Write((&protocol.PeeringResponse{ - Type: protocol.PEERING_RESPONSE_REJECT, +func sendFittingPeeringResponse(conn net.Conn) { + var peeringResponse *response.Response + if len(ACCEPTED_NEIGHBORS.Peers) < protocol.NEIGHBOR_COUNT/2 { + peeringResponse = generateAcceptResponse() + } else { + peeringResponse = generateRejectResponse() + } + + peeringResponse.Sign() + + conn.Write(peeringResponse.Marshal()) + conn.Close() +} + +func generateAcceptResponse() *response.Response { + peeringResponse := &response.Response{ + Type: response.TYPE_ACCEPT, Issuer: PEERING_REQUEST.Issuer, Peers: generateProposedNodeCandidates(), - }).Sign().Marshal()) - conn.Close() + } + + return peeringResponse } -func acceptPeeringRequest(conn net.Conn) { - conn.Write((&protocol.PeeringResponse{ - Type: protocol.PEERING_RESPONSE_ACCEPT, +func generateRejectResponse() *response.Response { + peeringResponse := &response.Response{ + Type: response.TYPE_REJECT, Issuer: PEERING_REQUEST.Issuer, Peers: generateProposedNodeCandidates(), - }).Sign().Marshal()) - conn.Close() + } + + return peeringResponse } -func sendPeeringResponse(conn net.Conn) { - if len(ACCEPTED_NEIGHBORS.Peers) < protocol.NEIGHBOR_COUNT / 2 { - acceptPeeringRequest(conn) - } else { - rejectPeeringRequest(conn) + +func generateProposedNodeCandidates() []*peer.Peer { + peers := make([]*peer.Peer, 0) + for _, peer := range KNOWN_PEERS.Peers { + peers = append(peers, peer) + } + + return peers +} + +//region PEERING REQUEST RELATED METHODS /////////////////////////////////////////////////////////////////////////////// + +func sendPeeringRequests(plugin *node.Plugin) { + for _, peer := range getChosenNeighborCandidates() { + sendPeeringRequest(plugin, peer) } } -func sendPeeringRequest(plugin *node.Plugin, peer *protocol.Peer) { - var protocolString string +func sendPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { switch peer.PeeringProtocolType { - case protocol.TCP_PROTOCOL: - protocolString = "tcp" - case protocol.UDP_PROTOCOL: - protocolString = "udp" + case protocol.PROTOCOL_TYPE_TCP: + sendTCPPeeringRequest(plugin, peer) + + case protocol.PROTOCOL_TYPE_UDP: + sendUDPPeeringRequest(plugin, peer) + default: panic("invalid protocol in known peers") } +} - conn, err := net.Dial(protocolString, peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort))) - if err != nil { - plugin.LogFailure(err.Error()) - } else { - conn := network.NewPeer(protocolString, conn) - - conn.Write(PEERING_REQUEST.Marshal()) - - buffer := make([]byte, protocol.PEERING_RESPONSE_MARSHALLED_TOTAL_SIZE) - offset := 0 - conn.OnReceiveData(func(data []byte) { - remainingCapacity := int(math.Min(float64(protocol.PEERING_RESPONSE_MARSHALLED_TOTAL_SIZE - offset), float64(len(data)))) +func sendTCPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { + go func() { + tcpConnection, err := net.Dial("tcp", peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort))) + if err != nil { + plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error()) + } else { + mConn := network.NewManagedConnection(tcpConnection) - copy(buffer[offset:], data[:remainingCapacity]) - offset += len(data) + plugin.LogDebug("sending TCP peering request to " + peer.String()) - if offset >= protocol.PEERING_RESPONSE_MARSHALLED_TOTAL_SIZE { - peeringResponse, err := protocol.UnmarshalPeeringResponse(buffer) - if err != nil { - plugin.LogFailure("invalid peering response from " + conn.GetConnection().RemoteAddr().String()) - } else { - processPeeringResponse(plugin, peeringResponse) - } + if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil { + plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error()) - conn.GetConnection().Close() + return } - }) - go conn.HandleConnection() - } + tcp.HandleConnection(mConn) + } + }() } -func processPeeringResponse(plugin *node.Plugin, response *protocol.PeeringResponse) { - KNOWN_PEERS.Update(response.Issuer) - for _, peer := range response.Peers { - KNOWN_PEERS.Update(peer) - } +func sendUDPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { + go func() { + udpConnection, err := net.Dial("udp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) + if err != nil { + plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error()) + } else { + mConn := network.NewManagedConnection(udpConnection) - switch response.Type { - case protocol.PEERING_RESPONSE_ACCEPT: - CHOSEN_NEIGHBORS.Update(response.Issuer) - case protocol.PEERING_RESPONSE_REJECT: - default: - plugin.LogInfo("invalid response type in peering response of " + response.Issuer.Address.String() + ":" + strconv.Itoa(int(response.Issuer.PeeringPort))) - } + if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil { + plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error()) + + return + } + + // setup listener for incoming responses + } + }() } -func getChosenNeighborCandidates() []*protocol.Peer { - result := make([]*protocol.Peer, 0) +func getChosenNeighborCandidates() []*peer.Peer { + result := make([]*peer.Peer, 0) for _, peer := range KNOWN_PEERS.Peers { result = append(result, peer) @@ -194,8 +247,4 @@ func getChosenNeighborCandidates() []*protocol.Peer { return result } -func chooseNeighbors(plugin *node.Plugin) { - for _, peer := range getChosenNeighborCandidates() { - sendPeeringRequest(plugin, peer) - } -} +//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// \ No newline at end of file diff --git a/plugins/autopeering/peermanager/peering_request.go b/plugins/autopeering/peermanager/peering_request.go new file mode 100644 index 0000000000000000000000000000000000000000..4475b6f42b8a57fc74d8c1dac91e3fa57da71a13 --- /dev/null +++ b/plugins/autopeering/peermanager/peering_request.go @@ -0,0 +1,33 @@ +package peermanager + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "net" +) + +var PEERING_REQUEST *request.Request + +func configurePeeringRequest() { + PEERING_REQUEST = &request.Request{ + Issuer: &peer.Peer{ + Identity: accountability.OWN_ID, + PeeringProtocolType: protocol.PROTOCOL_TYPE_TCP, + PeeringPort: uint16(*parameters.UDP_PORT.Value), + GossipProtocolType: protocol.PROTOCOL_TYPE_TCP, + GossipPort: uint16(*parameters.UDP_PORT.Value), + Address: net.IPv4(0, 0, 0, 0), + }, + Salt: saltmanager.PUBLIC_SALT, + } + PEERING_REQUEST.Sign() + + saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + PEERING_REQUEST.Sign() + }) +} diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/plugin.go similarity index 100% rename from plugins/autopeering/autopeering.go rename to plugins/autopeering/plugin.go diff --git a/plugins/autopeering/protocol/constants.go b/plugins/autopeering/protocol/constants.go index 5e677f1691436bba0e453fb5fa9e0b3ffa212fe6..5405fa58a93d3e0a0eea8e5b7ab3f265d4963aaa 100644 --- a/plugins/autopeering/protocol/constants.go +++ b/plugins/autopeering/protocol/constants.go @@ -1,85 +1,11 @@ package protocol -import ( - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/plugins/autopeering/salt" -) - const ( - // PEERING REQUEST PACKET STRUCTURE //////////////////////////////////////////////////////////////////////////////// - NEIGHBOR_COUNT = 8 - PACKET_HEADER_SIZE = 1 - ISSUER_SIZE = PEER_MARSHALLED_TOTAL_SIZE - SALT_SIZE = salt.SALT_MARSHALLED_SIZE - SIGNATURE_SIZE = 65 - - PACKET_HEADER_START = 0 - ISSUER_START = PACKET_HEADER_END - SALT_START = ISSUER_END - SIGNATURE_START = SALT_END - - PACKET_HEADER_END = PACKET_HEADER_START + PACKET_HEADER_SIZE - ISSUER_END = ISSUER_START + ISSUER_SIZE - SALT_END = SALT_START + SALT_SIZE - SIGNATURE_END = SIGNATURE_START + SIGNATURE_SIZE - - PEERING_REQUEST_MARSHALLED_TOTAL_SIZE = SIGNATURE_END - - PACKET_HEADER = 0xBE - - PEERING_RESPONSE_REJECT = ResponseType(0) - PEERING_RESPONSE_ACCEPT = ResponseType(1) - - PEERING_RESPONSE_MARSHALLED_PEERS_AMOUNT = NEIGHBOR_COUNT + NEIGHBOR_COUNT*NEIGHBOR_COUNT - - PEERING_RESPONSE_MARSHALLED_TYPE_SIZE = 1 - PEERING_RESPONSE_MARSHALLED_ISSUER_SIZE = PEER_MARSHALLED_TOTAL_SIZE - PEERING_RESPONSE_MARSHALLED_PEER_FLAG_SIZE = 1 - PEERING_RESPONSE_MARSHALLED_PEER_SIZE = PEERING_RESPONSE_MARSHALLED_PEER_FLAG_SIZE + PEER_MARSHALLED_TOTAL_SIZE - PEERING_RESPONSE_MARSHALLED_PEERS_SIZE = PEERING_RESPONSE_MARSHALLED_PEERS_AMOUNT * PEERING_RESPONSE_MARSHALLED_PEER_SIZE - PEERING_RESPONSE_MARSHALLED_SIGNATURE_SIZE = 65 - PEERING_RESPONSE_MARSHALLED_TOTAL_SIZE = PEERING_RESPONSE_MARSHALLED_SIGNATURE_END - - PEERING_RESPONSE_MARSHALLED_TYPE_START = 0 - PEERING_RESPONSE_MARSHALLED_ISSUER_START = PEERING_RESPONSE_MARSHALLED_TYPE_END - PEERING_RESPONSE_MARSHALLED_PEERS_START = PEERING_RESPONSE_MARSHALLED_ISSUER_END - PEERING_RESPONSE_MARSHALLED_SIGNATURE_START = PEERING_RESPONSE_MARSHALLED_PEERS_END - - PEERING_RESPONSE_MARSHALLED_TYPE_END = PEERING_RESPONSE_MARSHALLED_TYPE_START + PEERING_RESPONSE_MARSHALLED_TYPE_SIZE - PEERING_RESPONSE_MARSHALLED_PEERS_END = PEERING_RESPONSE_MARSHALLED_PEERS_START + PEERING_RESPONSE_MARSHALLED_PEERS_SIZE - PEERING_RESPONSE_MARSHALLED_ISSUER_END = PEERING_RESPONSE_MARSHALLED_ISSUER_START + PEERING_RESPONSE_MARSHALLED_ISSUER_SIZE - PEERING_RESPONSE_MARSHALLED_SIGNATURE_END = PEERING_RESPONSE_MARSHALLED_SIGNATURE_START + PEERING_RESPONSE_MARSHALLED_SIGNATURE_SIZE - - PEER_MARSHALLED_PUBLIC_KEY_SIZE = identity.PUBLIC_KEY_BYTE_LENGTH - PEER_MARSHALLED_ADDRESS_TYPE_SIZE = 1 // ipv4/ipv6 - PEER_MARSHALLED_ADDRESS_SIZE = 16 - PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_SIZE = 1 // tcp udp - PEER_MARSHALLED_PEERING_PORT_SIZE = 2 - PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE = 1 - PEER_MARSHALLED_GOSSIP_PORT_SIZE = 2 // tcp udp - PEER_MARSHALLED_TOTAL_SIZE = PEER_MARSHALLED_GOSSIP_PORT_END - - PEER_MARSHALLED_PUBLIC_KEY_START = 0 - PEER_MARSHALLED_ADDRESS_TYPE_START = PEER_MARSHALLED_PUBLIC_KEY_END - PEER_MARSHALLED_ADDRESS_START = PEER_MARSHALLED_ADDRESS_TYPE_END - PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_START = PEER_MARSHALLED_ADDRESS_END - PEER_MARSHALLED_PEERING_PORT_START = PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_END - PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_START = PEER_MARSHALLED_PEERING_PORT_END - PEER_MARSHALLED_GOSSIP_PORT_START = PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_END - - PEER_MARSHALLED_PUBLIC_KEY_END = PEER_MARSHALLED_PUBLIC_KEY_START + PEER_MARSHALLED_PUBLIC_KEY_SIZE - PEER_MARSHALLED_ADDRESS_TYPE_END = PEER_MARSHALLED_ADDRESS_TYPE_START + PEER_MARSHALLED_ADDRESS_TYPE_SIZE - PEER_MARSHALLED_ADDRESS_END = PEER_MARSHALLED_ADDRESS_START + PEER_MARSHALLED_ADDRESS_SIZE - PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_END = PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_START + PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_SIZE - PEER_MARSHALLED_PEERING_PORT_END = PEER_MARSHALLED_PEERING_PORT_START + PEER_MARSHALLED_PEERING_PORT_SIZE - PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_END = PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_START + PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE - PEER_MARSHALLED_GOSSIP_PORT_END = PEER_MARSHALLED_GOSSIP_PORT_START + PEER_MARSHALLED_GOSSIP_PORT_SIZE - - PEER_MARSHALLED_ADDRESS_TYPE_IPV4 = AddressType(0) - PEER_MARSHALLED_ADDRESS_TYPE_IPV6 = AddressType(1) + PROTOCOL_TYPE_TCP = ProtocolType(0) + PROTOCOL_TYPE_UDP = ProtocolType(1) - TCP_PROTOCOL = ProtocolType(0) - UDP_PROTOCOL = ProtocolType(1) + ADDRESS_TYPE_IPV4 = AddressType(0) + ADDRESS_TYPE_IPV6 = AddressType(1) ) diff --git a/plugins/autopeering/protocol/peer.go b/plugins/autopeering/protocol/peer.go deleted file mode 100644 index 013de5495610e5884de4835ce4c6c336d09b67fb..0000000000000000000000000000000000000000 --- a/plugins/autopeering/protocol/peer.go +++ /dev/null @@ -1,72 +0,0 @@ -package protocol - -import ( - "encoding/binary" - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/pkg/errors" - "net" - "time" -) - -type Peer struct { - Identity *identity.Identity - Address net.IP - PeeringProtocolType ProtocolType - PeeringPort uint16 - GossipProtocolType ProtocolType - GossipPort uint16 - FirstSeen time.Time - LastSeen time.Time - LastContact time.Time -} - -func UnmarshalPeer(data []byte) (*Peer, error) { - if len(data) < PEER_MARSHALLED_TOTAL_SIZE { - return nil, errors.New("size of marshalled peer is too small") - } - - peer := &Peer{ - Identity: identity.NewIdentity(data[PEER_MARSHALLED_PUBLIC_KEY_START:PEER_MARSHALLED_PUBLIC_KEY_END]), - } - - switch data[PEER_MARSHALLED_ADDRESS_TYPE_START] { - case PEER_MARSHALLED_ADDRESS_TYPE_IPV4: - peer.Address = net.IP(data[PEER_MARSHALLED_ADDRESS_START:PEER_MARSHALLED_ADDRESS_END]).To4() - case PEER_MARSHALLED_ADDRESS_TYPE_IPV6: - peer.Address = net.IP(data[PEER_MARSHALLED_ADDRESS_START:PEER_MARSHALLED_ADDRESS_END]).To16() - } - - peer.PeeringProtocolType = ProtocolType(data[PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_START]) - peer.PeeringPort = binary.BigEndian.Uint16(data[PEER_MARSHALLED_PEERING_PORT_START:PEER_MARSHALLED_PEERING_PORT_END]) - - peer.GossipProtocolType = ProtocolType(data[PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_START]) - peer.GossipPort = binary.BigEndian.Uint16(data[PEER_MARSHALLED_GOSSIP_PORT_START:PEER_MARSHALLED_GOSSIP_PORT_END]) - - return peer, nil -} - -func (peer *Peer) Marshal() []byte { - result := make([]byte, PEER_MARSHALLED_TOTAL_SIZE) - - copy(result[PEER_MARSHALLED_PUBLIC_KEY_START:PEER_MARSHALLED_PUBLIC_KEY_END], - peer.Identity.PublicKey[:PEER_MARSHALLED_PUBLIC_KEY_SIZE]) - - switch len(peer.Address) { - case net.IPv4len: - result[PEER_MARSHALLED_ADDRESS_TYPE_START] = PEER_MARSHALLED_ADDRESS_TYPE_IPV4 - case net.IPv6len: - result[PEER_MARSHALLED_ADDRESS_TYPE_START] = PEER_MARSHALLED_ADDRESS_TYPE_IPV6 - default: - panic("invalid address in peer") - } - - copy(result[PEER_MARSHALLED_ADDRESS_START:PEER_MARSHALLED_ADDRESS_END], peer.Address.To16()) - - result[PEER_MARSHALLED_PEERING_PROTOCOL_TYPE_START] = peer.PeeringProtocolType - binary.BigEndian.PutUint16(result[PEER_MARSHALLED_PEERING_PORT_START:PEER_MARSHALLED_PEERING_PORT_END], peer.PeeringPort) - - result[PEER_MARSHALLED_GOSSIP_PROTOCOL_TYPE_START] = peer.GossipProtocolType - binary.BigEndian.PutUint16(result[PEER_MARSHALLED_GOSSIP_PORT_START:PEER_MARSHALLED_GOSSIP_PORT_END], peer.GossipPort) - - return result -} diff --git a/plugins/autopeering/protocol/peer/constants.go b/plugins/autopeering/protocol/peer/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..b1b65ea4003558b2fb83c6195c5361428df96278 --- /dev/null +++ b/plugins/autopeering/protocol/peer/constants.go @@ -0,0 +1,32 @@ +package peer + +import ( + "github.com/iotaledger/goshimmer/packages/identity" +) + +const ( + MARSHALLED_PUBLIC_KEY_START = 0 + MARSHALLED_ADDRESS_TYPE_START = MARSHALLED_PUBLIC_KEY_END + MARSHALLED_ADDRESS_START = MARSHALLED_ADDRESS_TYPE_END + MARSHALLED_PEERING_PROTOCOL_TYPE_START = MARSHALLED_ADDRESS_END + MARSHALLED_PEERING_PORT_START = MARSHALLED_PEERING_PROTOCOL_TYPE_END + MARSHALLED_GOSSIP_PROTOCOL_TYPE_START = MARSHALLED_PEERING_PORT_END + MARSHALLED_GOSSIP_PORT_START = MARSHALLED_GOSSIP_PROTOCOL_TYPE_END + + MARSHALLED_PUBLIC_KEY_END = MARSHALLED_PUBLIC_KEY_START + MARSHALLED_PUBLIC_KEY_SIZE + MARSHALLED_ADDRESS_TYPE_END = MARSHALLED_ADDRESS_TYPE_START + MARSHALLED_ADDRESS_TYPE_SIZE + MARSHALLED_ADDRESS_END = MARSHALLED_ADDRESS_START + MARSHALLED_ADDRESS_SIZE + MARSHALLED_PEERING_PROTOCOL_TYPE_END = MARSHALLED_PEERING_PROTOCOL_TYPE_START + MARSHALLED_PEERING_PROTOCOL_TYPE_SIZE + MARSHALLED_PEERING_PORT_END = MARSHALLED_PEERING_PORT_START + MARSHALLED_PEERING_PORT_SIZE + MARSHALLED_GOSSIP_PROTOCOL_TYPE_END = MARSHALLED_GOSSIP_PROTOCOL_TYPE_START + MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE + MARSHALLED_GOSSIP_PORT_END = MARSHALLED_GOSSIP_PORT_START + MARSHALLED_GOSSIP_PORT_SIZE + + MARSHALLED_PUBLIC_KEY_SIZE = identity.PUBLIC_KEY_BYTE_LENGTH + MARSHALLED_ADDRESS_TYPE_SIZE = 1 + MARSHALLED_ADDRESS_SIZE = 16 + MARSHALLED_PEERING_PROTOCOL_TYPE_SIZE = 1 + MARSHALLED_PEERING_PORT_SIZE = 2 + MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE = 1 + MARSHALLED_GOSSIP_PORT_SIZE = 2 + MARSHALLED_TOTAL_SIZE = MARSHALLED_GOSSIP_PORT_END +) diff --git a/plugins/autopeering/protocol/peer/peer.go b/plugins/autopeering/protocol/peer/peer.go new file mode 100644 index 0000000000000000000000000000000000000000..a004bfd6b2a7dfb4febe35b2ffa346f1a7597b69 --- /dev/null +++ b/plugins/autopeering/protocol/peer/peer.go @@ -0,0 +1,78 @@ +package peer + +import ( + "encoding/binary" + "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/pkg/errors" + "net" + "strconv" +) + +type Peer struct { + Identity *identity.Identity + Address net.IP + PeeringProtocolType protocol.ProtocolType + PeeringPort uint16 + GossipProtocolType protocol.ProtocolType + GossipPort uint16 +} + +func Unmarshal(data []byte) (*Peer, error) { + if len(data) < MARSHALLED_TOTAL_SIZE { + return nil, errors.New("size of marshalled peer is too small") + } + + peer := &Peer{ + Identity: identity.NewIdentity(data[MARSHALLED_PUBLIC_KEY_START:MARSHALLED_PUBLIC_KEY_END]), + } + + switch data[MARSHALLED_ADDRESS_TYPE_START] { + case protocol.ADDRESS_TYPE_IPV4: + peer.Address = net.IP(data[MARSHALLED_ADDRESS_START:MARSHALLED_ADDRESS_END]).To4() + case protocol.ADDRESS_TYPE_IPV6: + peer.Address = net.IP(data[MARSHALLED_ADDRESS_START:MARSHALLED_ADDRESS_END]).To16() + } + + peer.PeeringProtocolType = protocol.ProtocolType(data[MARSHALLED_PEERING_PROTOCOL_TYPE_START]) + peer.PeeringPort = binary.BigEndian.Uint16(data[MARSHALLED_PEERING_PORT_START:MARSHALLED_PEERING_PORT_END]) + + peer.GossipProtocolType = protocol.ProtocolType(data[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START]) + peer.GossipPort = binary.BigEndian.Uint16(data[MARSHALLED_GOSSIP_PORT_START:MARSHALLED_GOSSIP_PORT_END]) + + return peer, nil +} + +func (peer *Peer) Marshal() []byte { + result := make([]byte, MARSHALLED_TOTAL_SIZE) + + copy(result[MARSHALLED_PUBLIC_KEY_START:MARSHALLED_PUBLIC_KEY_END], + peer.Identity.PublicKey[:MARSHALLED_PUBLIC_KEY_SIZE]) + + switch len(peer.Address) { + case net.IPv4len: + result[MARSHALLED_ADDRESS_TYPE_START] = protocol.ADDRESS_TYPE_IPV4 + case net.IPv6len: + result[MARSHALLED_ADDRESS_TYPE_START] = protocol.ADDRESS_TYPE_IPV6 + default: + panic("invalid address in peer") + } + + copy(result[MARSHALLED_ADDRESS_START:MARSHALLED_ADDRESS_END], peer.Address.To16()) + + result[MARSHALLED_PEERING_PROTOCOL_TYPE_START] = peer.PeeringProtocolType + binary.BigEndian.PutUint16(result[MARSHALLED_PEERING_PORT_START:MARSHALLED_PEERING_PORT_END], peer.PeeringPort) + + result[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START] = peer.GossipProtocolType + binary.BigEndian.PutUint16(result[MARSHALLED_GOSSIP_PORT_START:MARSHALLED_GOSSIP_PORT_END], peer.GossipPort) + + return result +} + +func (peer *Peer) String() string { + if peer.Identity != nil { + return peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + } else { + return peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + } +} diff --git a/plugins/autopeering/protocol/peering_response.go b/plugins/autopeering/protocol/peering_response.go deleted file mode 100644 index 1cb9dee23d7584b79f4861193f33d4028776380c..0000000000000000000000000000000000000000 --- a/plugins/autopeering/protocol/peering_response.go +++ /dev/null @@ -1,87 +0,0 @@ -package protocol - -import ( - "bytes" - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/pkg/errors" -) - -type PeeringResponse struct { - Type ResponseType - Issuer *Peer - Peers []*Peer - Signature [PEERING_RESPONSE_MARSHALLED_SIGNATURE_SIZE]byte -} - -func UnmarshalPeeringResponse(data []byte) (*PeeringResponse, error) { - if len(data) < PEERING_RESPONSE_MARSHALLED_TOTAL_SIZE { - return nil, errors.New("size of marshalled peering response is too small") - } - - peeringResponse := &PeeringResponse{ - Type: data[PEERING_RESPONSE_MARSHALLED_TYPE_START], - Peers: make([]*Peer, 0), - } - - if unmarshalledPeer, err := UnmarshalPeer(data[PEERING_RESPONSE_MARSHALLED_ISSUER_START:PEERING_RESPONSE_MARSHALLED_ISSUER_END]); err != nil { - return nil, err - } else { - peeringResponse.Issuer = unmarshalledPeer - } - - for i := 0; i < PEERING_RESPONSE_MARSHALLED_PEERS_AMOUNT; i++ { - PEERING_RESPONSE_MARSHALLED_PEER_START := PEERING_RESPONSE_MARSHALLED_PEERS_START + (i * PEERING_RESPONSE_MARSHALLED_PEER_SIZE) - PEERING_RESPONSE_MARSHALLED_PEER_END := PEERING_RESPONSE_MARSHALLED_PEER_START + PEERING_RESPONSE_MARSHALLED_PEER_SIZE - - if data[PEERING_RESPONSE_MARSHALLED_PEER_START] == 1 { - peer, err := UnmarshalPeer(data[PEERING_RESPONSE_MARSHALLED_PEER_START+1 : PEERING_RESPONSE_MARSHALLED_PEER_END]) - if err != nil { - return nil, err - } - - peeringResponse.Peers = append(peeringResponse.Peers, peer) - } - } - - if issuer, err := identity.FromSignedData(data[:PEERING_RESPONSE_MARSHALLED_SIGNATURE_START], data[PEERING_RESPONSE_MARSHALLED_SIGNATURE_START:]); err != nil { - return nil, err - } else { - if !bytes.Equal(issuer.Identifier, peeringResponse.Issuer.Identity.Identifier) { - return nil, ErrInvalidSignature - } - } - copy(peeringResponse.Signature[:], data[PEERING_RESPONSE_MARSHALLED_SIGNATURE_START:PEERING_RESPONSE_MARSHALLED_SIGNATURE_END]) - - return peeringResponse, nil -} - -func (this *PeeringResponse) Sign() *PeeringResponse { - dataToSign := this.Marshal()[:PEERING_RESPONSE_MARSHALLED_SIGNATURE_START] - if signature, err := this.Issuer.Identity.Sign(dataToSign); err != nil { - panic(err) - } else { - copy(this.Signature[:], signature) - } - - return this -} - -func (this *PeeringResponse) Marshal() []byte { - result := make([]byte, PEERING_RESPONSE_MARSHALLED_TOTAL_SIZE) - - result[PEERING_RESPONSE_MARSHALLED_TYPE_START] = this.Type - - copy(result[PEERING_RESPONSE_MARSHALLED_ISSUER_START:PEERING_RESPONSE_MARSHALLED_ISSUER_END], this.Issuer.Marshal()) - - for i, peer := range this.Peers { - PEERING_RESPONSE_MARSHALLED_PEER_START := PEERING_RESPONSE_MARSHALLED_PEERS_START + (i * PEERING_RESPONSE_MARSHALLED_PEER_SIZE) - PEERING_RESPONSE_MARSHALLED_PEER_END := PEERING_RESPONSE_MARSHALLED_PEER_START + PEERING_RESPONSE_MARSHALLED_PEER_SIZE - - result[PEERING_RESPONSE_MARSHALLED_PEER_START] = 1 - copy(result[PEERING_RESPONSE_MARSHALLED_PEER_START+1:PEERING_RESPONSE_MARSHALLED_PEER_END], peer.Marshal()[:PEERING_RESPONSE_MARSHALLED_PEER_SIZE-1]) - } - - copy(result[PEERING_RESPONSE_MARSHALLED_SIGNATURE_START:PEERING_RESPONSE_MARSHALLED_SIGNATURE_END], this.Signature[:PEERING_RESPONSE_MARSHALLED_SIGNATURE_SIZE]) - - return result -} diff --git a/plugins/autopeering/protocol/request/constants.go b/plugins/autopeering/protocol/request/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..eb05a8323e745424743a52b1184cb2cab6fa2255 --- /dev/null +++ b/plugins/autopeering/protocol/request/constants.go @@ -0,0 +1,27 @@ +package request + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" +) + +const ( + PACKET_HEADER_SIZE = 1 + ISSUER_SIZE = peer.MARSHALLED_TOTAL_SIZE + SALT_SIZE = salt.SALT_MARSHALLED_SIZE + SIGNATURE_SIZE = 65 + + PACKET_HEADER_START = 0 + ISSUER_START = PACKET_HEADER_END + SALT_START = ISSUER_END + SIGNATURE_START = SALT_END + + PACKET_HEADER_END = PACKET_HEADER_START + PACKET_HEADER_SIZE + ISSUER_END = ISSUER_START + ISSUER_SIZE + SALT_END = SALT_START + SALT_SIZE + SIGNATURE_END = SIGNATURE_START + SIGNATURE_SIZE + + MARSHALLED_TOTAL_SIZE = SIGNATURE_END + + MARSHALLED_PACKET_HEADER = 0xBE +) diff --git a/plugins/autopeering/protocol/errors.go b/plugins/autopeering/protocol/request/errors.go similarity index 95% rename from plugins/autopeering/protocol/errors.go rename to plugins/autopeering/protocol/request/errors.go index 53f4ab081ff3274a3057eefc7f63cc1fdaad3567..8895aa9eb0057273f39d25e6da8eae94ce3c86a9 100644 --- a/plugins/autopeering/protocol/errors.go +++ b/plugins/autopeering/protocol/request/errors.go @@ -1,4 +1,4 @@ -package protocol +package request import "github.com/pkg/errors" diff --git a/plugins/autopeering/protocol/peering_request.go b/plugins/autopeering/protocol/request/request.go similarity index 64% rename from plugins/autopeering/protocol/peering_request.go rename to plugins/autopeering/protocol/request/request.go index 1ed834db1c9097a8f757357e3c6f429805550c03..bf499741007b2193603fdeb1bfdcd5ddcb56d0a1 100644 --- a/plugins/autopeering/protocol/peering_request.go +++ b/plugins/autopeering/protocol/request/request.go @@ -1,27 +1,28 @@ -package protocol +package request import ( "bytes" "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/plugins/autopeering/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" "time" ) -type PeeringRequest struct { - Issuer *Peer +type Request struct { + Issuer *peer.Peer Salt *salt.Salt Signature [SIGNATURE_SIZE]byte } -func UnmarshalPeeringRequest(data []byte) (*PeeringRequest, error) { - if data[0] != PACKET_HEADER || len(data) != PEERING_REQUEST_MARSHALLED_TOTAL_SIZE { +func Unmarshal(data []byte) (*Request, error) { + if data[0] != MARSHALLED_PACKET_HEADER || len(data) != MARSHALLED_TOTAL_SIZE { return nil, ErrMalformedPeeringRequest } - peeringRequest := &PeeringRequest{} + peeringRequest := &Request{} - if unmarshalledPeer, err := UnmarshalPeer(data[ISSUER_START:ISSUER_END]); err != nil { + if unmarshalledPeer, err := peer.Unmarshal(data[ISSUER_START:ISSUER_END]); err != nil { return nil, err } else { peeringRequest.Issuer = unmarshalledPeer @@ -34,7 +35,7 @@ func UnmarshalPeeringRequest(data []byte) (*PeeringRequest, error) { } now := time.Now() - if peeringRequest.Salt.ExpirationTime.Before(now) { + if peeringRequest.Salt.ExpirationTime.Before(now.Add(-1 * time.Minute)) { return nil, ErrPublicSaltExpired } if peeringRequest.Salt.ExpirationTime.After(now.Add(saltmanager.PUBLIC_SALT_LIFETIME + 1*time.Minute)) { @@ -53,19 +54,18 @@ func UnmarshalPeeringRequest(data []byte) (*PeeringRequest, error) { return peeringRequest, nil } -func (this *PeeringRequest) Sign() { - dataToSign := this.Marshal()[:SIGNATURE_START] - if signature, err := this.Issuer.Identity.Sign(dataToSign); err != nil { +func (this *Request) Sign() { + if signature, err := this.Issuer.Identity.Sign(this.Marshal()[:SIGNATURE_START]); err != nil { panic(err) } else { copy(this.Signature[:], signature) } } -func (this *PeeringRequest) Marshal() []byte { - result := make([]byte, PEERING_REQUEST_MARSHALLED_TOTAL_SIZE) +func (this *Request) Marshal() []byte { + result := make([]byte, MARSHALLED_TOTAL_SIZE) - result[PACKET_HEADER_START] = PACKET_HEADER + result[PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER copy(result[ISSUER_START:ISSUER_END], this.Issuer.Marshal()) copy(result[SALT_START:SALT_END], this.Salt.Marshal()) copy(result[SIGNATURE_START:SIGNATURE_END], this.Signature[:SIGNATURE_SIZE]) diff --git a/plugins/autopeering/protocol/response/constants.go b/plugins/autopeering/protocol/response/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..1934656553ab90adf5b941ad6f17b91dfad9d572 --- /dev/null +++ b/plugins/autopeering/protocol/response/constants.go @@ -0,0 +1,35 @@ +package response + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" +) + +const ( + TYPE_REJECT = Type(0) + TYPE_ACCEPT = Type(1) + + MARSHALLED_PEERS_AMOUNT = protocol.NEIGHBOR_COUNT + protocol.NEIGHBOR_COUNT*protocol.NEIGHBOR_COUNT + MARHSALLED_PACKET_HEADER = 0xBC + + MARSHALLED_PACKET_HEADER_START = 0 + MARSHALLED_TYPE_START = MARSHALLED_PACKET_HEADER_END + MARSHALLED_ISSUER_START = MARSHALLED_TYPE_END + MARSHALLED_PEERS_START = MARSHALLED_ISSUER_END + MARSHALLED_SIGNATURE_START = MARSHALLED_PEERS_END + + MARSHALLED_PACKET_HEADER_END = MARSHALLED_PACKET_HEADER_START + MARSHALLED_PACKET_HEADER_SIZE + MARSHALLED_TYPE_END = MARSHALLED_TYPE_START + MARSHALLED_TYPE_SIZE + MARSHALLED_PEERS_END = MARSHALLED_PEERS_START + MARSHALLED_PEERS_SIZE + MARSHALLED_ISSUER_END = MARSHALLED_ISSUER_START + MARSHALLED_ISSUER_SIZE + MARSHALLED_SIGNATURE_END = MARSHALLED_SIGNATURE_START + MARSHALLED_SIGNATURE_SIZE + + MARSHALLED_PACKET_HEADER_SIZE = 1 + MARSHALLED_TYPE_SIZE = 1 + MARSHALLED_ISSUER_SIZE = peer.MARSHALLED_TOTAL_SIZE + MARSHALLED_PEER_FLAG_SIZE = 1 + MARSHALLED_PEER_SIZE = MARSHALLED_PEER_FLAG_SIZE + peer.MARSHALLED_TOTAL_SIZE + MARSHALLED_PEERS_SIZE = MARSHALLED_PEERS_AMOUNT * MARSHALLED_PEER_SIZE + MARSHALLED_SIGNATURE_SIZE = 65 + MARSHALLED_TOTAL_SIZE = MARSHALLED_SIGNATURE_END +) diff --git a/plugins/autopeering/protocol/response/errors.go b/plugins/autopeering/protocol/response/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..abda4d1fda24555173735923dabd7b968ad2cd9e --- /dev/null +++ b/plugins/autopeering/protocol/response/errors.go @@ -0,0 +1,7 @@ +package response + +import "github.com/pkg/errors" + +var ( + ErrInvalidSignature = errors.New("invalid signature in peering request") +) diff --git a/plugins/autopeering/protocol/response/response.go b/plugins/autopeering/protocol/response/response.go new file mode 100644 index 0000000000000000000000000000000000000000..c91fc91d0f23d7d95b248d98b4aabf107da8b6e8 --- /dev/null +++ b/plugins/autopeering/protocol/response/response.go @@ -0,0 +1,89 @@ +package response + +import ( + "bytes" + "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/pkg/errors" +) + +type Response struct { + Type Type + Issuer *peer.Peer + Peers []*peer.Peer + Signature [MARSHALLED_SIGNATURE_SIZE]byte +} + +func Unmarshal(data []byte) (*Response, error) { + if data[0] != MARHSALLED_PACKET_HEADER || len(data) < MARSHALLED_TOTAL_SIZE { + return nil, errors.New("malformed peering response") + } + + peeringResponse := &Response{ + Type: data[MARSHALLED_TYPE_START], + Peers: make([]*peer.Peer, 0), + } + + if unmarshalledPeer, err := peer.Unmarshal(data[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END]); err != nil { + return nil, err + } else { + peeringResponse.Issuer = unmarshalledPeer + } + + for i := 0; i < MARSHALLED_PEERS_AMOUNT; i++ { + PEERING_RESPONSE_MARSHALLED_PEER_START := MARSHALLED_PEERS_START + (i * MARSHALLED_PEER_SIZE) + PEERING_RESPONSE_MARSHALLED_PEER_END := PEERING_RESPONSE_MARSHALLED_PEER_START + MARSHALLED_PEER_SIZE + + if data[PEERING_RESPONSE_MARSHALLED_PEER_START] == 1 { + peer, err := peer.Unmarshal(data[PEERING_RESPONSE_MARSHALLED_PEER_START+1 : PEERING_RESPONSE_MARSHALLED_PEER_END]) + if err != nil { + return nil, err + } + + peeringResponse.Peers = append(peeringResponse.Peers, peer) + } + } + + if issuer, err := identity.FromSignedData(data[:MARSHALLED_SIGNATURE_START], data[MARSHALLED_SIGNATURE_START:]); err != nil { + return nil, err + } else { + if !bytes.Equal(issuer.Identifier, peeringResponse.Issuer.Identity.Identifier) { + return nil, ErrInvalidSignature + } + } + copy(peeringResponse.Signature[:], data[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END]) + + return peeringResponse, nil +} + +func (this *Response) Sign() *Response { + dataToSign := this.Marshal()[:MARSHALLED_SIGNATURE_START] + if signature, err := this.Issuer.Identity.Sign(dataToSign); err != nil { + panic(err) + } else { + copy(this.Signature[:], signature) + } + + return this +} + +func (this *Response) Marshal() []byte { + result := make([]byte, MARSHALLED_TOTAL_SIZE) + + result[MARSHALLED_PACKET_HEADER_START] = MARHSALLED_PACKET_HEADER + result[MARSHALLED_TYPE_START] = this.Type + + copy(result[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END], this.Issuer.Marshal()) + + for i, peer := range this.Peers { + PEERING_RESPONSE_MARSHALLED_PEER_START := MARSHALLED_PEERS_START + (i * MARSHALLED_PEER_SIZE) + PEERING_RESPONSE_MARSHALLED_PEER_END := PEERING_RESPONSE_MARSHALLED_PEER_START + MARSHALLED_PEER_SIZE + + result[PEERING_RESPONSE_MARSHALLED_PEER_START] = 1 + copy(result[PEERING_RESPONSE_MARSHALLED_PEER_START+1:PEERING_RESPONSE_MARSHALLED_PEER_END], peer.Marshal()[:MARSHALLED_PEER_SIZE-1]) + } + + copy(result[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END], this.Signature[:MARSHALLED_SIGNATURE_SIZE]) + + return result +} diff --git a/plugins/autopeering/protocol/response/types.go b/plugins/autopeering/protocol/response/types.go new file mode 100644 index 0000000000000000000000000000000000000000..64e48e6c2dec3026fbe222ab8814a2084ab53759 --- /dev/null +++ b/plugins/autopeering/protocol/response/types.go @@ -0,0 +1,3 @@ +package response + +type Type = byte diff --git a/plugins/autopeering/salt/constants.go b/plugins/autopeering/protocol/salt/constants.go similarity index 100% rename from plugins/autopeering/salt/constants.go rename to plugins/autopeering/protocol/salt/constants.go diff --git a/plugins/autopeering/salt/salt.go b/plugins/autopeering/protocol/salt/salt.go similarity index 100% rename from plugins/autopeering/salt/salt.go rename to plugins/autopeering/protocol/salt/salt.go diff --git a/plugins/autopeering/protocol/types.go b/plugins/autopeering/protocol/types.go index 22a7f44ea0c7d6dd34cda08f0429ae0ef626e3cb..99e489a3b3ae8cf6ec25e9230d874c3b7f3562e2 100644 --- a/plugins/autopeering/protocol/types.go +++ b/plugins/autopeering/protocol/types.go @@ -1,7 +1,5 @@ package protocol -type ProtocolType = byte - type AddressType = byte -type ResponseType = byte +type ProtocolType = byte diff --git a/plugins/autopeering/saltmanager/events.go b/plugins/autopeering/saltmanager/events.go index a1bc3a8d3afd497dab5e389eb7537e13db975636..5fb00471ad39355ba633679b842df205323d6d86 100644 --- a/plugins/autopeering/saltmanager/events.go +++ b/plugins/autopeering/saltmanager/events.go @@ -1,7 +1,7 @@ package saltmanager import ( - "github.com/iotaledger/goshimmer/plugins/autopeering/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" "reflect" ) diff --git a/plugins/autopeering/saltmanager/saltmanager.go b/plugins/autopeering/saltmanager/saltmanager.go index c119d3b23c5a06aad6274dab3dff368a17353220..2c3d62399f378cbfe3ec6c4d3ca9cd6a6d721570 100644 --- a/plugins/autopeering/saltmanager/saltmanager.go +++ b/plugins/autopeering/saltmanager/saltmanager.go @@ -4,7 +4,7 @@ import ( "github.com/dgraph-io/badger" "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/settings" - "github.com/iotaledger/goshimmer/plugins/autopeering/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" "time" ) diff --git a/plugins/autopeering/saltmanager/types.go b/plugins/autopeering/saltmanager/types.go index 650741e2e188ce605d0b6290f7f75076866c1ec2..1a0438c005e7b072acb1fa2e9d8225eaf742305c 100644 --- a/plugins/autopeering/saltmanager/types.go +++ b/plugins/autopeering/saltmanager/types.go @@ -1,5 +1,5 @@ package saltmanager -import "github.com/iotaledger/goshimmer/plugins/autopeering/salt" +import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" type SaltConsumer = func(salt *salt.Salt) diff --git a/plugins/autopeering/server/constants.go b/plugins/autopeering/server/constants.go deleted file mode 100644 index c122957ba31c3e7c99c1996ce7ce70cf891e4cab..0000000000000000000000000000000000000000 --- a/plugins/autopeering/server/constants.go +++ /dev/null @@ -1,7 +0,0 @@ -package server - -import "time" - -const ( - TCP_IDLE_TIMEOUT = 5 * time.Second -) diff --git a/plugins/autopeering/server/server.go b/plugins/autopeering/server/server.go index 3615391bd0044eabf5c1e459c449c0300b813906..212a6ca5e864ae79543e0393ade580a9c9c4d25a 100644 --- a/plugins/autopeering/server/server.go +++ b/plugins/autopeering/server/server.go @@ -2,19 +2,21 @@ package server import ( "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/udp" ) func Configure(plugin *node.Plugin) { - ConfigureUDPServer(plugin) - ConfigureTCPServer(plugin) + udp.ConfigureServer(plugin) + tcp.ConfigureServer(plugin) } func Run(plugin *node.Plugin) { - RunUDPServer(plugin) - RunTCPServer(plugin) + udp.RunServer(plugin) + tcp.RunServer(plugin) } func Shutdown(plugin *node.Plugin) { - ShutdownUDPServer(plugin) - ShutdownTCPServer(plugin) + udp.ShutdownUDPServer(plugin) + tcp.ShutdownServer(plugin) } diff --git a/plugins/autopeering/server/tcp/constants.go b/plugins/autopeering/server/tcp/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..df04531275bf96f2c254282a40f84af277929e28 --- /dev/null +++ b/plugins/autopeering/server/tcp/constants.go @@ -0,0 +1,11 @@ +package tcp + +import "time" + +const ( + IDLE_TIMEOUT = 5 * time.Second + + STATE_INITIAL = byte(0) + STATE_REQUEST = byte(1) + STATE_RESPONSE = byte(2) +) diff --git a/plugins/autopeering/server/events.go b/plugins/autopeering/server/tcp/events.go similarity index 50% rename from plugins/autopeering/server/events.go rename to plugins/autopeering/server/tcp/events.go index 734a280756b2e5a0416236178198fd883c40e8e5..fa67810d14e56bdcaeb5df3992db37defb8440e4 100644 --- a/plugins/autopeering/server/events.go +++ b/plugins/autopeering/server/tcp/events.go @@ -1,51 +1,58 @@ -package server +package tcp import ( "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" "reflect" ) +var Events = &pluginEvents{ + ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, + ReceiveResponse: &responseEvent{make(map[uintptr]ConnectionPeeringResponseConsumer)}, + Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, +} + type pluginEvents struct { - ReceivePeeringRequest *addressPeeringRequestEvent - ReceiveTCPPeeringRequest *tcpPeeringRequestEvent - Error *ipErrorEvent + ReceiveRequest *requestEvent + ReceiveResponse *responseEvent + Error *ipErrorEvent } -type tcpPeeringRequestEvent struct { +type requestEvent struct { callbacks map[uintptr]ConnectionPeeringRequestConsumer } -func (this *tcpPeeringRequestEvent) Attach(callback ConnectionPeeringRequestConsumer) { +func (this *requestEvent) Attach(callback ConnectionPeeringRequestConsumer) { this.callbacks[reflect.ValueOf(callback).Pointer()] = callback } -func (this *tcpPeeringRequestEvent) Detach(callback ConnectionPeeringRequestConsumer) { +func (this *requestEvent) Detach(callback ConnectionPeeringRequestConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } -func (this *tcpPeeringRequestEvent) Trigger(conn network.Connection, request *protocol.PeeringRequest) { +func (this *requestEvent) Trigger(conn *network.ManagedConnection, request *request.Request) { for _, callback := range this.callbacks { callback(conn, request) } } -type addressPeeringRequestEvent struct { - callbacks map[uintptr]IPPeeringRequestConsumer +type responseEvent struct { + callbacks map[uintptr]ConnectionPeeringResponseConsumer } -func (this *addressPeeringRequestEvent) Attach(callback IPPeeringRequestConsumer) { +func (this *responseEvent) Attach(callback ConnectionPeeringResponseConsumer) { this.callbacks[reflect.ValueOf(callback).Pointer()] = callback } -func (this *addressPeeringRequestEvent) Detach(callback IPPeeringRequestConsumer) { +func (this *responseEvent) Detach(callback ConnectionPeeringResponseConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } -func (this *addressPeeringRequestEvent) Trigger(ip net.IP, request *protocol.PeeringRequest) { +func (this *responseEvent) Trigger(conn *network.ManagedConnection, peeringResponse *response.Response) { for _, callback := range this.callbacks { - callback(ip, request) + callback(conn, peeringResponse) } } @@ -66,9 +73,3 @@ func (this *ipErrorEvent) Trigger(ip net.IP, err error) { callback(ip, err) } } - -var Events = &pluginEvents{ - ReceivePeeringRequest: &addressPeeringRequestEvent{make(map[uintptr]IPPeeringRequestConsumer)}, - ReceiveTCPPeeringRequest: &tcpPeeringRequestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, - Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, -} diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go new file mode 100644 index 0000000000000000000000000000000000000000..1544c703848961e91f8a63c6584906132af36077 --- /dev/null +++ b/plugins/autopeering/server/tcp/server.go @@ -0,0 +1,171 @@ +package tcp + +import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/packages/network/tcp" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "github.com/pkg/errors" + "math" + "net" + "strconv" +) + +var server = tcp.NewServer() + +func ConfigureServer(plugin *node.Plugin) { + server.Events.Connect.Attach(HandleConnection) + server.Events.Error.Attach(func(err error) { + plugin.LogFailure("error in tcp server: " + err.Error()) + }) + server.Events.Start.Attach(func() { + if *parameters.ADDRESS.Value == "0.0.0.0" { + plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*parameters.TCP_PORT.Value) + ") ... done") + } else { + plugin.LogSuccess("Starting TCP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.TCP_PORT.Value) + ") ... done") + } + }) + server.Events.Shutdown.Attach(func() { + plugin.LogSuccess("Stopping TCP Server ... done") + }) +} + +func RunServer(plugin *node.Plugin) { + daemon.BackgroundWorker(func() { + if *parameters.ADDRESS.Value == "0.0.0.0" { + plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*parameters.TCP_PORT.Value) + ") ...") + } else { + plugin.LogInfo("Starting TCP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.TCP_PORT.Value) + ") ...") + } + + server.Listen(*parameters.TCP_PORT.Value) + }) +} + +func ShutdownServer(plugin *node.Plugin) { + plugin.LogInfo("Stopping TCP Server ...") + + server.Shutdown() +} + +func HandleConnection(conn *network.ManagedConnection) { + conn.SetTimeout(IDLE_TIMEOUT) + + var connectionState = STATE_INITIAL + var receiveBuffer []byte + var offset int + + conn.Events.ReceiveData.Attach(func(data []byte) { + ProcessIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset) + }) + + go conn.Read(make([]byte, int(math.Max(request.MARSHALLED_TOTAL_SIZE, response.MARSHALLED_TOTAL_SIZE)))) +} + +func ProcessIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { + if *connectionState == STATE_INITIAL { + var err error + if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil { + Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) + + conn.Close() + + return + } + + *offset = 0 + + switch *connectionState { + case STATE_REQUEST: + *receiveBuffer = make([]byte, request.MARSHALLED_TOTAL_SIZE) + case STATE_RESPONSE: + *receiveBuffer = make([]byte, response.MARSHALLED_TOTAL_SIZE) + } + } + + switch *connectionState { + case STATE_REQUEST: + processIncomingRequestPacket(connectionState, receiveBuffer, conn, data, offset) + case STATE_RESPONSE: + processIncomingResponsePacket(connectionState, receiveBuffer, conn, data, offset) + } +} + +func parsePackageHeader(data []byte) (ConnectionState, []byte, error) { + var connectionState ConnectionState + var receivedData []byte + + switch data[0] { + case request.MARSHALLED_PACKET_HEADER: + receivedData = make([]byte, request.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_REQUEST + case response.MARHSALLED_PACKET_HEADER: + receivedData = make([]byte, response.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_RESPONSE + default: + return 0, nil, errors.New("invalid package header") + } + + return connectionState, receivedData, nil +} + +func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { + remainingCapacity := int(math.Min(float64(request.MARSHALLED_TOTAL_SIZE - *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < request.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if peeringRequest, err := request.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) + + conn.Close() + + return + } else { + peeringRequest.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + + Events.ReceiveRequest.Trigger(conn, peeringRequest) + } + + *connectionState = STATE_INITIAL + + if *offset + len(data) > request.MARSHALLED_TOTAL_SIZE { + ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[request.MARSHALLED_TOTAL_SIZE:], offset) + } + } +} + +func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { + remainingCapacity := int(math.Min(float64(response.MARSHALLED_TOTAL_SIZE - *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < response.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if peeringResponse, err := response.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) + + conn.Close() + + return + } else { + peeringResponse.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + + Events.ReceiveResponse.Trigger(conn, peeringResponse) + } + + *connectionState = STATE_INITIAL + + if *offset + len(data) > response.MARSHALLED_TOTAL_SIZE { + ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[response.MARSHALLED_TOTAL_SIZE:], offset) + } + } +} diff --git a/plugins/autopeering/server/tcp/types.go b/plugins/autopeering/server/tcp/types.go new file mode 100644 index 0000000000000000000000000000000000000000..4298cf0b67b7ec02213532a585807d9af9a588a4 --- /dev/null +++ b/plugins/autopeering/server/tcp/types.go @@ -0,0 +1,16 @@ +package tcp + +import ( + "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "net" +) + +type ConnectionPeeringRequestConsumer = func(conn *network.ManagedConnection, request *request.Request) + +type ConnectionPeeringResponseConsumer = func(conn *network.ManagedConnection, peeringResponse *response.Response) + +type IPErrorConsumer = func(ip net.IP, err error) + +type ConnectionState = byte diff --git a/plugins/autopeering/server/tcp_server.go b/plugins/autopeering/server/tcp_server.go deleted file mode 100644 index 266870b142bc0abc9684a48f7854788c4d59af6d..0000000000000000000000000000000000000000 --- a/plugins/autopeering/server/tcp_server.go +++ /dev/null @@ -1,73 +0,0 @@ -package server - -import ( - "github.com/iotaledger/goshimmer/packages/daemon" - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/goshimmer/packages/network/tcp" - "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "math" - "net" - "strconv" -) - -var tcpServer = tcp.NewServer() - -func ConfigureTCPServer(plugin *node.Plugin) { - tcpServer.Events.Connect.Attach(func(peer network.Connection) { - receivedData := make([]byte, protocol.PEERING_REQUEST_MARSHALLED_TOTAL_SIZE) - - peer.SetTimeout(TCP_IDLE_TIMEOUT) - - offset := 0 - peer.OnReceiveData(func(data []byte) { - remainingCapacity := int(math.Min(float64(protocol.PEERING_REQUEST_MARSHALLED_TOTAL_SIZE- offset), float64(len(data)))) - - copy(receivedData[offset:], data[:remainingCapacity]) - offset += len(data) - - if offset >= protocol.PEERING_REQUEST_MARSHALLED_TOTAL_SIZE { - if peeringRequest, err := protocol.UnmarshalPeeringRequest(receivedData); err != nil { - Events.Error.Trigger(peer.GetConnection().RemoteAddr().(*net.TCPAddr).IP, err) - } else { - Events.ReceiveTCPPeeringRequest.Trigger(peer, peeringRequest) - } - } - }) - - go peer.HandleConnection() - }) - - tcpServer.Events.Error.Attach(func(err error) { - plugin.LogFailure(err.Error()) - }) - tcpServer.Events.Start.Attach(func() { - if *parameters.ADDRESS.Value == "0.0.0.0" { - plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*parameters.UDP_PORT.Value) + ") ... done") - } else { - plugin.LogSuccess("Starting TCP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.UDP_PORT.Value) + ") ... done") - } - }) - tcpServer.Events.Shutdown.Attach(func() { - plugin.LogSuccess("Stopping TCP Server ... done") - }) -} - -func RunTCPServer(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { - if *parameters.ADDRESS.Value == "0.0.0.0" { - plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*parameters.UDP_PORT.Value) + ") ...") - } else { - plugin.LogInfo("Starting TCP Server (" + *parameters.ADDRESS.Value + ":" + strconv.Itoa(*parameters.UDP_PORT.Value) + ") ...") - } - - tcpServer.Listen(*parameters.UDP_PORT.Value) - }) -} - -func ShutdownTCPServer(plugin *node.Plugin) { - plugin.LogInfo("Stopping TCP Server ...") - - tcpServer.Shutdown() -} diff --git a/plugins/autopeering/server/types.go b/plugins/autopeering/server/types.go deleted file mode 100644 index b368652ef58d97ed84f9fff29e409ca0e88cfbbf..0000000000000000000000000000000000000000 --- a/plugins/autopeering/server/types.go +++ /dev/null @@ -1,13 +0,0 @@ -package server - -import ( - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "net" -) - -type ConnectionPeeringRequestConsumer = func(conn network.Connection, request *protocol.PeeringRequest) - -type IPPeeringRequestConsumer = func(ip net.IP, request *protocol.PeeringRequest) - -type IPErrorConsumer = func(ip net.IP, err error) diff --git a/plugins/autopeering/server/udp/events.go b/plugins/autopeering/server/udp/events.go new file mode 100644 index 0000000000000000000000000000000000000000..d07d3cd76dcbd4ef3fd38592876ff3151744ba72 --- /dev/null +++ b/plugins/autopeering/server/udp/events.go @@ -0,0 +1,75 @@ +package udp + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "net" + "reflect" +) + +var Events = &pluginEvents{ + ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, + ReceiveResponse: &responseEvent{make(map[uintptr]ConnectionPeeringResponseConsumer)}, + Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, +} + +type pluginEvents struct { + ReceiveRequest *requestEvent + ReceiveResponse *responseEvent + Error *ipErrorEvent +} + +type requestEvent struct { + callbacks map[uintptr]ConnectionPeeringRequestConsumer +} + +func (this *requestEvent) Attach(callback ConnectionPeeringRequestConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *requestEvent) Detach(callback ConnectionPeeringRequestConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *requestEvent) Trigger(request *request.Request) { + for _, callback := range this.callbacks { + callback(request) + } +} + +type responseEvent struct { + callbacks map[uintptr]ConnectionPeeringResponseConsumer +} + +func (this *responseEvent) Attach(callback ConnectionPeeringResponseConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *responseEvent) Detach(callback ConnectionPeeringResponseConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *responseEvent) Trigger(peeringResponse *response.Response) { + for _, callback := range this.callbacks { + callback(peeringResponse) + } +} + +type ipErrorEvent struct { + callbacks map[uintptr]IPErrorConsumer +} + +func (this *ipErrorEvent) Attach(callback IPErrorConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *ipErrorEvent) Detach(callback IPErrorConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *ipErrorEvent) Trigger(ip net.IP, err error) { + for _, callback := range this.callbacks { + callback(ip, err) + } +} + diff --git a/plugins/autopeering/server/udp_server.go b/plugins/autopeering/server/udp/server.go similarity index 58% rename from plugins/autopeering/server/udp_server.go rename to plugins/autopeering/server/udp/server.go index 89e729b632a4fad87d7a654680cb64ebfd9ae639..c4b09ade0fc6d6896cec977084fdf605cfa66e40 100644 --- a/plugins/autopeering/server/udp_server.go +++ b/plugins/autopeering/server/udp/server.go @@ -1,25 +1,28 @@ -package server +package udp import ( "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/network/udp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "github.com/pkg/errors" + "math" "net" "strconv" ) -var udpServer = udp.NewServer() +var udpServer = udp.NewServer(int(math.Max(float64(request.MARSHALLED_TOTAL_SIZE), float64(response.MARSHALLED_TOTAL_SIZE)))) -func ConfigureUDPServer(plugin *node.Plugin) { +func ConfigureServer(plugin *node.Plugin) { Events.Error.Attach(func(ip net.IP, err error) { - plugin.LogFailure(err.Error()) + plugin.LogFailure("u" + err.Error()) }) udpServer.Events.ReceiveData.Attach(processReceivedData) udpServer.Events.Error.Attach(func(err error) { - plugin.LogFailure(err.Error()) + plugin.LogFailure("error in udp server: " + err.Error()) }) udpServer.Events.Start.Attach(func() { if *parameters.ADDRESS.Value == "0.0.0.0" { @@ -33,7 +36,7 @@ func ConfigureUDPServer(plugin *node.Plugin) { }) } -func RunUDPServer(plugin *node.Plugin) { +func RunServer(plugin *node.Plugin) { daemon.BackgroundWorker(func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogInfo("Starting UDP Server (port " + strconv.Itoa(*parameters.UDP_PORT.Value) + ") ...") @@ -52,11 +55,25 @@ func ShutdownUDPServer(plugin *node.Plugin) { } func processReceivedData(addr *net.UDPAddr, data []byte) { + switch data[0] { + case request.MARSHALLED_PACKET_HEADER: + if peeringRequest, err := request.Unmarshal(data); err != nil { + Events.Error.Trigger(addr.IP, err) + } else { + peeringRequest.Issuer.Address = addr.IP - if peeringRequest, err := protocol.UnmarshalPeeringRequest(data); err != nil { - Events.Error.Trigger(addr.IP, err) - } else { - Events.ReceivePeeringRequest.Trigger(addr.IP, peeringRequest) + Events.ReceiveRequest.Trigger(peeringRequest) + } + case response.MARHSALLED_PACKET_HEADER: + if peeringResponse, err := response.Unmarshal(data); err != nil { + Events.Error.Trigger(addr.IP, err) + } else { + peeringResponse.Issuer.Address = addr.IP + + Events.ReceiveResponse.Trigger(peeringResponse) + } + default: + Events.Error.Trigger(addr.IP, errors.New("invalid UDP peering packet from " + addr.IP.String())) } } diff --git a/plugins/autopeering/server/udp/types.go b/plugins/autopeering/server/udp/types.go new file mode 100644 index 0000000000000000000000000000000000000000..48e103e92311f81c4e7c4e98182e31fd680534bf --- /dev/null +++ b/plugins/autopeering/server/udp/types.go @@ -0,0 +1,13 @@ +package udp + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "net" +) + +type ConnectionPeeringRequestConsumer = func(request *request.Request) + +type ConnectionPeeringResponseConsumer = func(peeringResponse *response.Response) + +type IPErrorConsumer = func(ip net.IP, err error) \ No newline at end of file diff --git a/plugins/statusscreen/logger.go b/plugins/statusscreen/logger.go index 5fd4b29a57af173a1c245901e10e06369b693612..18c3155dbb5a65c3b57cbb3f41a8e42cab09ef28 100644 --- a/plugins/statusscreen/logger.go +++ b/plugins/statusscreen/logger.go @@ -41,4 +41,7 @@ var DEFAULT_LOGGER = &node.Logger{ LogFailure: func(pluginName string, message string) { storeStatusMessage(pluginName, message, node.LOG_LEVEL_FAILURE) }, + LogDebug: func(pluginName string, message string) { + storeStatusMessage(pluginName, message, node.LOG_LEVEL_DEBUG) + }, } diff --git a/plugins/statusscreen/ui_log_entry.go b/plugins/statusscreen/ui_log_entry.go index 0ee917dbefce20730c487e737ecc46c2216ff4d3..94048ede4d144360d57ad695606f355a584f0c5d 100644 --- a/plugins/statusscreen/ui_log_entry.go +++ b/plugins/statusscreen/ui_log_entry.go @@ -48,9 +48,11 @@ func NewUILogEntry(message StatusMessage) *UILogEntry { fmt.Fprintf(logEntry.LogLevelContainer, " [black::d][ [red::d]FAIL [black::d]]") textColor = "red" + case node.LOG_LEVEL_DEBUG: + fmt.Fprintf(logEntry.LogLevelContainer, " [black::d][ [blue::d]NOTE [black::b]]") } - fmt.Fprintf(logEntry.TimeContainer, " [black::b]" + message.Time.Format("01/02/2006 03:04:05 PM")) + fmt.Fprintf(logEntry.TimeContainer, " [black::b]" + message.Time.Format("15:04:05")) if message.Source == "Node" { fmt.Fprintf(logEntry.MessageContainer, "[" + textColor + "::d]" + message.Message) } else { @@ -58,7 +60,7 @@ func NewUILogEntry(message StatusMessage) *UILogEntry { } logEntry.Primitive. - SetColumns(25, 0, 11). + SetColumns(11, 0, 11). SetRows(1). SetBorders(false). AddItem(logEntry.TimeContainer, 0, 0, 1, 1, 0, 0, false).