Skip to content
Snippets Groups Projects
Commit ef3791e6 authored by Hans Moog's avatar Hans Moog
Browse files

Refactor: refactored the autopeering package

parent c9c372b1
No related branches found
No related tags found
No related merge requests found
Showing
with 281 additions and 180 deletions
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
"github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/identity"
) )
var OWN_ID *identity.Identity var OWN_ID = getIdentity()
func generateNewIdentity() *identity.Identity { func generateNewIdentity() *identity.Identity {
newIdentity := identity.GenerateRandomIdentity() newIdentity := identity.GenerateRandomIdentity()
...@@ -43,7 +43,3 @@ func getIdentity() *identity.Identity { ...@@ -43,7 +43,3 @@ func getIdentity() *identity.Identity {
return identity.NewIdentity(publicKey, privateKey) return identity.NewIdentity(publicKey, privateKey)
} }
func init() {
OWN_ID = getIdentity()
}
\ No newline at end of file
package network
import "reflect"
type BufferedConnectionEvents struct {
ReceiveData *dataEvent
Close *callbackEvent
Error *errorEvent
}
type callbackEvent struct {
callbacks map[uintptr]Callback
}
func (this *callbackEvent) Attach(callback Callback) {
this.callbacks[reflect.ValueOf(callback).Pointer()] = callback
}
func (this *callbackEvent) Detach(callback Callback) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer())
}
func (this *callbackEvent) Trigger() {
for _, callback := range this.callbacks {
callback()
}
}
type errorEvent struct {
callbacks map[uintptr]ErrorConsumer
}
func (this *errorEvent) Attach(callback ErrorConsumer) {
this.callbacks[reflect.ValueOf(callback).Pointer()] = callback
}
func (this *errorEvent) Detach(callback ErrorConsumer) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer())
}
func (this *errorEvent) Trigger(err error) {
for _, callback := range this.callbacks {
callback(err)
}
}
type dataEvent struct {
callbacks map[uintptr]DataConsumer
}
func (this *dataEvent) Attach(callback DataConsumer) {
this.callbacks[reflect.ValueOf(callback).Pointer()] = callback
}
func (this *dataEvent) Detach(callback ErrorConsumer) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer())
}
func (this *dataEvent) Trigger(data []byte) {
for _, callback := range this.callbacks {
callback(data)
}
}
package network
import (
"net"
"time"
)
type Connection interface {
GetProtocol() string
GetConnection() net.Conn
Write(data []byte)
OnReceiveData(callback DataConsumer) Connection
OnDisconnect(callback Callback) Connection
OnError(callback ErrorConsumer) Connection
TriggerReceiveData(data []byte) Connection
TriggerDisconnect() Connection
TriggerError(err error) Connection
SetTimeout(duration time.Duration) Connection
HandleConnection()
}
type Callback func()
type ErrorConsumer func(err error)
type DataConsumer func(data []byte)
package network
import (
"io"
"net"
"sync"
"time"
)
type ManagedConnection struct {
Conn net.Conn
Events BufferedConnectionEvents
readTimeout time.Duration
writeTimeout time.Duration
closeOnce sync.Once
}
func NewManagedConnection(conn net.Conn) *ManagedConnection {
bufferedConnection := &ManagedConnection{
Conn: conn,
Events: BufferedConnectionEvents{
ReceiveData: &dataEvent{make(map[uintptr]DataConsumer)},
Close: &callbackEvent{make(map[uintptr]Callback)},
Error: &errorEvent{make(map[uintptr]ErrorConsumer)},
},
}
return bufferedConnection
}
func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) {
defer this.Close()
totalReadBytes := 0
for {
if err := this.setReadTimeoutBasedDeadline(); err != nil {
return totalReadBytes, err
}
byteCount, err := this.Conn.Read(receiveBuffer)
if err != nil {
if err != io.EOF {
this.Events.Error.Trigger(err)
}
return totalReadBytes, err
}
totalReadBytes += byteCount
receivedData := make([]byte, byteCount)
copy(receivedData, receiveBuffer)
this.Events.ReceiveData.Trigger(receivedData)
}
}
func (this *ManagedConnection) Write(data []byte) (n int, err error) {
if err := this.setWriteTimeoutBasedDeadline(); err != nil {
return 0, err
}
return this.Conn.Write(data)
}
func (this *ManagedConnection) Close() error {
err := this.Conn.Close()
if err != nil {
this.Events.Error.Trigger(err)
}
this.closeOnce.Do(this.Events.Close.Trigger)
return err
}
func (this *ManagedConnection) LocalAddr() net.Addr {
return this.Conn.LocalAddr()
}
func (this *ManagedConnection) RemoteAddr() net.Addr {
return this.Conn.RemoteAddr()
}
func (this *ManagedConnection) SetDeadline(t time.Time) error {
return this.Conn.SetDeadline(t)
}
func (this *ManagedConnection) SetReadDeadline(t time.Time) error {
return this.Conn.SetReadDeadline(t)
}
func (this *ManagedConnection) SetWriteDeadline(t time.Time) error {
return this.Conn.SetWriteDeadline(t)
}
func (this *ManagedConnection) SetTimeout(d time.Duration) error {
if err := this.SetReadTimeout(d); err != nil {
return err
}
if err := this.SetWriteTimeout(d); err != nil {
return err
}
return nil
}
func (this *ManagedConnection) SetReadTimeout(d time.Duration) error {
this.readTimeout = d
if err := this.setReadTimeoutBasedDeadline(); err != nil {
return err
}
return nil
}
func (this *ManagedConnection) SetWriteTimeout(d time.Duration) error {
this.writeTimeout = d
if err := this.setWriteTimeoutBasedDeadline(); err != nil {
return err
}
return nil
}
func (this *ManagedConnection) setReadTimeoutBasedDeadline() error {
if this.readTimeout != 0 {
if err := this.Conn.SetReadDeadline(time.Now().Add(this.readTimeout)); err != nil {
return err
}
} else {
if err := this.Conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
}
return nil
}
func (this *ManagedConnection) setWriteTimeoutBasedDeadline() error {
if this.writeTimeout != 0 {
if err := this.Conn.SetWriteDeadline(time.Now().Add(this.writeTimeout)); err != nil {
return err
}
} else {
if err := this.Conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}
}
return nil
}
package network
import (
"io"
"net"
"time"
)
type peerImplementation struct {
timeout time.Duration
protocol string
conn net.Conn
receiveDataHandlers []DataConsumer
disconnectHandlers []Callback
errorHandlers []ErrorConsumer
}
func NewPeer(protocol string, conn net.Conn) Connection {
this := &peerImplementation{
protocol: protocol,
conn: conn,
receiveDataHandlers: make([]DataConsumer, 0),
disconnectHandlers: make([]Callback, 0),
errorHandlers: make([]ErrorConsumer, 0),
}
return this
}
func (this *peerImplementation) SetTimeout(duration time.Duration) Connection {
this.timeout = duration
//this.conn.SetDeadline(time.Now().Add(this.timeout))
return this
}
func (this *peerImplementation) GetProtocol() string {
return this.protocol
}
func (this *peerImplementation) GetConnection() net.Conn {
return this.conn
}
func (this *peerImplementation) Write(data []byte) {
//this.conn.SetDeadline(time.Now().Add(this.timeout))
if _, err := this.conn.Write(data); err != nil {
this.TriggerError(err)
}
}
func (this *peerImplementation) OnReceiveData(callback DataConsumer) Connection {
this.receiveDataHandlers = append(this.receiveDataHandlers, callback)
return this
}
func (this *peerImplementation) OnDisconnect(callback Callback) Connection {
this.disconnectHandlers = append(this.disconnectHandlers, callback)
return this
}
func (this *peerImplementation) OnError(callback ErrorConsumer) Connection {
this.errorHandlers = append(this.errorHandlers, callback)
return this
}
func (this *peerImplementation) TriggerReceiveData(data []byte) Connection {
for _, receiveDataHandler := range this.receiveDataHandlers {
receiveDataHandler(data)
}
return this
}
func (this *peerImplementation) TriggerDisconnect() Connection {
for _, disconnectHandler := range this.disconnectHandlers {
disconnectHandler()
}
return this
}
func (this *peerImplementation) TriggerError(err error) Connection {
for _, errorHandler := range this.errorHandlers {
errorHandler(err)
}
return this
}
func (this *peerImplementation) HandleConnection() {
defer this.conn.Close()
defer this.TriggerDisconnect()
receiveBuffer := make([]byte, READ_BUFFER_SIZE)
for {
//this.conn.SetDeadline(time.Now().Add(this.timeout))
byteCount, err := this.conn.Read(receiveBuffer)
if err != nil {
if err != io.EOF {
this.TriggerError(err)
}
return
}
receivedData := make([]byte, byteCount)
copy(receivedData, receiveBuffer)
this.TriggerReceiveData(receivedData)
}
}
...@@ -39,7 +39,7 @@ func (this *Server) Listen(port int) *Server { ...@@ -39,7 +39,7 @@ func (this *Server) Listen(port int) *Server {
this.Events.Error.Trigger(err) this.Events.Error.Trigger(err)
} }
} else { } else {
peer := network.NewPeer("tcp", socket) peer := network.NewManagedConnection(socket)
go this.Events.Connect.Trigger(peer) go this.Events.Connect.Trigger(peer)
} }
......
...@@ -70,9 +70,9 @@ func (this *peerConsumerEvent) Detach(callback PeerConsumer) { ...@@ -70,9 +70,9 @@ func (this *peerConsumerEvent) Detach(callback PeerConsumer) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer()) delete(this.callbacks, reflect.ValueOf(callback).Pointer())
} }
func (this *peerConsumerEvent) Trigger(peer network.Connection) { func (this *peerConsumerEvent) Trigger(conn *network.ManagedConnection) {
for _, callback := range this.callbacks { for _, callback := range this.callbacks {
callback(peer) callback(conn)
} }
} }
......
...@@ -6,4 +6,4 @@ type Callback = func() ...@@ -6,4 +6,4 @@ type Callback = func()
type ErrorConsumer = func(e error) type ErrorConsumer = func(e error)
type PeerConsumer = func(peer network.Connection) type PeerConsumer = func(conn *network.ManagedConnection)
package network
type Callback func()
type ErrorConsumer func(err error)
type DataConsumer func(data []byte)
...@@ -2,6 +2,7 @@ package udp ...@@ -2,6 +2,7 @@ package udp
import ( import (
"net" "net"
"strconv"
) )
type serverEvents struct { type serverEvents struct {
...@@ -12,8 +13,9 @@ type serverEvents struct { ...@@ -12,8 +13,9 @@ type serverEvents struct {
} }
type Server struct { type Server struct {
Socket *net.UDPConn Socket net.PacketConn
Events serverEvents ReceiveBufferSize int
Events serverEvents
} }
func (this *Server) Shutdown() { func (this *Server) Shutdown() {
...@@ -26,10 +28,7 @@ func (this *Server) Shutdown() { ...@@ -26,10 +28,7 @@ func (this *Server) Shutdown() {
} }
func (this *Server) Listen(address string, port int) { func (this *Server) Listen(address string, port int) {
if socket, err := net.ListenUDP("udp", &net.UDPAddr{ if socket, err := net.ListenPacket("udp", address + ":" + strconv.Itoa(port)); err != nil {
Port: port,
IP: net.ParseIP(address),
}); err != nil {
this.Events.Error.Trigger(err) this.Events.Error.Trigger(err)
return return
...@@ -40,20 +39,21 @@ func (this *Server) Listen(address string, port int) { ...@@ -40,20 +39,21 @@ func (this *Server) Listen(address string, port int) {
this.Events.Start.Trigger() this.Events.Start.Trigger()
defer this.Events.Shutdown.Trigger() defer this.Events.Shutdown.Trigger()
buf := make([]byte, 1500) buf := make([]byte, this.ReceiveBufferSize)
for this.Socket != nil { for this.Socket != nil {
if bytesRead, addr, err := this.Socket.ReadFromUDP(buf); err != nil { if bytesRead, addr, err := this.Socket.ReadFrom(buf); err != nil {
if this.Socket != nil { if this.Socket != nil {
this.Events.Error.Trigger(err) this.Events.Error.Trigger(err)
} }
} else { } else {
this.Events.ReceiveData.Trigger(addr, buf[:bytesRead]) this.Events.ReceiveData.Trigger(addr.(*net.UDPAddr), buf[:bytesRead])
} }
} }
} }
func NewServer() *Server { func NewServer(receiveBufferSize int) *Server {
return &Server{ return &Server{
ReceiveBufferSize: receiveBufferSize,
Events: serverEvents{ Events: serverEvents{
Start: &callbackEvent{make(map[uintptr]Callback)}, Start: &callbackEvent{make(map[uintptr]Callback)},
Shutdown: &callbackEvent{make(map[uintptr]Callback)}, Shutdown: &callbackEvent{make(map[uintptr]Callback)},
......
...@@ -5,4 +5,5 @@ const ( ...@@ -5,4 +5,5 @@ const (
LOG_LEVEL_WARNING = 1 LOG_LEVEL_WARNING = 1
LOG_LEVEL_SUCCESS = 2 LOG_LEVEL_SUCCESS = 2
LOG_LEVEL_INFO = 3 LOG_LEVEL_INFO = 3
LOG_LEVEL_DEBUG = 4
) )
...@@ -8,6 +8,7 @@ type Logger struct { ...@@ -8,6 +8,7 @@ type Logger struct {
LogSuccess func(pluginName string, message string) LogSuccess func(pluginName string, message string)
LogWarning func(pluginName string, message string) LogWarning func(pluginName string, message string)
LogFailure func(pluginName string, message string) LogFailure func(pluginName string, message string)
LogDebug func(pluginName string, message string)
} }
func pluginPrefix(pluginName string) string { func pluginPrefix(pluginName string) string {
...@@ -35,4 +36,7 @@ var DEFAULT_LOGGER = &Logger{ ...@@ -35,4 +36,7 @@ var DEFAULT_LOGGER = &Logger{
LogFailure: func(pluginName string, message string) { LogFailure: func(pluginName string, message string) {
fmt.Println("[ FAIL ] " + pluginPrefix(pluginName) + message) fmt.Println("[ FAIL ] " + pluginPrefix(pluginName) + message)
}, },
LogDebug: func(pluginName string, message string) {
fmt.Println("[ NOTE ] " + pluginPrefix(pluginName) + message)
},
} }
...@@ -73,6 +73,16 @@ func (node *Node) LogInfo(pluginName string, message string) { ...@@ -73,6 +73,16 @@ func (node *Node) LogInfo(pluginName string, message string) {
} }
} }
func (node *Node) LogDebug(pluginName string, message string) {
if node.logLevel >= LOG_LEVEL_DEBUG {
for _, logger := range node.loggers {
if logger.Enabled {
logger.LogDebug(pluginName, message)
}
}
}
}
func (node *Node) LogWarning(pluginName string, message string) { func (node *Node) LogWarning(pluginName string, message string) {
if node.logLevel >= LOG_LEVEL_WARNING { if node.logLevel >= LOG_LEVEL_WARNING {
for _, logger := range node.loggers { for _, logger := range node.loggers {
...@@ -98,9 +108,6 @@ func (node *Node) Load(plugins ...*Plugin) { ...@@ -98,9 +108,6 @@ func (node *Node) Load(plugins ...*Plugin) {
if len(plugins) >= 1 { if len(plugins) >= 1 {
for _, plugin := range plugins { for _, plugin := range plugins {
fmt.Println(*DISABLE_PLUGINS.Value)
fmt.Println(disabledPlugins)
fmt.Println(strings.ToLower(strings.Replace(plugin.Name, " ", "", -1)))
if _, exists := disabledPlugins[strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))]; !exists { if _, exists := disabledPlugins[strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))]; !exists {
plugin.wg = node.wg plugin.wg = node.wg
plugin.Node = node plugin.Node = node
......
...@@ -3,6 +3,6 @@ package node ...@@ -3,6 +3,6 @@ package node
import "github.com/iotaledger/goshimmer/packages/parameter" import "github.com/iotaledger/goshimmer/packages/parameter"
var ( var (
LOG_LEVEL = parameter.AddInt("NODE/LOG_LEVEL", LOG_LEVEL_SUCCESS, "controls the log types that are shown") LOG_LEVEL = parameter.AddInt("NODE/LOG_LEVEL", LOG_LEVEL_INFO, "controls the log types that are shown")
DISABLE_PLUGINS = parameter.AddString("NODE/DISABLE_PLUGINS", "", "a list of plugins that shall be disabled") DISABLE_PLUGINS = parameter.AddString("NODE/DISABLE_PLUGINS", "", "a list of plugins that shall be disabled")
) )
...@@ -49,3 +49,7 @@ func (plugin *Plugin) LogWarning(message string) { ...@@ -49,3 +49,7 @@ func (plugin *Plugin) LogWarning(message string) {
func (plugin *Plugin) LogFailure(message string) { func (plugin *Plugin) LogFailure(message string) {
plugin.Node.LogFailure(plugin.Name, message) plugin.Node.LogFailure(plugin.Name, message)
} }
func (plugin *Plugin) LogDebug(message string) {
plugin.Node.LogDebug(plugin.Name, message)
}
...@@ -4,6 +4,7 @@ import "github.com/iotaledger/goshimmer/packages/parameter" ...@@ -4,6 +4,7 @@ import "github.com/iotaledger/goshimmer/packages/parameter"
var ( var (
ADDRESS = parameter.AddString("AUTOPEERING/ADDRESS", "0.0.0.0", "address to bind for incoming peering requests") ADDRESS = parameter.AddString("AUTOPEERING/ADDRESS", "0.0.0.0", "address to bind for incoming peering requests")
UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests")
ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://82.165.29.179:14626", "list of trusted entry nodes for auto peering") ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://82.165.29.179:14626", "list of trusted entry nodes for auto peering")
TCP_PORT = parameter.AddInt("AUTOPEERING/TCP_PORT", 14626, "tcp port for incoming peering requests")
UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests")
) )
package peermanager
import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
var ACCEPTED_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)}
package peermanager
import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
var CHOSEN_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)}
package peermanager package peermanager
import ( import (
"github.com/iotaledger/goshimmer/packages/identity"
"time" "time"
) )
const ( const (
FIND_NEIGHBOR_INTERVAL = 5 * time.Second FIND_NEIGHBOR_INTERVAL = 5 * time.Second
) )
var UNKNOWN_IDENTITY = identity.GenerateRandomIdentity()
...@@ -3,21 +3,24 @@ package peermanager ...@@ -3,21 +3,24 @@ package peermanager
import ( import (
"github.com/iotaledger/goshimmer/plugins/autopeering/parameters" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"net" "net"
"strconv" "strconv"
"strings" "strings"
) )
func getEntryNodes() []*protocol.Peer { var ENTRY_NODES = parseEntryNodes()
result := make([]*protocol.Peer, 0)
func parseEntryNodes() []*peer.Peer {
result := make([]*peer.Peer, 0)
for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) { for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) {
if entryNodeDefinition == "" { if entryNodeDefinition == "" {
continue continue
} }
entryNode := &protocol.Peer{ entryNode := &peer.Peer{
Identity: UNKNOWN_IDENTITY, Identity: nil,
} }
protocolBits := strings.Split(entryNodeDefinition, "://") protocolBits := strings.Split(entryNodeDefinition, "://")
...@@ -26,9 +29,9 @@ func getEntryNodes() []*protocol.Peer { ...@@ -26,9 +29,9 @@ func getEntryNodes() []*protocol.Peer {
} }
switch protocolBits[0] { switch protocolBits[0] {
case "tcp": case "tcp":
entryNode.PeeringProtocolType = protocol.TCP_PROTOCOL entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_TCP
case "udp": case "udp":
entryNode.PeeringProtocolType = protocol.UDP_PROTOCOL entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_UDP
} }
addressBits := strings.Split(protocolBits[1], ":") addressBits := strings.Split(protocolBits[1], ":")
...@@ -70,5 +73,3 @@ func getEntryNodes() []*protocol.Peer { ...@@ -70,5 +73,3 @@ func getEntryNodes() []*protocol.Peer {
return result return result
} }
var ENTRY_NODES = getEntryNodes()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment