Skip to content
Snippets Groups Projects
Unverified Commit 6edb4a48 authored by Hans Moog's avatar Hans Moog Committed by GitHub
Browse files

Develop.merge binary (#263)


* Feat: started to merge changes

* Refactor: moved parameter package to be a plugin (same with logger)

* Feat: first compiling version of new ontologoies merge

* Feat: ported additional plugins

* Feat: transaction get solid now

* Refactor: reverted some previous changes from debugging

* Feat: added a banner module for the cli interface

* Feat: added a plugin for the port checks

* Feat: fixed some bugs

* Refactor: reverted some changes

* Feat: reworked TransactionParser to use Errors

* Feat: TransactionParser uses Peer

* Feat: started to rework broadCastData webapi call

* Feat: refactored some plugins

* Fix: fixed test of tangle

* Refactor: changed tangle package in graph plugin

* Refactor: uncommented broken code

* Fix: fixed broken method signature in gossip test

* Feat: started adding value tangle

* Feat: adjusted to new hive.go

* Feat: upgraded hive.go

* Clean up PortCheck plugin and make it standalone #259 (#271)

Co-authored-by: default avatarHans Moog <hm@mkjc.net>

Co-authored-by: default avatarJonas Theis <mail@jonastheis.de>
parent 7f238124
Branches
Tags
No related merge requests found
Showing
with 348 additions and 1142 deletions
package graph package graph
import ( import (
"github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/banner"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
) )
...@@ -18,5 +18,5 @@ func init() { ...@@ -18,5 +18,5 @@ func init() {
flag.String(CFG_SOCKET_IO, "socket.io-client/dist/socket.io.js", "Path to socket.io.js") flag.String(CFG_SOCKET_IO, "socket.io-client/dist/socket.io.js", "Path to socket.io.js")
flag.String(CFG_DOMAIN, "", "Set the domain on which IOTA Tangle Visualiser is served") flag.String(CFG_DOMAIN, "", "Set the domain on which IOTA Tangle Visualiser is served")
flag.String(CFG_BIND_ADDRESS, "127.0.0.1:8082", "the bind address for the IOTA Tangle Visualizer") flag.String(CFG_BIND_ADDRESS, "127.0.0.1:8082", "the bind address for the IOTA Tangle Visualizer")
flag.String(CFG_NETWORK, cli.AppName, "Name of the network shown in IOTA Tangle Visualiser") flag.String(CFG_NETWORK, banner.AppName, "Name of the network shown in IOTA Tangle Visualiser")
} }
...@@ -5,12 +5,14 @@ import ( ...@@ -5,12 +5,14 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/packages/parameter"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/goshimmer/plugins/tangle"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/shutdown"
engineio "github.com/googollee/go-engine.io" engineio "github.com/googollee/go-engine.io"
"github.com/googollee/go-engine.io/transport" "github.com/googollee/go-engine.io/transport"
"github.com/googollee/go-engine.io/transport/polling" "github.com/googollee/go-engine.io/transport/polling"
...@@ -39,7 +41,7 @@ var ( ...@@ -39,7 +41,7 @@ var (
) )
func downloadSocketIOHandler(w http.ResponseWriter, r *http.Request) { func downloadSocketIOHandler(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, parameter.NodeConfig.GetString(CFG_SOCKET_IO)) http.ServeFile(w, r, config.Node.GetString(CFG_SOCKET_IO))
} }
func configureSocketIOServer() error { func configureSocketIOServer() error {
...@@ -72,11 +74,11 @@ func configure(plugin *node.Plugin) { ...@@ -72,11 +74,11 @@ func configure(plugin *node.Plugin) {
// socket.io and web server // socket.io and web server
server = &http.Server{ server = &http.Server{
Addr: parameter.NodeConfig.GetString(CFG_BIND_ADDRESS), Addr: config.Node.GetString(CFG_BIND_ADDRESS),
Handler: router, Handler: router,
} }
fs := http.FileServer(http.Dir(parameter.NodeConfig.GetString(CFG_WEBROOT))) fs := http.FileServer(http.Dir(config.Node.GetString(CFG_WEBROOT)))
if err := configureSocketIOServer(); err != nil { if err := configureSocketIOServer(); err != nil {
log.Panicf("Graph: %v", err.Error()) log.Panicf("Graph: %v", err.Error())
...@@ -100,10 +102,10 @@ func run(*node.Plugin) { ...@@ -100,10 +102,10 @@ func run(*node.Plugin) {
daemon.BackgroundWorker("Graph[NewTxWorker]", func(shutdownSignal <-chan struct{}) { daemon.BackgroundWorker("Graph[NewTxWorker]", func(shutdownSignal <-chan struct{}) {
log.Info("Starting Graph[NewTxWorker] ... done") log.Info("Starting Graph[NewTxWorker] ... done")
tangle.Events.TransactionStored.Attach(notifyNewTx) tangle.Instance.Events.TransactionAttached.Attach(notifyNewTx)
newTxWorkerPool.Start() newTxWorkerPool.Start()
<-shutdownSignal <-shutdownSignal
tangle.Events.TransactionStored.Detach(notifyNewTx) tangle.Instance.Events.TransactionAttached.Detach(notifyNewTx)
newTxWorkerPool.Stop() newTxWorkerPool.Stop()
log.Info("Stopping Graph[NewTxWorker] ... done") log.Info("Stopping Graph[NewTxWorker] ... done")
}, shutdown.ShutdownPriorityGraph) }, shutdown.ShutdownPriorityGraph)
...@@ -113,7 +115,7 @@ func run(*node.Plugin) { ...@@ -113,7 +115,7 @@ func run(*node.Plugin) {
stopped := make(chan struct{}) stopped := make(chan struct{})
go func() { go func() {
log.Infof("You can now access IOTA Tangle Visualiser using: http://%s", parameter.NodeConfig.GetString(CFG_BIND_ADDRESS)) log.Infof("You can now access IOTA Tangle Visualiser using: http://%s", config.Node.GetString(CFG_BIND_ADDRESS))
if err := server.ListenAndServe(); err != nil { if err := server.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) { if !errors.Is(err, http.ErrServerClosed) {
log.Errorf("Error serving: %s", err) log.Errorf("Error serving: %s", err)
......
package logger
import (
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
// define the plugin as a placeholder, so the init methods get executed accordingly
var PLUGIN = node.NewPlugin("Logger", node.Enabled)
func Init() {
PLUGIN.Events.Init.Trigger(PLUGIN)
}
func init() {
PLUGIN.Events.Init.Attach(events.NewClosure(func(*node.Plugin) {
if err := logger.InitGlobalLogger(config.Node); err != nil {
panic(err)
}
}))
}
...@@ -3,19 +3,27 @@ package metrics ...@@ -3,19 +3,27 @@ package metrics
import ( import (
"time" "time"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/timeutil" "github.com/iotaledger/hive.go/timeutil"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/tangle"
) )
var PLUGIN = node.NewPlugin("Metrics", node.Enabled, configure, run) var PLUGIN = node.NewPlugin("Metrics", node.Enabled, configure, run)
func configure(plugin *node.Plugin) { func configure(plugin *node.Plugin) {
// increase received TPS counter whenever we receive a new transaction // increase received TPS counter whenever we receive a new transaction
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(_ *gossip.TransactionReceivedEvent) { increaseReceivedTPSCounter() })) tangle.Instance.Events.TransactionAttached.Attach(events.NewClosure(func(transaction *transaction.CachedTransaction, metadata *transactionmetadata.CachedTransactionMetadata) {
transaction.Release()
metadata.Release()
increaseReceivedTPSCounter()
}))
} }
func run(plugin *node.Plugin) { func run(plugin *node.Plugin) {
......
package portcheck
import (
"net"
"sync"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/goshimmer/packages/netutil"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
const (
PLUGIN_NAME = "PortCheck"
)
var (
PLUGIN = node.NewPlugin(PLUGIN_NAME, node.Enabled, configure, run)
log *logger.Logger
)
func configure(plugin *node.Plugin) {
log = logger.NewLogger(PLUGIN_NAME)
}
func run(ctx *node.Plugin) {
if !node.IsSkipped(autopeering.PLUGIN) {
log.Info("Testing autopeering service ...")
checkAutopeeringConnection()
log.Info("Testing autopeering service ... done")
}
if !node.IsSkipped(gossip.PLUGIN) {
log.Info("Testing gossip service ...")
checkGossipConnection()
log.Info("Testing gossip service ... done")
}
}
// check that discovery is working and the port is open
func checkAutopeeringConnection() {
peering := local.GetInstance().Services().Get(service.PeeringKey)
// use the port of the peering service
_, peeringPort, err := net.SplitHostPort(peering.String())
if err != nil {
panic(err)
}
// resolve the bind address
address := net.JoinHostPort(config.Node.GetString(local.CFG_BIND), peeringPort)
localAddr, err := net.ResolveUDPAddr(peering.Network(), address)
if err != nil {
log.Fatalf("Error resolving %s: %v", local.CFG_BIND, err)
}
remoteAddr, err := net.ResolveUDPAddr(peering.Network(), peering.String())
if err != nil {
panic(err)
}
// do not check address and port as a NAT may change them for local connections
err = netutil.CheckUDP(localAddr, remoteAddr, false, false)
if err != nil {
log.Errorf("Error testing autopeering service: %s", err)
log.Panicf("Please check that %s is publicly reachable at %s/%s",
banner.AppName, peering.String(), peering.Network())
}
}
// check that the gossip server is working and the port is open
func checkGossipConnection() {
// listen on TCP gossip port
lPeer := local.GetInstance()
// use the port of the gossip service
gossipAddr := lPeer.Services().Get(service.GossipKey)
_, gossipPort, err := net.SplitHostPort(gossipAddr.String())
if err != nil {
panic(err)
}
// resolve the bind address
address := net.JoinHostPort(config.Node.GetString(local.CFG_BIND), gossipPort)
localAddr, err := net.ResolveTCPAddr(gossipAddr.Network(), address)
if err != nil {
log.Fatalf("Error resolving %s: %v", local.CFG_BIND, err)
}
listener, err := net.ListenTCP(gossipAddr.Network(), localAddr)
if err != nil {
log.Fatalf("Error listening: %v", err)
}
defer listener.Close()
srv := server.ServeTCP(lPeer, listener, log)
defer srv.Close()
// do the actual check
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
conn, acceptErr := srv.AcceptPeer(&lPeer.Peer)
if acceptErr != nil {
return
}
_ = conn.Close()
}()
conn, err := srv.DialPeer(&lPeer.Peer)
if err != nil {
log.Errorf("Error testing: %s", err)
log.Panicf("Please check that %s is publicly reachable at %s/%s",
banner.AppName, gossipAddr.String(), gossipAddr.Network())
}
_ = conn.Close()
wg.Wait()
}
...@@ -14,10 +14,10 @@ import ( ...@@ -14,10 +14,10 @@ import (
"runtime" "runtime"
"time" "time"
"github.com/iotaledger/goshimmer/packages/parameter"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
...@@ -58,14 +58,14 @@ var ( ...@@ -58,14 +58,14 @@ var (
func configure(plugin *node.Plugin) { func configure(plugin *node.Plugin) {
log = logger.NewLogger(PLUGIN_NAME) log = logger.NewLogger(PLUGIN_NAME)
if parameter.NodeConfig.GetBool(CFG_DISABLE_EVENTS) { if config.Node.GetBool(CFG_DISABLE_EVENTS) {
log.Fatalf("%s in config.json needs to be false so that events can be captured!", CFG_DISABLE_EVENTS) log.Fatalf("%s in config.json needs to be false so that events can be captured!", CFG_DISABLE_EVENTS)
return return
} }
c, err := net.Dial("udp", parameter.NodeConfig.GetString(CFG_SERVER_ADDRESS)) c, err := net.Dial("udp", config.Node.GetString(CFG_SERVER_ADDRESS))
if err != nil { if err != nil {
log.Fatalf("Could not create UDP socket to '%s'. %v", parameter.NodeConfig.GetString(CFG_SERVER_ADDRESS), err) log.Fatalf("Could not create UDP socket to '%s'. %v", config.Node.GetString(CFG_SERVER_ADDRESS), err)
return return
} }
conn = c conn = c
...@@ -101,7 +101,7 @@ func run(plugin *node.Plugin) { ...@@ -101,7 +101,7 @@ func run(plugin *node.Plugin) {
func sendLogMsg(level logger.Level, name string, msg string) { func sendLogMsg(level logger.Level, name string, msg string) {
m := logMessage{ m := logMessage{
cli.AppVersion, banner.AppVersion,
myGitHead, myGitHead,
myGitBranch, myGitBranch,
myID, myID,
......
...@@ -4,58 +4,46 @@ import ( ...@@ -4,58 +4,46 @@ import (
"net/http" "net/http"
"sync" "sync"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata" "github.com/mr-tron/base58"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
"github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/labstack/echo" "github.com/labstack/echo"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/iotaledger/iota.go/consts"
"github.com/iotaledger/iota.go/guards"
. "github.com/iotaledger/iota.go/trinary"
) )
type ExplorerTx struct { type ExplorerTx struct {
Hash Hash `json:"hash"` Hash string `json:"hash"`
SignatureMessageFragment Trytes `json:"signature_message_fragment"` SignatureMessageFragment string `json:"signature_message_fragment"`
Address Hash `json:"address"` Address string `json:"address"`
Value int64 `json:"value"` Value int64 `json:"value"`
Timestamp uint `json:"timestamp"` Timestamp uint `json:"timestamp"`
Trunk Hash `json:"trunk"` Trunk string `json:"trunk"`
Branch Hash `json:"branch"` Branch string `json:"branch"`
Solid bool `json:"solid"` Solid bool `json:"solid"`
MWM int `json:"mwm"` MWM int `json:"mwm"`
} }
func createExplorerTx(hash Hash, tx *value_transaction.ValueTransaction) (*ExplorerTx, error) { func createExplorerTx(tx *transaction.Transaction) (*ExplorerTx, error) {
transactionId := tx.GetId()
txMetadata, err := tangle.GetTransactionMetadata(hash, transactionmetadata.New) txMetadata := tangle.Instance.GetTransactionMetadata(transactionId)
if err != nil {
return nil, err
}
t := &ExplorerTx{ t := &ExplorerTx{
Hash: tx.GetHash(), Hash: transactionId.String(),
SignatureMessageFragment: tx.GetSignatureMessageFragment(), SignatureMessageFragment: "",
Address: tx.GetAddress(), Address: "",
Timestamp: tx.GetTimestamp(), Timestamp: 0,
Value: tx.GetValue(), Value: 0,
Trunk: tx.GetTrunkTransactionHash(), Trunk: tx.GetTrunkTransactionId().String(),
Branch: tx.GetBranchTransactionHash(), Branch: tx.GetBranchTransactionId().String(),
Solid: txMetadata.GetSolid(), Solid: txMetadata.Unwrap().IsSolid(),
}
// compute mwm
trits := MustTrytesToTrits(hash)
var mwm int
for i := len(trits) - 1; i >= 0; i-- {
if trits[i] == 0 {
mwm++
continue
} }
break
} // TODO: COMPUTE MWM
t.MWM = mwm t.MWM = 0
return t, nil return t, nil
} }
...@@ -69,13 +57,31 @@ type SearchResult struct { ...@@ -69,13 +57,31 @@ type SearchResult struct {
Milestone *ExplorerTx `json:"milestone"` Milestone *ExplorerTx `json:"milestone"`
} }
func transactionIdFromString(transactionId string) (transaction.Id, error) {
// TODO: CHECK LENGTH
transactionIdBytes, err := base58.Decode(transactionId)
if err != nil {
return transaction.EmptyId, err
}
// TODO: REMOVE CHECKSUM FROM STRING
return transaction.NewId(transactionIdBytes), nil
}
func setupExplorerRoutes(routeGroup *echo.Group) { func setupExplorerRoutes(routeGroup *echo.Group) {
routeGroup.GET("/tx/:hash", func(c echo.Context) (err error) {
transactionId, err := transactionIdFromString(c.Param("hash"))
if err != nil {
return
}
routeGroup.GET("/tx/:hash", func(c echo.Context) error { t, err := findTransaction(transactionId)
t, err := findTransaction(c.Param("hash"))
if err != nil { if err != nil {
return err return
} }
return c.JSON(http.StatusOK, t) return c.JSON(http.StatusOK, t)
}) })
...@@ -95,14 +101,17 @@ func setupExplorerRoutes(routeGroup *echo.Group) { ...@@ -95,14 +101,17 @@ func setupExplorerRoutes(routeGroup *echo.Group) {
return errors.Wrapf(ErrInvalidParameter, "search hash invalid: %s", search) return errors.Wrapf(ErrInvalidParameter, "search hash invalid: %s", search)
} }
// auto. remove checksum
search = search[:81]
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(2) wg.Add(2)
go func() { go func() {
defer wg.Done() defer wg.Done()
tx, err := findTransaction(search)
transactionId, err := transactionIdFromString(search)
if err != nil {
return
}
tx, err := findTransaction(transactionId)
if err == nil { if err == nil {
result.Tx = tx result.Tx = tx
} }
...@@ -121,24 +130,22 @@ func setupExplorerRoutes(routeGroup *echo.Group) { ...@@ -121,24 +130,22 @@ func setupExplorerRoutes(routeGroup *echo.Group) {
}) })
} }
func findTransaction(hash Hash) (*ExplorerTx, error) { func findTransaction(transactionId transaction.Id) (explorerTx *ExplorerTx, err error) {
if !guards.IsTrytesOfExactLength(hash, consts.HashTrytesSize) { if !tangle.Instance.GetTransaction(transactionId).Consume(func(transaction *transaction.Transaction) {
return nil, errors.Wrapf(ErrInvalidParameter, "hash invalid: %s", hash) explorerTx, err = createExplorerTx(transaction)
}) {
err = errors.Wrapf(ErrNotFound, "tx hash: %s", transactionId.String())
} }
tx, err := tangle.GetTransaction(hash) return
if err != nil {
return nil, err
}
if tx == nil {
return nil, errors.Wrapf(ErrNotFound, "tx hash: %s", hash)
} }
t, err := createExplorerTx(hash, tx) func findAddress(address string) (*ExplorerAdress, error) {
return t, err return nil, errors.Wrapf(ErrNotFound, "address %s not found", address)
}
// TODO: ADD ADDRESS LOOKUPS ONCE THE VALUE TRANSFER ONTOLOGY IS MERGED
func findAddress(hash Hash) (*ExplorerAdress, error) { /*
if len(hash) > 81 { if len(hash) > 81 {
hash = hash[:81] hash = hash[:81]
} }
...@@ -146,7 +153,7 @@ func findAddress(hash Hash) (*ExplorerAdress, error) { ...@@ -146,7 +153,7 @@ func findAddress(hash Hash) (*ExplorerAdress, error) {
return nil, errors.Wrapf(ErrInvalidParameter, "hash invalid: %s", hash) return nil, errors.Wrapf(ErrInvalidParameter, "hash invalid: %s", hash)
} }
txHashes, err := tangle.ReadTransactionHashesForAddressFromDatabase(hash) txHashes, err := tangle_old.ReadTransactionHashesForAddressFromDatabase(hash)
if err != nil { if err != nil {
return nil, ErrInternalError return nil, ErrInternalError
} }
...@@ -159,7 +166,7 @@ func findAddress(hash Hash) (*ExplorerAdress, error) { ...@@ -159,7 +166,7 @@ func findAddress(hash Hash) (*ExplorerAdress, error) {
for i := 0; i < len(txHashes); i++ { for i := 0; i < len(txHashes); i++ {
txHash := txHashes[i] txHash := txHashes[i]
tx, err := tangle.GetTransaction(hash) tx, err := tangle_old.GetTransaction(hash)
if err != nil { if err != nil {
continue continue
} }
...@@ -174,4 +181,5 @@ func findAddress(hash Hash) (*ExplorerAdress, error) { ...@@ -174,4 +181,5 @@ func findAddress(hash Hash) (*ExplorerAdress, error) {
} }
return &ExplorerAdress{Txs: txs}, nil return &ExplorerAdress{Txs: txs}, nil
*/
} }
...@@ -3,12 +3,14 @@ package spa ...@@ -3,12 +3,14 @@ package spa
import ( import (
"time" "time"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/shutdown"
tangle_plugin "github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool" "github.com/iotaledger/hive.go/workerpool"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/tangle"
) )
var liveFeedWorkerCount = 1 var liveFeedWorkerCount = 1
...@@ -17,28 +19,33 @@ var liveFeedWorkerPool *workerpool.WorkerPool ...@@ -17,28 +19,33 @@ var liveFeedWorkerPool *workerpool.WorkerPool
func configureLiveFeed() { func configureLiveFeed() {
liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) { liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
t := task.Param(0).(*value_transaction.ValueTransaction) task.Param(0).(*transaction.CachedTransaction).Consume(func(transaction *transaction.Transaction) {
sendToAllWSClient(&msg{MsgTypeTx, &tx{t.GetHash(), t.GetValue()}}) sendToAllWSClient(&msg{MsgTypeTx, &tx{transaction.GetId().String(), 0}})
})
task.Return(nil) task.Return(nil)
}, workerpool.WorkerCount(liveFeedWorkerCount), workerpool.QueueSize(liveFeedWorkerQueueSize)) }, workerpool.WorkerCount(liveFeedWorkerCount), workerpool.QueueSize(liveFeedWorkerQueueSize))
} }
func runLiveFeed() { func runLiveFeed() {
newTxRateLimiter := time.NewTicker(time.Second / 10) newTxRateLimiter := time.NewTicker(time.Second / 10)
notifyNewTx := events.NewClosure(func(tx *value_transaction.ValueTransaction) { notifyNewTx := events.NewClosure(func(tx *transaction.CachedTransaction, metadata *transactionmetadata.CachedTransactionMetadata) {
metadata.Release()
select { select {
case <-newTxRateLimiter.C: case <-newTxRateLimiter.C:
liveFeedWorkerPool.TrySubmit(tx) liveFeedWorkerPool.TrySubmit(tx)
default: default:
tx.Release()
} }
}) })
daemon.BackgroundWorker("SPA[TxUpdater]", func(shutdownSignal <-chan struct{}) { daemon.BackgroundWorker("SPA[TxUpdater]", func(shutdownSignal <-chan struct{}) {
tangle_plugin.Events.TransactionStored.Attach(notifyNewTx) tangle.Instance.Events.TransactionAttached.Attach(notifyNewTx)
liveFeedWorkerPool.Start() liveFeedWorkerPool.Start()
<-shutdownSignal <-shutdownSignal
log.Info("Stopping SPA[TxUpdater] ...") log.Info("Stopping SPA[TxUpdater] ...")
tangle_plugin.Events.TransactionStored.Detach(notifyNewTx) tangle.Instance.Events.TransactionAttached.Detach(notifyNewTx)
newTxRateLimiter.Stop() newTxRateLimiter.Stop()
liveFeedWorkerPool.Stop() liveFeedWorkerPool.Stop()
log.Info("Stopping SPA[TxUpdater] ... done") log.Info("Stopping SPA[TxUpdater] ... done")
......
...@@ -7,16 +7,17 @@ import ( ...@@ -7,16 +7,17 @@ import (
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/iotaledger/goshimmer/packages/parameter" "github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/metrics" "github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
...@@ -32,7 +33,7 @@ var ( ...@@ -32,7 +33,7 @@ var (
nodeStartAt = time.Now() nodeStartAt = time.Now()
clientsMu sync.Mutex clientsMu sync.Mutex
clients = make(map[uint64]chan interface{}, 0) clients = make(map[uint64]chan interface{})
nextClientID uint64 = 0 nextClientID uint64 = 0
wsSendWorkerCount = 1 wsSendWorkerCount = 1
...@@ -54,7 +55,6 @@ func configure(plugin *node.Plugin) { ...@@ -54,7 +55,6 @@ func configure(plugin *node.Plugin) {
} }
func run(plugin *node.Plugin) { func run(plugin *node.Plugin) {
notifyStatus := events.NewClosure(func(tps uint64) { notifyStatus := events.NewClosure(func(tps uint64) {
wsSendWorkerPool.TrySubmit(tps) wsSendWorkerPool.TrySubmit(tps)
}) })
...@@ -80,10 +80,10 @@ func run(plugin *node.Plugin) { ...@@ -80,10 +80,10 @@ func run(plugin *node.Plugin) {
e.HideBanner = true e.HideBanner = true
e.Use(middleware.Recover()) e.Use(middleware.Recover())
if parameter.NodeConfig.GetBool(CFG_BASIC_AUTH_ENABLED) { if config.Node.GetBool(CFG_BASIC_AUTH_ENABLED) {
e.Use(middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) { e.Use(middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) {
if username == parameter.NodeConfig.GetString(CFG_BASIC_AUTH_USERNAME) && if username == config.Node.GetString(CFG_BASIC_AUTH_USERNAME) &&
password == parameter.NodeConfig.GetString(CFG_BASIC_AUTH_PASSWORD) { password == config.Node.GetString(CFG_BASIC_AUTH_PASSWORD) {
return true, nil return true, nil
} }
return false, nil return false, nil
...@@ -91,7 +91,7 @@ func run(plugin *node.Plugin) { ...@@ -91,7 +91,7 @@ func run(plugin *node.Plugin) {
} }
setupRoutes(e) setupRoutes(e)
addr := parameter.NodeConfig.GetString(CFG_BIND_ADDRESS) addr := config.Node.GetString(CFG_BIND_ADDRESS)
log.Infof("You can now access the dashboard using: http://%s", addr) log.Infof("You can now access the dashboard using: http://%s", addr)
go e.Start(addr) go e.Start(addr)
...@@ -201,7 +201,7 @@ func currentNodeStatus() *nodestatus { ...@@ -201,7 +201,7 @@ func currentNodeStatus() *nodestatus {
status.ID = local.GetInstance().ID().String() status.ID = local.GetInstance().ID().String()
// node status // node status
status.Version = cli.AppVersion status.Version = banner.AppVersion
status.Uptime = time.Since(nodeStartAt).Milliseconds() status.Uptime = time.Since(nodeStartAt).Milliseconds()
// memory metrics // memory metrics
......
...@@ -7,10 +7,9 @@ import ( ...@@ -7,10 +7,9 @@ import (
"time" "time"
"github.com/gobuffalo/packr/v2" "github.com/gobuffalo/packr/v2"
"github.com/iotaledger/goshimmer/packages/parameter" "github.com/iotaledger/goshimmer/plugins/config"
"github.com/labstack/echo" "github.com/labstack/echo"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
var ErrInvalidParameter = errors.New("invalid parameter") var ErrInvalidParameter = errors.New("invalid parameter")
...@@ -23,7 +22,7 @@ var appBox = packr.New("SPA_App", "./frontend/build") ...@@ -23,7 +22,7 @@ var appBox = packr.New("SPA_App", "./frontend/build")
var assetsBox = packr.New("SPA_Assets", "./frontend/src/assets") var assetsBox = packr.New("SPA_Assets", "./frontend/src/assets")
func indexRoute(e echo.Context) error { func indexRoute(e echo.Context) error {
if parameter.NodeConfig.GetBool(CFG_DEV) { if config.Node.GetBool(CFG_DEV) {
res, err := http.Get("http://127.0.0.1:9090/") res, err := http.Get("http://127.0.0.1:9090/")
if err != nil { if err != nil {
return err return err
...@@ -43,7 +42,7 @@ func indexRoute(e echo.Context) error { ...@@ -43,7 +42,7 @@ func indexRoute(e echo.Context) error {
func setupRoutes(e *echo.Echo) { func setupRoutes(e *echo.Echo) {
if parameter.NodeConfig.GetBool("dashboard.dev") { if config.Node.GetBool("dashboard.dev") {
e.Static("/assets", "./plugins/spa/frontend/src/assets") e.Static("/assets", "./plugins/spa/frontend/src/assets")
} else { } else {
......
package tangle
import (
"fmt"
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/model/approvers"
"github.com/iotaledger/hive.go/lru_cache"
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/iota.go/trinary"
)
// region global public api ////////////////////////////////////////////////////////////////////////////////////////////
// GetApprovers retrieves approvers from the database.
func GetApprovers(transactionHash trinary.Trytes, computeIfAbsent ...func(trinary.Trytes) *approvers.Approvers) (result *approvers.Approvers, err error) {
if cacheResult := approversCache.ComputeIfAbsent(transactionHash, func() interface{} {
if dbApprovers, dbErr := getApproversFromDatabase(transactionHash); dbErr != nil {
err = dbErr
return nil
} else if dbApprovers != nil {
return dbApprovers
} else {
if len(computeIfAbsent) >= 1 {
return computeIfAbsent[0](transactionHash)
}
return nil
}
}); cacheResult != nil && cacheResult.(*approvers.Approvers) != nil {
result = cacheResult.(*approvers.Approvers)
}
return
}
func ContainsApprovers(transactionHash trinary.Trytes) (result bool, err error) {
if approversCache.Contains(transactionHash) {
result = true
} else {
result, err = databaseContainsApprovers(transactionHash)
}
return
}
func StoreApprovers(approvers *approvers.Approvers) {
approversCache.Set(approvers.GetHash(), approvers)
}
// region lru cache ////////////////////////////////////////////////////////////////////////////////////////////////////
var approversCache = lru_cache.NewLRUCache(APPROVERS_CACHE_SIZE, &lru_cache.LRUCacheOptions{
EvictionCallback: onEvictApprovers,
EvictionBatchSize: 100,
})
func onEvictApprovers(_ interface{}, values interface{}) {
// TODO: replace with apply
for _, obj := range values.([]interface{}) {
if approvers := obj.(*approvers.Approvers); approvers.GetModified() {
if err := storeApproversInDatabase(approvers); err != nil {
panic(err)
}
}
}
}
func FlushApproversCache() {
approversCache.DeleteAll()
}
const (
APPROVERS_CACHE_SIZE = 50000
)
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region database /////////////////////////////////////////////////////////////////////////////////////////////////////
var approversDatabase database.Database
func configureApproversDatabase() {
if db, err := database.Get(database.DBPrefixApprovers, database.GetBadgerInstance()); err != nil {
panic(err)
} else {
approversDatabase = db
}
}
func storeApproversInDatabase(approvers *approvers.Approvers) error {
if approvers.GetModified() {
if err := approversDatabase.Set(database.Entry{Key: typeutils.StringToBytes(approvers.GetHash()), Value: approvers.Marshal()}); err != nil {
return fmt.Errorf("%w: failed to store approvers: %s", ErrDatabaseError, err)
}
approvers.SetModified(false)
}
return nil
}
func getApproversFromDatabase(transactionHash trinary.Trytes) (*approvers.Approvers, error) {
approversData, err := approversDatabase.Get(typeutils.StringToBytes(transactionHash))
if err != nil {
if err == database.ErrKeyNotFound {
return nil, nil
}
return nil, fmt.Errorf("%w: failed to retrieve approvers: %s", ErrDatabaseError, err)
}
var result approvers.Approvers
if err = result.Unmarshal(approversData.Value); err != nil {
panic(err)
}
return &result, nil
}
func databaseContainsApprovers(transactionHash trinary.Trytes) (bool, error) {
if contains, err := approversDatabase.Contains(typeutils.StringToBytes(transactionHash)); err != nil {
return false, fmt.Errorf("%w: failed to check if the approvers exist: %s", ErrDatabaseError, err)
} else {
return contains, nil
}
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
package tangle
import (
"fmt"
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/hive.go/lru_cache"
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/iota.go/trinary"
)
// region global public api ////////////////////////////////////////////////////////////////////////////////////////////
// GetBundle retrieves bundle from the database.
func GetBundle(headerTransactionHash trinary.Trytes, computeIfAbsent ...func(trinary.Trytes) (*bundle.Bundle, error)) (result *bundle.Bundle, err error) {
if cacheResult := bundleCache.ComputeIfAbsent(headerTransactionHash, func() interface{} {
if dbBundle, dbErr := getBundleFromDatabase(headerTransactionHash); dbErr != nil {
err = dbErr
return nil
} else if dbBundle != nil {
return dbBundle
} else {
if len(computeIfAbsent) >= 1 {
if computedBundle, computedErr := computeIfAbsent[0](headerTransactionHash); computedErr != nil {
err = computedErr
} else {
return computedBundle
}
}
return nil
}
}); cacheResult != nil && cacheResult.(*bundle.Bundle) != nil {
result = cacheResult.(*bundle.Bundle)
}
return
}
func ContainsBundle(headerTransactionHash trinary.Trytes) (result bool, err error) {
if bundleCache.Contains(headerTransactionHash) {
result = true
} else {
result, err = databaseContainsBundle(headerTransactionHash)
}
return
}
func StoreBundle(bundle *bundle.Bundle) {
bundleCache.Set(bundle.GetHash(), bundle)
}
// region lru cache ////////////////////////////////////////////////////////////////////////////////////////////////////
var bundleCache = lru_cache.NewLRUCache(BUNDLE_CACHE_SIZE, &lru_cache.LRUCacheOptions{
EvictionCallback: onEvictBundles,
EvictionBatchSize: 100,
})
func onEvictBundles(_ interface{}, values interface{}) {
// TODO: replace with apply
for _, obj := range values.([]interface{}) {
if bndl := obj.(*bundle.Bundle); bndl.GetModified() {
if err := storeBundleInDatabase(bndl); err != nil {
panic(err)
}
}
}
}
func FlushBundleCache() {
bundleCache.DeleteAll()
}
const (
BUNDLE_CACHE_SIZE = 500
)
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region database /////////////////////////////////////////////////////////////////////////////////////////////////////
var bundleDatabase database.Database
func configureBundleDatabase() {
if db, err := database.Get(database.DBPrefixBundle, database.GetBadgerInstance()); err != nil {
panic(err)
} else {
bundleDatabase = db
}
}
func storeBundleInDatabase(bundle *bundle.Bundle) error {
if bundle.GetModified() {
if err := bundleDatabase.Set(database.Entry{Key: typeutils.StringToBytes(bundle.GetHash()), Value: bundle.Marshal()}); err != nil {
return fmt.Errorf("%w: failed to store bundle: %s", ErrDatabaseError, err)
}
bundle.SetModified(false)
}
return nil
}
func getBundleFromDatabase(transactionHash trinary.Trytes) (*bundle.Bundle, error) {
bundleData, err := bundleDatabase.Get(typeutils.StringToBytes(transactionHash))
if err != nil {
if err == database.ErrKeyNotFound {
return nil, nil
}
return nil, fmt.Errorf("%w: failed to retrieve bundle: %s", ErrDatabaseError, err)
}
var result bundle.Bundle
if err = result.Unmarshal(bundleData.Value); err != nil {
panic(err)
}
return &result, nil
}
func databaseContainsBundle(transactionHash trinary.Trytes) (bool, error) {
if contains, err := bundleDatabase.Contains(typeutils.StringToBytes(transactionHash)); err != nil {
return false, fmt.Errorf("%w: failed to check if the bundle exists: %s", ErrDatabaseError, err)
} else {
return contains, nil
}
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
package tangle
import "errors"
var (
ErrDatabaseError = errors.New("database error")
)
package tangle
import (
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/hive.go/events"
)
var Events = struct {
TransactionStored *events.Event
TransactionSolid *events.Event
}{
TransactionStored: events.NewEvent(transactionCaller),
TransactionSolid: events.NewEvent(transactionCaller),
}
func transactionCaller(handler interface{}, params ...interface{}) {
handler.(func(*value_transaction.ValueTransaction))(params[0].(*value_transaction.ValueTransaction))
}
package tangle
import (
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/iota.go/trinary"
)
func databaseKeyForHashPrefixedHash(address trinary.Hash, transactionHash trinary.Hash) []byte {
//return append(databaseKeyForHashPrefix(address), trinary.MustTrytesToBytes(transactionHash)...)
return append(databaseKeyForHashPrefix(address), typeutils.StringToBytes(transactionHash)...)
}
func databaseKeyForHashPrefix(hash trinary.Hash) []byte {
//return trinary.MustTrytesToBytes(hash)
return typeutils.StringToBytes(hash)
}
package tangle package tangle
import ( import (
"time" "github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/binary/storageprefix"
"github.com/iotaledger/goshimmer/packages/binary/tangle"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/binary/tangle/tipselector"
"github.com/iotaledger/goshimmer/packages/binary/tangle/transactionparser"
"github.com/iotaledger/goshimmer/packages/binary/tangle/transactionrequester"
"github.com/iotaledger/goshimmer/packages/database" "github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/timeutil"
"github.com/iotaledger/iota.go/trinary"
) )
// region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
var PLUGIN = node.NewPlugin("Tangle", node.Enabled, configure, run) var PLUGIN = node.NewPlugin("Tangle", node.Enabled, configure, run)
var TransactionParser *transactionparser.TransactionParser
var TransactionRequester *transactionrequester.TransactionRequester
var TipSelector *tipselector.TipSelector
var Instance *tangle.Tangle
var log *logger.Logger var log *logger.Logger
func configure(*node.Plugin) { func configure(*node.Plugin) {
log = logger.NewLogger("Tangle") log = logger.NewLogger("Tangle")
configureTransactionDatabase() // create instances
configureTransactionMetaDataDatabase() TransactionParser = transactionparser.New()
configureApproversDatabase() TransactionRequester = transactionrequester.New()
configureBundleDatabase() TipSelector = tipselector.New()
configureTransactionHashesForAddressDatabase() Instance = tangle.New(database.GetBadgerInstance(), storageprefix.MainNet)
configureSolidifier()
daemon.BackgroundWorker("Cache Flush", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal
log.Info("Flushing caches to database...")
FlushTransactionCache()
FlushTransactionMetadata()
FlushApproversCache()
FlushBundleCache()
log.Info("Flushing caches to database... done")
log.Info("Syncing database to disk...") // setup TransactionParser
database.GetBadgerInstance().Close() TransactionParser.Events.TransactionParsed.Attach(events.NewClosure(func(transaction *transaction.Transaction, peer *peer.Peer) {
log.Info("Syncing database to disk... done") // TODO: ADD PEER
}, shutdown.ShutdownPriorityTangle)
} Instance.AttachTransaction(transaction)
}))
func run(*node.Plugin) { // setup TransactionRequester
Instance.Events.TransactionMissing.Attach(events.NewClosure(TransactionRequester.ScheduleRequest))
Instance.Events.MissingTransactionReceived.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) {
cachedTransactionMetadata.Release()
daemon.BackgroundWorker("Badger garbage collection", func(shutdownSignal <-chan struct{}) { cachedTransaction.Consume(func(transaction *transaction.Transaction) {
timeutil.Ticker(func() { TransactionRequester.StopRequest(transaction.GetId())
database.CleanupBadgerInstance(log) })
}, 5*time.Minute, shutdownSignal) }))
}, shutdown.ShutdownPriorityBadgerGarbageCollection)
runSolidifier() // setup TipSelector
} Instance.Events.TransactionSolid.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) {
cachedTransactionMetadata.Release()
// Requester provides the functionality to request a transaction from the network. cachedTransaction.Consume(TipSelector.AddTip)
type Requester interface { }))
RequestTransaction(hash trinary.Hash)
} }
type RequesterFunc func(hash trinary.Hash) func run(*node.Plugin) {
_ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
func (f RequesterFunc) RequestTransaction(hash trinary.Hash) { f(hash) } <-shutdownSignal
// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// TransactionParser.Shutdown()
Instance.Shutdown()
}, shutdown.ShutdownPriorityTangle)
}
package tangle
import (
"runtime"
"time"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/model/approvers"
"github.com/iotaledger/goshimmer/packages/model/meta_transaction"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"github.com/iotaledger/iota.go/trinary"
)
// region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
const UnsolidInterval = time.Minute
var (
workerCount = runtime.NumCPU()
workerPool *workerpool.WorkerPool
requestedTxs *UnsolidTxs
requester Requester
)
func SetRequester(req Requester) {
requester = req
}
func configureSolidifier() {
workerPool = workerpool.New(func(task workerpool.Task) {
processMetaTransaction(task.Param(0).(*meta_transaction.MetaTransaction))
task.Return(nil)
}, workerpool.WorkerCount(workerCount), workerpool.QueueSize(10000))
requestedTxs = NewUnsolidTxs()
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) {
metaTx, err := meta_transaction.FromBytes(ev.Data)
if err != nil {
log.Warnf("invalid transaction: %s", err)
return
}
if err = metaTx.Validate(); err != nil {
log.Warnf("invalid transaction: %s", err)
return
}
workerPool.Submit(metaTx)
}))
}
func runSolidifier() {
daemon.BackgroundWorker("Tangle Solidifier", func(shutdownSignal <-chan struct{}) {
log.Info("Starting Solidifier ...")
workerPool.Start()
log.Info("Starting Solidifier ... done")
<-shutdownSignal
log.Info("Stopping Solidifier ...")
workerPool.StopAndWait()
log.Info("Stopping Solidifier ... done")
}, shutdown.ShutdownPrioritySolidifier)
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) {
metaTransactionHash := metaTransaction.GetHash()
var newTransaction bool
tx, err := GetTransaction(metaTransactionHash, func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction {
newTransaction = true
tx := value_transaction.FromMetaTransaction(metaTransaction)
tx.SetModified(true)
return tx
})
if err != nil {
log.Errorf("Unable to process transaction %s: %s", metaTransactionHash, err.Error())
return
}
if newTransaction {
log.Debugw("process new transaction", "hash", tx.GetHash())
processNewTransaction(tx)
requestedTxs.Remove(tx.GetHash())
updateRequestedTxs()
}
}
func processNewTransaction(transaction *value_transaction.ValueTransaction) {
Events.TransactionStored.Trigger(transaction)
// store transaction hash for address in DB
if err := StoreTransactionHashForAddressInDatabase(
&TxHashForAddress{
Address: transaction.GetAddress(),
TxHash: transaction.GetHash(),
},
); err != nil {
log.Errorw(err.Error())
}
transactionHash := transaction.GetHash()
// register tx as approver for trunk
if trunkApprovers, err := GetApprovers(transaction.GetTrunkTransactionHash(), approvers.New); err != nil {
log.Errorf("Unable to get approvers of transaction %s: %s", transaction.GetTrunkTransactionHash(), err.Error())
return
} else {
trunkApprovers.Add(transactionHash)
}
// register tx as approver for branch
if branchApprovers, err := GetApprovers(transaction.GetBranchTransactionHash(), approvers.New); err != nil {
log.Errorf("Unable to get approvers of transaction %s: %s", transaction.GetBranchTransactionHash(), err.Error())
return
} else {
branchApprovers.Add(transactionHash)
}
isSolid, err := isSolid(transactionHash)
if err != nil {
log.Errorf("Unable to check solidity: %s", err.Error())
}
// if the transaction was solidified propagate this information
if isSolid {
if err := propagateSolidity(transaction.GetHash()); err != nil {
log.Errorf("Unable to propagate solidity: %s", err.Error())
}
}
}
// isSolid checks whether the transaction with the given hash is solid. A transaction is solid, if it is
// either marked as solid or all its referenced transactions are in the database.
func isSolid(hash trinary.Hash) (bool, error) {
// the genesis is always solid
if hash == meta_transaction.BRANCH_NULL_HASH {
return true, nil
}
// if the transaction is not in the DB, request it
transaction, err := GetTransaction(hash)
if err != nil {
return false, err
}
if transaction == nil {
if requestedTxs.Add(hash) {
requestTransaction(hash)
}
return false, nil
}
// check whether the transaction is marked solid
metadata, err := GetTransactionMetadata(hash, transactionmetadata.New)
if err != nil {
return false, err
}
if metadata.GetSolid() {
return true, nil
}
branch := contains(transaction.GetBranchTransactionHash())
trunk := contains(transaction.GetTrunkTransactionHash())
if !branch || !trunk {
return false, nil
}
// everything is good, mark the transaction as solid
return true, markSolid(transaction)
}
func contains(hash trinary.Hash) bool {
if hash == meta_transaction.BRANCH_NULL_HASH {
return true
}
if contains, _ := ContainsTransaction(hash); !contains {
if requestedTxs.Add(hash) {
requestTransaction(hash)
}
return false
}
return true
}
func isMarkedSolid(hash trinary.Hash) (bool, error) {
if hash == meta_transaction.BRANCH_NULL_HASH {
return true, nil
}
metadata, err := GetTransactionMetadata(hash, transactionmetadata.New)
if err != nil {
return false, err
}
return metadata.GetSolid(), nil
}
func markSolid(transaction *value_transaction.ValueTransaction) error {
txMetadata, err := GetTransactionMetadata(transaction.GetHash(), transactionmetadata.New)
if err != nil {
return err
}
if txMetadata.SetSolid(true) {
log.Debugw("transaction solidified", "hash", transaction.GetHash())
Events.TransactionSolid.Trigger(transaction)
return propagateSolidity(transaction.GetHash())
}
return nil
}
func propagateSolidity(transactionHash trinary.Trytes) error {
approvingTransactions, err := GetApprovers(transactionHash, approvers.New)
if err != nil {
return err
}
for _, hash := range approvingTransactions.GetHashes() {
approver, err := GetTransaction(hash)
if err != nil {
return err
}
if approver != nil {
branchSolid, err := isMarkedSolid(approver.GetBranchTransactionHash())
if err != nil {
return err
}
if !branchSolid {
continue
}
trunkSolid, err := isMarkedSolid(approver.GetTrunkTransactionHash())
if err != nil {
return err
}
if !trunkSolid {
continue
}
if err := markSolid(approver); err != nil {
return err
}
}
}
return nil
}
func updateRequestedTxs() {
targetTime := time.Now().Add(-UnsolidInterval)
txs := requestedTxs.Update(targetTime)
for _, txHash := range txs {
if contains, _ := ContainsTransaction(txHash); contains {
requestedTxs.Remove(txHash)
continue
}
requestTransaction(txHash)
}
}
func requestTransaction(hash trinary.Trytes) {
if requester == nil {
return
}
log.Debugw("Requesting tx", "hash", hash)
requester.RequestTransaction(hash)
}
package tangle
import (
"io/ioutil"
stdlog "log"
"os"
"sync/atomic"
"testing"
"time"
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/model/meta_transaction"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/parameter"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/iota.go/trinary"
"github.com/stretchr/testify/require"
)
// use much lower min weight magnitude for the tests
const testMWM = 8
func init() {
if err := parameter.LoadDefaultConfig(false); err != nil {
stdlog.Fatalf("Failed to initialize config: %s", err)
}
if err := logger.InitGlobalLogger(parameter.NodeConfig); err != nil {
stdlog.Fatalf("Failed to initialize config: %s", err)
}
}
func TestTangle(t *testing.T) {
dir, err := ioutil.TempDir("", t.Name())
require.NoError(t, err)
defer os.Remove(dir)
// use the tempdir for the database
parameter.NodeConfig.Set(database.CFG_DIRECTORY, dir)
// start a test node
node.Start(node.Plugins(PLUGIN))
defer node.Shutdown()
t.Run("ReadTransactionHashesForAddressFromDatabase", func(t *testing.T) {
tx1 := value_transaction.New()
tx1.SetAddress("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
tx1.SetTimestamp(uint(time.Now().UnixNano()))
require.NoError(t, tx1.DoProofOfWork(testMWM))
tx2 := value_transaction.New()
tx2.SetTimestamp(uint(time.Now().UnixNano()))
require.NoError(t, tx2.DoProofOfWork(testMWM))
transactionReceived(&gossip.TransactionReceivedEvent{Data: tx1.GetBytes()})
txAddr, err := ReadTransactionHashesForAddressFromDatabase(tx1.GetAddress())
require.NoError(t, err)
require.ElementsMatch(t, []trinary.Hash{tx1.GetHash()}, txAddr)
})
t.Run("ProofOfWork", func(t *testing.T) {
tx1 := value_transaction.New()
tx1.SetTimestamp(uint(time.Now().UnixNano()))
require.NoError(t, tx1.DoProofOfWork(1))
tx2 := value_transaction.New()
tx2.SetTimestamp(uint(time.Now().UnixNano()))
require.NoError(t, tx2.DoProofOfWork(testMWM))
var counter int32
closure := events.NewClosure(func(transaction *value_transaction.ValueTransaction) {
atomic.AddInt32(&counter, 1)
})
Events.TransactionSolid.Attach(closure)
defer Events.TransactionSolid.Detach(closure)
transactionReceived(&gossip.TransactionReceivedEvent{Data: tx1.GetBytes()})
transactionReceived(&gossip.TransactionReceivedEvent{Data: tx2.GetBytes()})
time.Sleep(100 * time.Millisecond)
require.EqualValues(t, 1, counter)
})
t.Run("Solidifier", func(t *testing.T) {
transaction1 := value_transaction.New()
transaction1.SetTimestamp(uint(time.Now().UnixNano()))
require.NoError(t, transaction1.DoProofOfWork(testMWM))
transaction2 := value_transaction.New()
transaction2.SetTimestamp(uint(time.Now().UnixNano()))
transaction2.SetBranchTransactionHash(transaction1.GetHash())
require.NoError(t, transaction2.DoProofOfWork(testMWM))
transaction3 := value_transaction.New()
transaction3.SetTimestamp(uint(time.Now().UnixNano()))
transaction3.SetBranchTransactionHash(transaction2.GetHash())
require.NoError(t, transaction3.DoProofOfWork(testMWM))
transaction4 := value_transaction.New()
transaction4.SetTimestamp(uint(time.Now().UnixNano()))
transaction4.SetBranchTransactionHash(transaction3.GetHash())
require.NoError(t, transaction4.DoProofOfWork(testMWM))
var counter int32
closure := events.NewClosure(func(tx *value_transaction.ValueTransaction) {
atomic.AddInt32(&counter, 1)
log.Infof("Transaction solid: hash=%s", tx.GetHash())
})
Events.TransactionSolid.Attach(closure)
defer Events.TransactionSolid.Detach(closure)
// only transaction3 should be requested
SetRequester(RequesterFunc(func(hash trinary.Hash) {
if transaction3.GetHash() == hash {
// return the transaction data
transactionReceived(&gossip.TransactionReceivedEvent{Data: transaction3.GetBytes()})
}
}))
transactionReceived(&gossip.TransactionReceivedEvent{Data: transaction1.GetBytes()})
transactionReceived(&gossip.TransactionReceivedEvent{Data: transaction2.GetBytes()})
// transactionReceived(&gossip.TransactionReceivedEvent{Data: transaction3.GetBytes()})
transactionReceived(&gossip.TransactionReceivedEvent{Data: transaction4.GetBytes()})
time.Sleep(100 * time.Millisecond)
require.EqualValues(t, 4, counter)
})
}
// transactionReceived mocks the TransactionReceived event by allowing lower mwm
func transactionReceived(ev *gossip.TransactionReceivedEvent) {
metaTx, err := meta_transaction.FromBytes(ev.Data)
if err != nil {
log.Warnf("invalid transaction: %s", err)
return
}
if metaTx.GetWeightMagnitude() < testMWM {
log.Warnf("invalid weight magnitude: %d / %d", metaTx.GetWeightMagnitude(), testMWM)
return
}
processMetaTransaction(metaTx)
}
package tangle
import (
"fmt"
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/hive.go/lru_cache"
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/iota.go/trinary"
)
// region public api ///////////////////////////////////////////////////////////////////////////////////////////////////
func GetTransaction(transactionHash trinary.Trytes, computeIfAbsent ...func(trinary.Trytes) *value_transaction.ValueTransaction) (result *value_transaction.ValueTransaction, err error) {
if cacheResult := transactionCache.ComputeIfAbsent(transactionHash, func() interface{} {
if transaction, dbErr := getTransactionFromDatabase(transactionHash); dbErr != nil {
err = dbErr
return nil
} else if transaction != nil {
return transaction
} else {
if len(computeIfAbsent) >= 1 {
return computeIfAbsent[0](transactionHash)
}
return nil
}
}); !typeutils.IsInterfaceNil(cacheResult) {
result = cacheResult.(*value_transaction.ValueTransaction)
}
return
}
func ContainsTransaction(transactionHash trinary.Trytes) (result bool, err error) {
if transactionCache.Contains(transactionHash) {
result = true
} else {
result, err = databaseContainsTransaction(transactionHash)
}
return
}
func StoreTransaction(transaction *value_transaction.ValueTransaction) {
transactionCache.Set(transaction.GetHash(), transaction)
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region lru cache ////////////////////////////////////////////////////////////////////////////////////////////////////
var transactionCache = lru_cache.NewLRUCache(TRANSACTION_CACHE_SIZE, &lru_cache.LRUCacheOptions{
EvictionCallback: onEvictTransactions,
EvictionBatchSize: 200,
})
func onEvictTransactions(_ interface{}, values interface{}) {
// TODO: replace with apply
for _, obj := range values.([]interface{}) {
if tx := obj.(*value_transaction.ValueTransaction); tx.GetModified() {
if err := storeTransactionInDatabase(tx); err != nil {
panic(err)
}
}
}
}
func FlushTransactionCache() {
transactionCache.DeleteAll()
}
const (
TRANSACTION_CACHE_SIZE = 500
)
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region database /////////////////////////////////////////////////////////////////////////////////////////////////////
var transactionDatabase database.Database
func configureTransactionDatabase() {
if db, err := database.Get(database.DBPrefixTransaction, database.GetBadgerInstance()); err != nil {
panic(err)
} else {
transactionDatabase = db
}
}
func storeTransactionInDatabase(transaction *value_transaction.ValueTransaction) error {
if transaction.GetModified() {
if err := transactionDatabase.Set(database.Entry{Key: typeutils.StringToBytes(transaction.GetHash()), Value: transaction.MetaTransaction.GetBytes()}); err != nil {
return fmt.Errorf("%w: failed to store transaction: %s", ErrDatabaseError, err.Error())
}
transaction.SetModified(false)
}
return nil
}
func getTransactionFromDatabase(transactionHash trinary.Trytes) (*value_transaction.ValueTransaction, error) {
txData, err := transactionDatabase.Get(typeutils.StringToBytes(transactionHash))
if err != nil {
if err == database.ErrKeyNotFound {
return nil, nil
}
return nil, fmt.Errorf("%w: failed to retrieve transaction: %s", ErrDatabaseError, err)
}
return value_transaction.FromBytes(txData.Value), nil
}
func databaseContainsTransaction(transactionHash trinary.Trytes) (bool, error) {
if contains, err := transactionDatabase.Contains(typeutils.StringToBytes(transactionHash)); err != nil {
return contains, fmt.Errorf("%w: failed to check if the transaction exists: %s", ErrDatabaseError, err)
} else {
return contains, nil
}
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
package tangle
import (
"fmt"
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/hive.go/lru_cache"
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/iota.go/trinary"
)
// region public api ///////////////////////////////////////////////////////////////////////////////////////////////////
func GetTransactionMetadata(transactionHash trinary.Trytes, computeIfAbsent ...func(trinary.Trytes) *transactionmetadata.TransactionMetadata) (result *transactionmetadata.TransactionMetadata, err error) {
if cacheResult := transactionMetadataCache.ComputeIfAbsent(transactionHash, func() interface{} {
if transactionMetadata, dbErr := getTransactionMetadataFromDatabase(transactionHash); dbErr != nil {
err = dbErr
return nil
} else if transactionMetadata != nil {
return transactionMetadata
} else {
if len(computeIfAbsent) >= 1 {
return computeIfAbsent[0](transactionHash)
}
return nil
}
}); !typeutils.IsInterfaceNil(cacheResult) {
result = cacheResult.(*transactionmetadata.TransactionMetadata)
}
return
}
func ContainsTransactionMetadata(transactionHash trinary.Trytes) (result bool, err error) {
if transactionMetadataCache.Contains(transactionHash) {
result = true
} else {
result, err = databaseContainsTransactionMetadata(transactionHash)
}
return
}
func StoreTransactionMetadata(transactionMetadata *transactionmetadata.TransactionMetadata) {
transactionMetadataCache.Set(transactionMetadata.GetHash(), transactionMetadata)
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region lru cache ////////////////////////////////////////////////////////////////////////////////////////////////////
var transactionMetadataCache = lru_cache.NewLRUCache(TRANSACTION_METADATA_CACHE_SIZE, &lru_cache.LRUCacheOptions{
EvictionCallback: onEvictTransactionMetadatas,
EvictionBatchSize: 200,
})
func onEvictTransactionMetadatas(_ interface{}, values interface{}) {
// TODO: replace with apply
for _, obj := range values.([]interface{}) {
if txMetadata := obj.(*transactionmetadata.TransactionMetadata); txMetadata.GetModified() {
if err := storeTransactionMetadataInDatabase(txMetadata); err != nil {
panic(err)
}
}
}
}
func FlushTransactionMetadata() {
transactionCache.DeleteAll()
}
const (
TRANSACTION_METADATA_CACHE_SIZE = 500
)
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region database /////////////////////////////////////////////////////////////////////////////////////////////////////
var transactionMetadataDatabase database.Database
func configureTransactionMetaDataDatabase() {
if db, err := database.Get(database.DBPrefixTransactionMetadata, database.GetBadgerInstance()); err != nil {
panic(err)
} else {
transactionMetadataDatabase = db
}
}
func storeTransactionMetadataInDatabase(metadata *transactionmetadata.TransactionMetadata) error {
if metadata.GetModified() {
if marshaledMetadata, err := metadata.Marshal(); err != nil {
return err
} else {
if err := transactionMetadataDatabase.Set(database.Entry{Key: typeutils.StringToBytes(metadata.GetHash()), Value: marshaledMetadata}); err != nil {
return fmt.Errorf("%w: failed to store transaction metadata: %s", ErrDatabaseError, err)
}
metadata.SetModified(false)
}
}
return nil
}
func getTransactionMetadataFromDatabase(transactionHash trinary.Trytes) (*transactionmetadata.TransactionMetadata, error) {
txMetadata, err := transactionMetadataDatabase.Get(typeutils.StringToBytes(transactionHash))
if err != nil {
if err == database.ErrKeyNotFound {
return nil, nil
}
return nil, fmt.Errorf("%w: failed to retrieve transaction: %s", ErrDatabaseError, err)
}
var result transactionmetadata.TransactionMetadata
if err := result.Unmarshal(txMetadata.Value); err != nil {
panic(err)
}
return &result, nil
}
func databaseContainsTransactionMetadata(transactionHash trinary.Trytes) (bool, error) {
if contains, err := transactionMetadataDatabase.Contains(typeutils.StringToBytes(transactionHash)); err != nil {
return contains, fmt.Errorf("%w: failed to check if the transaction metadata exists: %s", ErrDatabaseError, err)
} else {
return contains, nil
}
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment