Skip to content
Snippets Groups Projects
Commit 7729f6a8 authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

Use hive.go/network

parent fa05123e
No related branches found
No related tags found
No related merge requests found
package network
import (
"github.com/iotaledger/hive.go/events"
)
type BufferedConnectionEvents struct {
ReceiveData *events.Event
Close *events.Event
Error *events.Event
}
func dataCaller(handler interface{}, params ...interface{}) {
handler.(func([]byte))(params[0].([]byte))
}
package network
import (
"io"
"net"
"sync"
"time"
"github.com/iotaledger/hive.go/events"
)
type ManagedConnection struct {
Conn net.Conn
Events BufferedConnectionEvents
readTimeout time.Duration
writeTimeout time.Duration
closeOnce sync.Once
}
func NewManagedConnection(conn net.Conn) *ManagedConnection {
bufferedConnection := &ManagedConnection{
Conn: conn,
Events: BufferedConnectionEvents{
ReceiveData: events.NewEvent(dataCaller),
Close: events.NewEvent(events.CallbackCaller),
Error: events.NewEvent(events.ErrorCaller),
},
}
return bufferedConnection
}
func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) {
defer this.Close()
totalReadBytes := 0
for {
if err := this.setReadTimeoutBasedDeadline(); err != nil {
return totalReadBytes, err
}
byteCount, err := this.Conn.Read(receiveBuffer)
if byteCount > 0 {
totalReadBytes += byteCount
receivedData := make([]byte, byteCount)
copy(receivedData, receiveBuffer)
this.Events.ReceiveData.Trigger(receivedData)
}
if err != nil {
if err != io.EOF {
this.Events.Error.Trigger(err)
}
return totalReadBytes, err
}
}
}
func (this *ManagedConnection) Write(data []byte) (n int, err error) {
if err := this.setWriteTimeoutBasedDeadline(); err != nil {
return 0, err
}
return this.Conn.Write(data)
}
func (this *ManagedConnection) Close() error {
err := this.Conn.Close()
if err != nil {
this.Events.Error.Trigger(err)
}
this.closeOnce.Do(func() {
this.Events.Close.Trigger()
})
return err
}
func (this *ManagedConnection) LocalAddr() net.Addr {
return this.Conn.LocalAddr()
}
func (this *ManagedConnection) RemoteAddr() net.Addr {
return this.Conn.RemoteAddr()
}
func (this *ManagedConnection) SetDeadline(t time.Time) error {
return this.Conn.SetDeadline(t)
}
func (this *ManagedConnection) SetReadDeadline(t time.Time) error {
return this.Conn.SetReadDeadline(t)
}
func (this *ManagedConnection) SetWriteDeadline(t time.Time) error {
return this.Conn.SetWriteDeadline(t)
}
func (this *ManagedConnection) SetTimeout(d time.Duration) error {
if err := this.SetReadTimeout(d); err != nil {
return err
}
if err := this.SetWriteTimeout(d); err != nil {
return err
}
return nil
}
func (this *ManagedConnection) SetReadTimeout(d time.Duration) error {
this.readTimeout = d
if err := this.setReadTimeoutBasedDeadline(); err != nil {
return err
}
return nil
}
func (this *ManagedConnection) SetWriteTimeout(d time.Duration) error {
this.writeTimeout = d
if err := this.setWriteTimeoutBasedDeadline(); err != nil {
return err
}
return nil
}
func (this *ManagedConnection) setReadTimeoutBasedDeadline() error {
if this.readTimeout != 0 {
if err := this.Conn.SetReadDeadline(time.Now().Add(this.readTimeout)); err != nil {
return err
}
} else {
if err := this.Conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
}
return nil
}
func (this *ManagedConnection) setWriteTimeoutBasedDeadline() error {
if this.writeTimeout != 0 {
if err := this.Conn.SetWriteDeadline(time.Now().Add(this.writeTimeout)); err != nil {
return err
}
} else {
if err := this.Conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}
}
return nil
}
package network
const (
READ_BUFFER_SIZE = 81920
)
package tcp
import (
"net"
"strconv"
"sync"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/hive.go/events"
)
type Server struct {
socket net.Listener
socketMutex sync.RWMutex
Events serverEvents
}
func (this *Server) GetSocket() net.Listener {
this.socketMutex.RLock()
defer this.socketMutex.RUnlock()
return this.socket
}
func (this *Server) Shutdown() {
this.socketMutex.Lock()
defer this.socketMutex.Unlock()
if this.socket != nil {
socket := this.socket
this.socket = nil
socket.Close()
}
}
func (this *Server) Listen(port int) *Server {
socket, err := net.Listen("tcp4", "0.0.0.0:"+strconv.Itoa(port))
if err != nil {
this.Events.Error.Trigger(err)
return this
} else {
this.socketMutex.Lock()
this.socket = socket
this.socketMutex.Unlock()
}
this.Events.Start.Trigger()
defer this.Events.Shutdown.Trigger()
for this.GetSocket() != nil {
if socket, err := this.GetSocket().Accept(); err != nil {
if this.GetSocket() != nil {
this.Events.Error.Trigger(err)
}
} else {
peer := network.NewManagedConnection(socket)
go this.Events.Connect.Trigger(peer)
}
}
return this
}
func NewServer() *Server {
return &Server{
Events: serverEvents{
Start: events.NewEvent(events.CallbackCaller),
Shutdown: events.NewEvent(events.CallbackCaller),
Connect: events.NewEvent(managedConnectionCaller),
Error: events.NewEvent(events.ErrorCaller),
},
}
}
package tcp
import (
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/hive.go/events"
)
type serverEvents struct {
Start *events.Event
Shutdown *events.Event
Connect *events.Event
Error *events.Event
}
func managedConnectionCaller(handler interface{}, params ...interface{}) {
handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection))
}
package tcp
import "github.com/iotaledger/goshimmer/packages/network"
type Callback = func()
type ErrorConsumer = func(e error)
type PeerConsumer = func(conn *network.ManagedConnection)
package network
type Callback func()
type ErrorConsumer func(err error)
type DataConsumer func(data []byte)
......@@ -6,7 +6,6 @@ import (
"github.com/iotaledger/goshimmer/packages/autopeering/discover"
"github.com/iotaledger/goshimmer/packages/autopeering/selection"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/parameter"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
......@@ -19,6 +18,7 @@ import (
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/timeutil"
)
......
......@@ -5,8 +5,6 @@ import (
"errors"
"math"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/network/tcp"
"github.com/iotaledger/goshimmer/packages/parameter"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
......@@ -17,13 +15,15 @@ import (
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/network/tcp"
"github.com/iotaledger/hive.go/node"
)
var (
ErrInvalidPackageHeader = errors.New("invalid package header")
ErrExpectedInitialAddNodePackage = errors.New("expected initial add node package")
server *tcp.Server
server *tcp.TCPServer
log *logger.Logger
)
......@@ -46,7 +46,7 @@ func Configure(plugin *node.Plugin) {
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) {
log.Infof("Starting Server (port %d) ... done", parameter.NodeConfig.GetInt(CFG_SERVER_PORT))
go server.Listen(parameter.NodeConfig.GetInt(CFG_SERVER_PORT))
go server.Listen("0.0.0.0", parameter.NodeConfig.GetInt(CFG_SERVER_PORT))
<-shutdownSignal
Shutdown()
}, shutdown.ShutdownPriorityAnalysis)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment