Skip to content
Snippets Groups Projects
Commit d3274a21 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: daemon.BackgroundWorker has to be named

parent 7e0c61dd
No related branches found
No related tags found
No related merge requests found
Showing
with 127 additions and 44 deletions
...@@ -5,29 +5,53 @@ import ( ...@@ -5,29 +5,53 @@ import (
) )
var ( var (
running bool running bool
wg sync.WaitGroup wg sync.WaitGroup
ShutdownSignal = make(chan int, 1) ShutdownSignal = make(chan int, 1)
backgroundWorkers = make([]func(), 0) backgroundWorkers = make([]func(), 0)
lock = sync.Mutex{} backgroundWorkerNames = make([]string, 0)
runningBackgroundWorkers = make(map[string]bool)
lock = sync.Mutex{}
) )
func runBackgroundWorker(backgroundWorker func()) { func GetRunningBackgroundWorkers() []string {
lock.Lock()
result := make([]string, 0)
for runningBackgroundWorker := range runningBackgroundWorkers {
result = append(result, runningBackgroundWorker)
}
lock.Unlock()
return result
}
func runBackgroundWorker(name string, backgroundWorker func()) {
wg.Add(1) wg.Add(1)
go func() { go func() {
lock.Lock()
runningBackgroundWorkers[name] = true
lock.Unlock()
backgroundWorker() backgroundWorker()
lock.Lock()
delete(runningBackgroundWorkers, name)
lock.Unlock()
wg.Done() wg.Done()
}() }()
} }
func BackgroundWorker(handler func()) { func BackgroundWorker(name string, handler func()) {
lock.Lock() lock.Lock()
if IsRunning() { if IsRunning() {
runBackgroundWorker(handler) runBackgroundWorker(name, handler)
} else { } else {
backgroundWorkerNames = append(backgroundWorkerNames, name)
backgroundWorkers = append(backgroundWorkers, handler) backgroundWorkers = append(backgroundWorkers, handler)
} }
...@@ -45,8 +69,8 @@ func Run() { ...@@ -45,8 +69,8 @@ func Run() {
Events.Run.Trigger() Events.Run.Trigger()
for _, backgroundWorker := range backgroundWorkers { for i, backgroundWorker := range backgroundWorkers {
runBackgroundWorker(backgroundWorker) runBackgroundWorker(backgroundWorkerNames[i], backgroundWorker)
} }
} }
......
...@@ -55,3 +55,16 @@ func (plugin *Plugin) LogFailure(message string) { ...@@ -55,3 +55,16 @@ func (plugin *Plugin) LogFailure(message string) {
func (plugin *Plugin) LogDebug(message string) { func (plugin *Plugin) LogDebug(message string) {
plugin.Node.LogDebug(plugin.Name, message) plugin.Node.LogDebug(plugin.Name, message)
} }
var TestNode = &Node{
loggers: make([]*Logger, 0),
wg: &sync.WaitGroup{},
loadedPlugins: make([]*Plugin, 0),
}
func (plugin *Plugin) InitTest() {
plugin.Node = TestNode
plugin.Events.Configure.Trigger(plugin)
plugin.Events.Run.Trigger(plugin)
}
...@@ -23,7 +23,7 @@ func Start(tps int64) { ...@@ -23,7 +23,7 @@ func Start(tps int64) {
shutdownSignal = make(chan int, 1) shutdownSignal = make(chan int, 1)
func(shutdownSignal chan int) { func(shutdownSignal chan int) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Transaction Spammer", func() {
for { for {
start := time.Now() start := time.Now()
sentCounter := int64(0) sentCounter := int64(0)
......
...@@ -21,7 +21,7 @@ import ( ...@@ -21,7 +21,7 @@ import (
) )
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Analysis Client", func() {
shuttingDown := false shuttingDown := false
for !shuttingDown { for !shuttingDown {
......
...@@ -36,7 +36,7 @@ func Configure(plugin *node.Plugin) { ...@@ -36,7 +36,7 @@ func Configure(plugin *node.Plugin) {
} }
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Analysis Server", func() {
plugin.LogInfo("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ...") plugin.LogInfo("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ...")
server.Listen(*SERVER_PORT.Value) server.Listen(*SERVER_PORT.Value)
......
...@@ -32,7 +32,7 @@ func Configure(plugin *node.Plugin) { ...@@ -32,7 +32,7 @@ func Configure(plugin *node.Plugin) {
} }
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Analysis HTTP Server", func() {
httpServer.ListenAndServe() httpServer.ListenAndServe()
}) })
} }
...@@ -35,7 +35,7 @@ func Configure(plugin *node.Plugin) { ...@@ -35,7 +35,7 @@ func Configure(plugin *node.Plugin) {
} }
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Neighborhood Updater", func() {
timeutil.Ticker(updateNeighborHood, 1*time.Second) timeutil.Ticker(updateNeighborHood, 1*time.Second)
}) })
} }
......
...@@ -41,7 +41,11 @@ func createOutgoingRequestProcessor(plugin *node.Plugin) func() { ...@@ -41,7 +41,11 @@ func createOutgoingRequestProcessor(plugin *node.Plugin) func() {
func sendOutgoingRequests(plugin *node.Plugin) { func sendOutgoingRequests(plugin *node.Plugin) {
for _, chosenNeighborCandidate := range chosenneighbors.CANDIDATES.Clone() { for _, chosenNeighborCandidate := range chosenneighbors.CANDIDATES.Clone() {
time.Sleep(5 * time.Second) select {
case <-daemon.ShutdownSignal:
return
case <-time.After(5 * time.Second):
}
if candidateShouldBeContacted(chosenNeighborCandidate) { if candidateShouldBeContacted(chosenNeighborCandidate) {
if dialed, err := chosenNeighborCandidate.Send(outgoingrequest.INSTANCE.Marshal(), types.PROTOCOL_TYPE_TCP, true); err != nil { if dialed, err := chosenNeighborCandidate.Send(outgoingrequest.INSTANCE.Marshal(), types.PROTOCOL_TYPE_TCP, true); err != nil {
......
...@@ -20,8 +20,8 @@ func Configure(plugin *node.Plugin) { ...@@ -20,8 +20,8 @@ func Configure(plugin *node.Plugin) {
} }
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(createChosenNeighborDropper(plugin)) daemon.BackgroundWorker("Autopeering Chosen Neighbor Dropper", createChosenNeighborDropper(plugin))
daemon.BackgroundWorker(createAcceptedNeighborDropper(plugin)) daemon.BackgroundWorker("Autopeering Accepted Neighbor Dropper", createAcceptedNeighborDropper(plugin))
daemon.BackgroundWorker(createOutgoingRequestProcessor(plugin)) daemon.BackgroundWorker("Autopeering Outgoing Request Processor", createOutgoingRequestProcessor(plugin))
daemon.BackgroundWorker(createOutgoingPingProcessor(plugin)) daemon.BackgroundWorker("Autopeering Outgoing Ping Processor", createOutgoingPingProcessor(plugin))
} }
...@@ -68,7 +68,7 @@ func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan ...@@ -68,7 +68,7 @@ func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan
if saltToUpdate.ExpirationTime.Before(now) { if saltToUpdate.ExpirationTime.Before(now) {
updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback)
} else { } else {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Salt Updater", func() {
select { select {
case <-time.After(saltToUpdate.ExpirationTime.Sub(now)): case <-time.After(saltToUpdate.ExpirationTime.Sub(now)):
updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback)
......
...@@ -37,7 +37,7 @@ func ConfigureServer(plugin *node.Plugin) { ...@@ -37,7 +37,7 @@ func ConfigureServer(plugin *node.Plugin) {
} }
func RunServer(plugin *node.Plugin) { func RunServer(plugin *node.Plugin) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Autopeering TCP Server", func() {
if *parameters.ADDRESS.Value == "0.0.0.0" { if *parameters.ADDRESS.Value == "0.0.0.0" {
plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...") plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...")
} else { } else {
......
...@@ -41,7 +41,7 @@ func ConfigureServer(plugin *node.Plugin) { ...@@ -41,7 +41,7 @@ func ConfigureServer(plugin *node.Plugin) {
} }
func RunServer(plugin *node.Plugin) { func RunServer(plugin *node.Plugin) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Autopeering UDP Server", func() {
if *parameters.ADDRESS.Value == "0.0.0.0" { if *parameters.ADDRESS.Value == "0.0.0.0" {
plugin.LogInfo("Starting UDP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...") plugin.LogInfo("Starting UDP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...")
} else { } else {
......
...@@ -28,7 +28,7 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) ...@@ -28,7 +28,7 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction)
if currentTransaction.IsHead() && currentTransaction != headTransaction { if currentTransaction.IsHead() && currentTransaction != headTransaction {
newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
Events.InvalidBundleReceived.Trigger(newBundle, bundleTransactions) Events.InvalidBundle.Trigger(newBundle, bundleTransactions)
return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail") return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail")
} }
...@@ -52,11 +52,7 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) ...@@ -52,11 +52,7 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction)
if currentTransaction.IsTail() { if currentTransaction.IsTail() {
newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
if newBundle.IsValueBundle() { Events.BundleSolid.Trigger(newBundle, bundleTransactions)
Events.ValueBundleReceived.Trigger(newBundle, bundleTransactions)
} else {
Events.DataBundleReceived.Trigger(newBundle, bundleTransactions)
}
return newBundle, nil return newBundle, nil
} }
......
package bundleprocessor
import (
"fmt"
"testing"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/ternary"
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/magiconair/properties/assert"
)
func TestProcessSolidBundleHead(t *testing.T) {
tangle.PLUGIN.InitTest()
tx := value_transaction.New()
tx.SetTail(true)
tx.SetValue(3)
tx1 := value_transaction.New()
tx1.SetTrunkTransactionHash(tx.GetHash())
tx1.SetHead(true)
tangle.StoreTransaction(tx)
tangle.StoreTransaction(tx1)
Events.BundleSolid.Attach(events.NewClosure(func(bundle *bundle.Bundle, transactions []*value_transaction.ValueTransaction) {
fmt.Println("IT HAPPENED")
fmt.Println(bundle.GetHash())
}))
result, err := ProcessSolidBundleHead(tx1)
if err != nil {
t.Error(err)
} else {
assert.Equal(t, result.GetHash(), ternary.Trytes("UFWJYEWKMEQDNSQUCUWBGOFRHVBGHVVYEZCLCGRDTRQSMAFALTIPMJEEYFDPMQCNJWLXUWFMBZGHQRO99"), "invalid bundle hash")
assert.Equal(t, result.IsValueBundle(), true, "invalid value bundle status")
}
}
...@@ -7,15 +7,13 @@ import ( ...@@ -7,15 +7,13 @@ import (
) )
var Events = pluginEvents{ var Events = pluginEvents{
DataBundleReceived: events.NewEvent(bundleEventCaller), BundleSolid: events.NewEvent(bundleEventCaller),
ValueBundleReceived: events.NewEvent(bundleEventCaller), InvalidBundle: events.NewEvent(bundleEventCaller),
InvalidBundleReceived: events.NewEvent(bundleEventCaller),
} }
type pluginEvents struct { type pluginEvents struct {
DataBundleReceived *events.Event BundleSolid *events.Event
ValueBundleReceived *events.Event InvalidBundle *events.Event
InvalidBundleReceived *events.Event
} }
func bundleEventCaller(handler interface{}, params ...interface{}) { func bundleEventCaller(handler interface{}, params ...interface{}) {
......
...@@ -34,7 +34,7 @@ func configure(plugin *node.Plugin) { ...@@ -34,7 +34,7 @@ func configure(plugin *node.Plugin) {
func run(plugin *node.Plugin) { func run(plugin *node.Plugin) {
plugin.LogInfo("Starting Bundle Processor ...") plugin.LogInfo("Starting Bundle Processor ...")
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Bundle Processor", func() {
plugin.LogSuccess("Starting Bundle Processor ... done") plugin.LogSuccess("Starting Bundle Processor ... done")
workerPool.Run() workerPool.Run()
......
...@@ -47,7 +47,7 @@ func runNeighbors(plugin *node.Plugin) { ...@@ -47,7 +47,7 @@ func runNeighbors(plugin *node.Plugin) {
} }
func manageConnection(plugin *node.Plugin, neighbor *Neighbor) { func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Connection Manager ("+neighbor.Identity.StringIdentifier+")", func() {
failedConnectionAttempts := 0 failedConnectionAttempts := 0
for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; { for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; {
......
...@@ -26,7 +26,7 @@ func configureSendQueue(plugin *node.Plugin) { ...@@ -26,7 +26,7 @@ func configureSendQueue(plugin *node.Plugin) {
func runSendQueue(plugin *node.Plugin) { func runSendQueue(plugin *node.Plugin) {
plugin.LogInfo("Starting Send Queue Dispatcher ...") plugin.LogInfo("Starting Send Queue Dispatcher ...")
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Gossip Send Queue Dispatcher", func() {
plugin.LogSuccess("Starting Send Queue Dispatcher ... done") plugin.LogSuccess("Starting Send Queue Dispatcher ... done")
for { for {
...@@ -54,7 +54,7 @@ func runSendQueue(plugin *node.Plugin) { ...@@ -54,7 +54,7 @@ func runSendQueue(plugin *node.Plugin) {
connectedNeighborsMutex.Lock() connectedNeighborsMutex.Lock()
for _, neighborQueue := range neighborQueues { for _, neighborQueue := range neighborQueues {
startNeighborSendQueue(neighborQueue) startNeighborSendQueue(neighborQueue.protocol.Neighbor, neighborQueue)
} }
connectedNeighborsMutex.Unlock() connectedNeighborsMutex.Unlock()
} }
...@@ -104,13 +104,13 @@ func setupEventHandlers(neighbor *Neighbor) { ...@@ -104,13 +104,13 @@ func setupEventHandlers(neighbor *Neighbor) {
})) }))
if daemon.IsRunning() { if daemon.IsRunning() {
startNeighborSendQueue(queue) startNeighborSendQueue(neighbor, queue)
} }
})) }))
} }
func startNeighborSendQueue(neighborQueue *neighborQueue) { func startNeighborSendQueue(neighbor *Neighbor, neighborQueue *neighborQueue) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Gossip Send Queue ("+neighbor.Identity.StringIdentifier+")", func() {
for { for {
select { select {
case <-daemon.ShutdownSignal: case <-daemon.ShutdownSignal:
......
...@@ -73,7 +73,7 @@ func configureServer(plugin *node.Plugin) { ...@@ -73,7 +73,7 @@ func configureServer(plugin *node.Plugin) {
func runServer(plugin *node.Plugin) { func runServer(plugin *node.Plugin) {
plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ...") plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ...")
daemon.BackgroundWorker(func() { daemon.BackgroundWorker("Gossip TCP Server", func() {
plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ... done") plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ... done")
TCPServer.Listen(*PORT.Value) TCPServer.Listen(*PORT.Value)
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"strconv" "strconv"
"strings"
"syscall" "syscall"
"time" "time"
...@@ -31,7 +32,13 @@ var PLUGIN = node.NewPlugin("Graceful Shutdown", func(plugin *node.Plugin) { ...@@ -31,7 +32,13 @@ var PLUGIN = node.NewPlugin("Graceful Shutdown", func(plugin *node.Plugin) {
secondsSinceStart := x.Sub(start).Seconds() secondsSinceStart := x.Sub(start).Seconds()
if secondsSinceStart <= WAIT_TO_KILL_TIME_IN_SECONDS { if secondsSinceStart <= WAIT_TO_KILL_TIME_IN_SECONDS {
plugin.LogWarning("Received shutdown request - waiting (max " + strconv.Itoa(WAIT_TO_KILL_TIME_IN_SECONDS-int(secondsSinceStart)) + " seconds) to finish processing ...") processList := ""
runningBackgroundWorkers := daemon.GetRunningBackgroundWorkers()
if len(runningBackgroundWorkers) >= 1 {
processList = "(" + strings.Join(runningBackgroundWorkers, ", ") + ") "
}
plugin.LogWarning("Received shutdown request - waiting (max " + strconv.Itoa(WAIT_TO_KILL_TIME_IN_SECONDS-int(secondsSinceStart)) + " seconds) to finish processing " + processList + "...")
} else { } else {
plugin.LogFailure("Background processes did not terminate in time! Forcing shutdown ...") plugin.LogFailure("Background processes did not terminate in time! Forcing shutdown ...")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment