diff --git a/plugins/autopeering/instances/ownpeer/instance.go b/plugins/autopeering/instances/ownpeer/instance.go index c813077edaf5f2951f4a923044d4f1d4cce1e634..9e3c0cef207b3c6bdf94e1fd693cb67e606070b7 100644 --- a/plugins/autopeering/instances/ownpeer/instance.go +++ b/plugins/autopeering/instances/ownpeer/instance.go @@ -6,6 +6,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "github.com/iotaledger/goshimmer/plugins/gossip" "net" ) @@ -15,7 +16,7 @@ func Configure(plugin *node.Plugin) { INSTANCE = &peer.Peer{ Identity: accountability.OWN_ID, PeeringPort: uint16(*parameters.PORT.Value), - GossipPort: uint16(*parameters.PORT.Value), + GossipPort: uint16(*gossip.PORT.Value), Address: net.IPv4(0, 0, 0, 0), Salt: saltmanager.PUBLIC_SALT, } diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 81f80fe58127575761e30929a93ae53ae36812ec..9717eb36637b9340600cfca9f415d9cd1534ecd8 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -12,6 +12,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" "github.com/iotaledger/goshimmer/plugins/autopeering/server" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" + "github.com/iotaledger/goshimmer/plugins/gossip" ) var PLUGIN = node.NewPlugin("Auto Peering", configure, run) @@ -37,23 +38,55 @@ func run(plugin *node.Plugin) { func configureLogging(plugin *node.Plugin) { acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { - plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + + gossip.AddNeighbor(&gossip.Peer{ + Identity: p.Identity, + Address: p.Address, + Port: p.GossipPort, + }) })) acceptedneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { - plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + plugin.LogDebug("accepted neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + + gossip.RemoveNeighbor(p.Identity.StringIdentifier) })) chosenneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { - plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + plugin.LogDebug("chosen neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + + gossip.AddNeighbor(&gossip.Peer{ + Identity: p.Identity, + Address: p.Address, + Port: p.GossipPort, + }) })) chosenneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) { - plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + plugin.LogDebug("chosen neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + + gossip.RemoveNeighbor(p.Identity.StringIdentifier) })) knownpeers.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { - plugin.LogInfo("peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + plugin.LogInfo("new peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + + if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists { + gossip.AddNeighbor(&gossip.Peer{ + Identity: p.Identity, + Address: p.Address, + Port: p.GossipPort, + }) + } })) knownpeers.INSTANCE.Events.Update.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + + if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists { + gossip.AddNeighbor(&gossip.Peer{ + Identity: p.Identity, + Address: p.Address, + Port: p.GossipPort, + }) + } })) } diff --git a/plugins/gossip/errors.go b/plugins/gossip/errors.go index d097f25725c3b583ebf020b223f43d5c46e3a293..3790340fc20df1d2db31f03a11dbe9162c2357cb 100644 --- a/plugins/gossip/errors.go +++ b/plugins/gossip/errors.go @@ -3,6 +3,8 @@ package gossip import "github.com/iotaledger/goshimmer/packages/errors" var ( + ErrConnectionFailed = errors.Wrap(errors.New("connection error"), "could not connect to neighbor") ErrInvalidAuthenticationMessage = errors.Wrap(errors.New("protocol error"), "invalid authentication message") + ErrInvalidIdentity = errors.Wrap(errors.New("protocol error"), "invalid identity message") ErrInvalidStateTransition = errors.New("protocol error: invalid state transition message") ) diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go index e08c6813fb81883e391747d2ec15052003d6b34f..faf649765b82b82242f63fa2aec3e2cf5624e162 100644 --- a/plugins/gossip/events.go +++ b/plugins/gossip/events.go @@ -2,17 +2,29 @@ package gossip import ( "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/transaction" ) var Events = pluginEvents{ - AddNeighbor: events.NewEvent(neighborCaller), - UpdateNeighbor: events.NewEvent(neighborCaller), - RemoveNeighbor: events.NewEvent(neighborCaller), - DropNeighbor: events.NewEvent(neighborCaller), - IncomingConnection: events.NewEvent(errorCaller), - ReceiveTransaction: events.NewEvent(transactionCaller), - Error: events.NewEvent(errorCaller), + // neighbor events + AddNeighbor: events.NewEvent(peerCaller), + UpdateNeighbor: events.NewEvent(peerCaller), + RemoveNeighbor: events.NewEvent(peerCaller), + + // low level network events + IncomingConnection: events.NewEvent(connectionCaller), + + // high level protocol events + DropNeighbor: events.NewEvent(peerCaller), + SendTransaction: events.NewEvent(transactionCaller), + SendTransactionRequest: events.NewEvent(transactionCaller), // TODO + ReceiveTransaction: events.NewEvent(transactionCaller), + ReceiveTransactionRequest: events.NewEvent(transactionCaller), // TODO + ProtocolError: events.NewEvent(transactionCaller), // TODO + + // generic events + Error: events.NewEvent(errorCaller), } type pluginEvents struct { @@ -49,7 +61,9 @@ type protocolEvents struct { func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) } -func neighborCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) } +func connectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) } + +func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) } func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) } diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go index 77b7247d2299be0dcaee226a9fd1c87ad6de3aec..414dd48962acfeaf9b2349dc1bf604dbef40f693 100644 --- a/plugins/gossip/neighbors.go +++ b/plugins/gossip/neighbors.go @@ -1,23 +1,120 @@ package gossip import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/packages/node" "net" + "strconv" "sync" ) +func configureNeighbors(plugin *node.Plugin) { + Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) { + plugin.LogSuccess("new neighbor added " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port))) + })) + + Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Peer) { + plugin.LogSuccess("existing neighbor updated " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port))) + })) + + Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Peer) { + plugin.LogSuccess("existing neighbor removed " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port))) + })) +} + +func runNeighbors(plugin *node.Plugin) { + plugin.LogInfo("Starting Neighbor Connection Manager ...") + + neighborLock.RLock() + neighborLock.RUnlock() + + Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) { + manageConnection(plugin, neighbor) + })) + + plugin.LogSuccess("Starting Neighbor Connection Manager ... done") +} + +func manageConnection(plugin *node.Plugin, neighbor *Peer, initiated bool) { + daemon.BackgroundWorker(func() { + failedConnectionAttempts := 0 + + for failedConnectionAttempts < MAX_CONNECTION_ATTEMPTS { + if neighbor, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists { + return + } else { + if conn, newConnection, err := neighbor.Connect(); err != nil { + failedConnectionAttempts++ + + plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(MAX_CONNECTION_ATTEMPTS) + "] " + err.Error()) + } else { + if newConnection { + newProtocol(conn).init() + } + } + } + } + + RemoveNeighbor(neighbor.Identity.StringIdentifier) + }) +} + type Peer struct { - Identity identity.Identity - Address net.IP - Port uint16 - Conn *network.ManagedConnection + Identity *identity.Identity + Address net.IP + Port uint16 + InitiatedConn *network.ManagedConnection + AcceptedConn *network.ManagedConnection + initiatedConnMutex sync.Mutex + acceptedConnMutex sync.Mutex } func UnmarshalPeer(data []byte) (*Peer, error) { return &Peer{}, nil } +func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.IdentifiableError) { + // if we already have an accepted connection -> use it instead + if peer.AcceptedConn != nil { + peer.acceptedConnMutex.Lock() + if peer.AcceptedConn != nil { + defer peer.acceptedConnMutex.Unlock() + + return peer.AcceptedConn, false, nil + } + peer.acceptedConnMutex.Unlock() + } + + // otherwise try to dial + peer.initiatedConnMutex.Lock() + defer peer.initiatedConnMutex.Unlock() + + if peer.InitiatedConn != nil { + return peer.InitiatedConn, false, nil + } else { + conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) + if err != nil { + return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+ + peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) + } + + peer.InitiatedConn = network.NewManagedConnection(conn) + + peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() { + peer.initiatedConnMutex.Lock() + defer peer.initiatedConnMutex.Unlock() + + peer.InitiatedConn = nil + })) + + return peer.InitiatedConn, true, nil + } +} + func (peer *Peer) Marshal() []byte { return nil } @@ -29,7 +126,7 @@ func (peer *Peer) Equals(other *Peer) bool { func AddNeighbor(newNeighbor *Peer) { neighborLock.Lock() - defer neighborLock.Lock() + defer neighborLock.Unlock() if neighbor, exists := neighbors[newNeighbor.Identity.StringIdentifier]; !exists { neighbors[newNeighbor.Identity.StringIdentifier] = newNeighbor @@ -57,11 +154,11 @@ func RemoveNeighbor(identifier string) { } } -func GetNeighbor(neighbor *Peer) (*Peer, bool) { +func GetNeighbor(identifier string) (*Peer, bool) { neighborLock.RLock() defer neighborLock.RUnlock() - neighbor, exists := neighbors[neighbor.Identity.StringIdentifier] + neighbor, exists := neighbors[identifier] return neighbor, exists } @@ -79,6 +176,7 @@ func GetNeighbors() map[string]*Peer { } const ( + MAX_CONNECTION_ATTEMPTS = 5 MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1 ) diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 560c86749ee14fc1dadd70349f5e5e188d0dac65..1f76cf1a20f1ddac207f219e8c0aa6150d657302 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -5,9 +5,11 @@ import "github.com/iotaledger/goshimmer/packages/node" var PLUGIN = node.NewPlugin("Gossip", configure, run) func configure(plugin *node.Plugin) { + configureNeighbors(plugin) configureServer(plugin) } func run(plugin *node.Plugin) { + runNeighbors(plugin) runServer(plugin) } diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go index c2f6e8661feaf0192114190b4126624e7a28266c..bc5dbfd83328aef30f94cdc00848457ae74961fa 100644 --- a/plugins/gossip/protocol.go +++ b/plugins/gossip/protocol.go @@ -1,8 +1,11 @@ package gossip import ( + "fmt" + "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/network" "strconv" ) @@ -17,31 +20,69 @@ type protocolState interface { // region protocol ///////////////////////////////////////////////////////////////////////////////////////////////////// type protocol struct { + Conn *network.ManagedConnection + Neighbor *Peer + Version int + CurrentState protocolState Events protocolEvents - neighbor *Peer - currentState protocolState } -func newProtocol(neighbor *Peer) *protocol { +func newProtocol(conn *network.ManagedConnection) *protocol { protocol := &protocol{ + Conn: conn, + CurrentState: &versionState{}, Events: protocolEvents{ - ReceiveVersion: events.NewEvent(intCaller), + ReceiveVersion: events.NewEvent(intCaller), + ReceiveIdentification: events.NewEvent(peerCaller), }, - neighbor: neighbor, - currentState: &versionState{}, } return protocol } +func (protocol *protocol) init() { + var onClose, onReceiveData *events.Closure + + fmt.Println("INIT") + + onReceiveData = events.NewClosure(protocol.parseData) + onClose = events.NewClosure(func() { + protocol.Conn.Events.ReceiveData.Detach(onReceiveData) + protocol.Conn.Events.Close.Detach(onClose) + }) + + protocol.Conn.Events.ReceiveData.Attach(onReceiveData) + protocol.Conn.Events.Close.Attach(onClose) + protocol.Events.ReceiveVersion.Attach(events.NewClosure(func(version int) { + fmt.Println(version) + })) + protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(neighbor *Peer) { + fmt.Println(neighbor) + })) + + protocol.Conn.Write([]byte{1}) + fmt.Println("SENT VERSION") + protocol.Conn.Write(accountability.OWN_ID.Identifier) + + fmt.Println(len(accountability.OWN_ID.Identifier)) + + if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil { + protocol.Conn.Write(signature) + fmt.Println(len(signature)) + } + fmt.Println("SENTSIGNATURE") + + go protocol.Conn.Read(make([]byte, 1000)) +} + func (protocol *protocol) parseData(data []byte) { offset := 0 length := len(data) - for offset < length && protocol.currentState != nil { - if readBytes, err := protocol.currentState.Consume(protocol, data, offset, length); err != nil { + for offset < length && protocol.CurrentState != nil { + if readBytes, err := protocol.CurrentState.Consume(protocol, data, offset, length); err != nil { Events.Error.Trigger(err) - protocol.neighbor.Conn.Close() + protocol.Neighbor.InitiatedConn.Close() return } else { @@ -59,9 +100,10 @@ type versionState struct{} func (state *versionState) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) { switch data[offset] { case 1: + protocol.Version = 1 protocol.Events.ReceiveVersion.Trigger(1) - protocol.currentState = newIndentificationStateV1() + protocol.CurrentState = newIndentificationStateV1() return 1, nil diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go index 72a418ab178f993becc9e62177bf2123265bed1c..c033775c74d579f65de4b7ecdfb715352804a01f 100644 --- a/plugins/gossip/protocol_v1.go +++ b/plugins/gossip/protocol_v1.go @@ -1,8 +1,10 @@ package gossip import ( + "bytes" "github.com/iotaledger/goshimmer/packages/byteutils" "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/transaction" "strconv" ) @@ -16,7 +18,7 @@ type indentificationStateV1 struct { func newIndentificationStateV1() *indentificationStateV1 { return &indentificationStateV1{ - buffer: make([]byte, MARSHALLED_NEIGHBOR_TOTAL_SIZE), + buffer: make([]byte, MARSHALLED_IDENTITY_TOTAL_SIZE), offset: 0, } } @@ -25,16 +27,25 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length) state.offset += bytesRead - if state.offset == MARSHALLED_NEIGHBOR_TOTAL_SIZE { - if unmarshalledNeighbor, err := UnmarshalPeer(state.buffer); err != nil { + if state.offset == MARSHALLED_IDENTITY_TOTAL_SIZE { + if receivedIdentity, err := unmarshalIdentity(state.buffer); err != nil { return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message") } else { - protocol.neighbor.Identity = unmarshalledNeighbor.Identity - protocol.neighbor.Port = unmarshalledNeighbor.Port + if neighbor, exists := GetNeighbor(receivedIdentity.StringIdentifier); exists { + neighbor.initiatedConnMutex.Lock() + if neighbor.InitiatedConn == nil { + neighbor.InitiatedConn = protocol.Conn + } + neighbor.initiatedConnMutex.Unlock() - protocol.Events.ReceiveIdentification.Trigger(protocol.neighbor) + protocol.Neighbor = neighbor + } else { + protocol.Neighbor = nil + } - protocol.currentState = newacceptanceStateV1() + protocol.Events.ReceiveIdentification.Trigger(protocol.Neighbor) + + protocol.CurrentState = newacceptanceStateV1() state.offset = 0 } } @@ -42,6 +53,20 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of return bytesRead, nil } +func unmarshalIdentity(data []byte) (*identity.Identity, error) { + identifier := data[MARSHALLED_IDENTITY_START:MARSHALLED_IDENTITY_END] + + if restoredIdentity, err := identity.FromSignedData(identifier, data[MARSHALLED_IDENTITY_SIGNATURE_START:MARSHALLED_IDENTITY_SIGNATURE_END]); err != nil { + return nil, err + } else { + if bytes.Equal(identifier, restoredIdentity.Identifier) { + return restoredIdentity, nil + } else { + return nil, errors.New("signature does not match claimed identity") + } + } +} + //endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// //region acceptanceStateV1 ///////////////////////////////////////////////////////////////////////////////////////////// @@ -57,14 +82,16 @@ func (state *acceptanceStateV1) Consume(protocol *protocol, data []byte, offset case 0: protocol.Events.RejectConnection.Trigger() - protocol.neighbor.Conn.Close() - protocol.currentState = nil + RemoveNeighbor(protocol.Neighbor.Identity.StringIdentifier) + + protocol.Neighbor.InitiatedConn.Close() + protocol.CurrentState = nil break case 1: protocol.Events.AcceptConnection.Trigger() - protocol.currentState = newDispatchStateV1() + protocol.CurrentState = newDispatchStateV1() break default: @@ -89,15 +116,15 @@ func (state *dispatchStateV1) Consume(protocol *protocol, data []byte, offset in case 0: protocol.Events.RejectConnection.Trigger() - protocol.neighbor.Conn.Close() - protocol.currentState = nil + protocol.Neighbor.InitiatedConn.Close() + protocol.CurrentState = nil case 1: - protocol.currentState = newTransactionStateV1() + protocol.CurrentState = newTransactionStateV1() break case 2: - protocol.currentState = newRequestStateV1() + protocol.CurrentState = newRequestStateV1() break default: @@ -136,7 +163,7 @@ func (state *transactionStateV1) Consume(protocol *protocol, data []byte, offset Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData)) }() - protocol.currentState = newDispatchStateV1() + protocol.CurrentState = newDispatchStateV1() state.offset = 0 } @@ -164,3 +191,16 @@ func (state *requestStateV1) Consume(protocol *protocol, data []byte, offset int } //endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// + +const ( + MARSHALLED_IDENTITY_START = 0 + MARSHALLED_IDENTITY_SIGNATURE_START = MARSHALLED_IDENTITY_END + + MARSHALLED_IDENTITY_SIZE = 20 + MARSHALLED_IDENTITY_SIGNATURE_SIZE = 65 + + MARSHALLED_IDENTITY_END = MARSHALLED_IDENTITY_START + MARSHALLED_IDENTITY_SIZE + MARSHALLED_IDENTITY_SIGNATURE_END = MARSHALLED_IDENTITY_SIGNATURE_START + MARSHALLED_IDENTITY_SIGNATURE_SIZE + + MARSHALLED_IDENTITY_TOTAL_SIZE = MARSHALLED_IDENTITY_SIGNATURE_END +) \ No newline at end of file diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go index 6595dc923d6e093ebc2e6212a9a2929b563c7c42..1002389bb5e2d96488727875f869836444e3ef9f 100644 --- a/plugins/gossip/server.go +++ b/plugins/gossip/server.go @@ -6,7 +6,6 @@ import ( "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" - "net" "strconv" ) @@ -14,26 +13,7 @@ var TCPServer = tcp.NewServer() func configureServer(plugin *node.Plugin) { TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) { - neighbor := &Peer{ - Address: conn.RemoteAddr().(*net.TCPAddr).IP, - } - - protocol := newProtocol(neighbor) - - var onClose, onReceiveData *events.Closure - - onReceiveData = events.NewClosure(func(data []byte) { - protocol.parseData(data) - }) - onClose = events.NewClosure(func() { - conn.Events.ReceiveData.Detach(onReceiveData) - conn.Events.Close.Detach(onClose) - }) - - conn.Events.ReceiveData.Attach(onReceiveData) - conn.Events.Close.Attach(onClose) - - go conn.Read(make([]byte, 1000)) + newProtocol(conn).init() })) daemon.Events.Shutdown.Attach(events.NewClosure(func() {