Skip to content
Snippets Groups Projects
Unverified Commit f7533a9e authored by Hans Moog's avatar Hans Moog Committed by GitHub
Browse files

Merge branch 'master' into feat/iota.go

parents b5ec7827 d3274a21
No related branches found
No related tags found
No related merge requests found
Showing
with 542 additions and 99 deletions
......@@ -4,6 +4,7 @@ import (
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/analysis"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/bundleprocessor"
"github.com/iotaledger/goshimmer/plugins/cli"
"github.com/iotaledger/goshimmer/plugins/gossip"
gossip_on_solidification "github.com/iotaledger/goshimmer/plugins/gossip-on-solidification"
......@@ -25,6 +26,7 @@ func main() {
gossip.PLUGIN,
gossip_on_solidification.PLUGIN,
tangle.PLUGIN,
bundleprocessor.PLUGIN,
analysis.PLUGIN,
gracefulshutdown.PLUGIN,
tipselection.PLUGIN,
......
package batchworkerpool
import (
"sync"
"time"
)
type BatchWorkerPool struct {
workerFnc func([]Task)
options *Options
calls chan Task
batchedCalls chan []Task
terminate chan int
running bool
mutex sync.RWMutex
wait sync.WaitGroup
}
func New(workerFnc func([]Task), optionalOptions ...Option) (result *BatchWorkerPool) {
options := DEFAULT_OPTIONS.Override(optionalOptions...)
result = &BatchWorkerPool{
workerFnc: workerFnc,
options: options,
}
result.resetChannels()
return
}
func (wp *BatchWorkerPool) Submit(params ...interface{}) (result chan interface{}) {
result = make(chan interface{}, 1)
wp.mutex.RLock()
if wp.running {
wp.calls <- Task{
params: params,
resultChan: result,
}
} else {
close(result)
}
wp.mutex.RUnlock()
return
}
func (wp *BatchWorkerPool) Start() {
wp.mutex.Lock()
if !wp.running {
wp.running = true
wp.startBatchDispatcher()
wp.startBatchWorkers()
}
wp.mutex.Unlock()
}
func (wp *BatchWorkerPool) Run() {
wp.Start()
wp.wait.Wait()
}
func (wp *BatchWorkerPool) Stop() {
go wp.StopAndWait()
}
func (wp *BatchWorkerPool) StopAndWait() {
wp.mutex.Lock()
if wp.running {
wp.running = false
close(wp.terminate)
wp.resetChannels()
}
wp.wait.Wait()
wp.mutex.Unlock()
}
func (wp *BatchWorkerPool) resetChannels() {
wp.calls = make(chan Task, wp.options.QueueSize)
wp.batchedCalls = make(chan []Task, 2*wp.options.WorkerCount)
wp.terminate = make(chan int, 1)
}
func (wp *BatchWorkerPool) startBatchDispatcher() {
calls := wp.calls
terminate := wp.terminate
wp.wait.Add(1)
go func() {
for {
select {
case <-terminate:
wp.wait.Done()
return
case firstCall := <-calls:
batchTask := append(make([]Task, 0), firstCall)
collectionTimeout := time.After(wp.options.BatchCollectionTimeout)
// collect additional requests that arrive within the timeout
CollectAdditionalCalls:
for {
select {
case <-terminate:
wp.wait.Done()
return
case <-collectionTimeout:
break CollectAdditionalCalls
case call := <-wp.calls:
batchTask = append(batchTask, call)
if len(batchTask) == wp.options.BatchSize {
break CollectAdditionalCalls
}
}
}
wp.batchedCalls <- batchTask
}
}
}()
}
func (wp *BatchWorkerPool) startBatchWorkers() {
batchedCalls := wp.batchedCalls
terminate := wp.terminate
for i := 0; i < wp.options.WorkerCount; i++ {
wp.wait.Add(1)
go func() {
aborted := false
for !aborted {
select {
case <-terminate:
aborted = true
case batchTask := <-batchedCalls:
wp.workerFnc(batchTask)
}
}
wp.wait.Done()
}()
}
}
package batchworkerpool
import (
"runtime"
"time"
)
var DEFAULT_OPTIONS = &Options{
WorkerCount: 2 * runtime.NumCPU(),
QueueSize: 2 * runtime.NumCPU() * 64,
BatchSize: 64,
BatchCollectionTimeout: 15 * time.Millisecond,
}
func WorkerCount(workerCount int) Option {
return func(args *Options) {
args.WorkerCount = workerCount
}
}
func BatchSize(batchSize int) Option {
return func(args *Options) {
args.BatchSize = batchSize
}
}
func BatchCollectionTimeout(batchCollectionTimeout time.Duration) Option {
return func(args *Options) {
args.BatchCollectionTimeout = batchCollectionTimeout
}
}
func QueueSize(queueSize int) Option {
return func(args *Options) {
args.QueueSize = queueSize
}
}
type Options struct {
WorkerCount int
QueueSize int
BatchSize int
BatchCollectionTimeout time.Duration
}
func (options Options) Override(optionalOptions ...Option) *Options {
result := &options
for _, option := range optionalOptions {
option(result)
}
return result
}
type Option func(*Options)
package batchworkerpool
type Task struct {
params []interface{}
resultChan chan interface{}
}
func (task *Task) Return(result interface{}) {
task.resultChan <- result
close(task.resultChan)
}
func (task *Task) Param(index int) interface{} {
return task.params[index]
}
......@@ -3,80 +3,40 @@ package curl
import (
"fmt"
"strconv"
"time"
"github.com/iotaledger/goshimmer/packages/batchworkerpool"
"github.com/iotaledger/goshimmer/packages/ternary"
"github.com/iotaledger/iota.go/trinary"
)
type HashRequest struct {
input trinary.Trits
output chan trinary.Trits
}
type BatchHasher struct {
hashRequests chan HashRequest
tasks chan []HashRequest
hashLength int
rounds int
hashLength int
rounds int
workerPool *batchworkerpool.BatchWorkerPool
}
func NewBatchHasher(hashLength int, rounds int) *BatchHasher {
this := &BatchHasher{
hashLength: hashLength,
rounds: rounds,
hashRequests: make(chan HashRequest),
tasks: make(chan []HashRequest, NUMBER_OF_WORKERS),
func NewBatchHasher(hashLength int, rounds int) (result *BatchHasher) {
result = &BatchHasher{
hashLength: hashLength,
rounds: rounds,
}
go this.startDispatcher()
this.startWorkers()
result.workerPool = batchworkerpool.New(result.processHashes, batchworkerpool.BatchSize(strconv.IntSize))
result.workerPool.Start()
return this
return
}
func (this *BatchHasher) startWorkers() {
for i := 0; i < NUMBER_OF_WORKERS; i++ {
go func() {
for {
this.processHashes(<-this.tasks)
}
}()
}
func (this *BatchHasher) Hash(trits ternary.Trits) ternary.Trits {
return (<-this.workerPool.Submit(trits)).(ternary.Trits)
}
func (this *BatchHasher) startDispatcher() {
for {
collectedHashRequests := make([]HashRequest, 0)
// wait for first request to start processing at all
collectedHashRequests = append(collectedHashRequests, <-this.hashRequests)
// collect additional requests that arrive within the timeout
CollectAdditionalRequests:
for {
select {
case hashRequest := <-this.hashRequests:
collectedHashRequests = append(collectedHashRequests, hashRequest)
if len(collectedHashRequests) == strconv.IntSize {
break CollectAdditionalRequests
}
case <-time.After(50 * time.Millisecond):
break CollectAdditionalRequests
}
}
this.tasks <- collectedHashRequests
}
}
func (this *BatchHasher) processHashes(collectedHashRequests []HashRequest) {
if len(collectedHashRequests) > 1 {
func (this *BatchHasher) processHashes(tasks []batchworkerpool.Task) {
if len(tasks) > 1 {
// multiplex the requests
multiplexer := ternary.NewBCTernaryMultiplexer()
for _, hashRequest := range collectedHashRequests {
multiplexer.Add(hashRequest.input)
for _, hashRequest := range tasks {
multiplexer.Add(hashRequest.Param(0).(ternary.Trits))
}
bcTrits, err := multiplexer.Extract()
if err != nil {
......@@ -90,33 +50,18 @@ func (this *BatchHasher) processHashes(collectedHashRequests []HashRequest) {
// extract the results from the demultiplexer
demux := ternary.NewBCTernaryDemultiplexer(bctCurl.Squeeze(243))
for i, hashRequest := range collectedHashRequests {
hashRequest.output <- demux.Get(i)
close(hashRequest.output)
for i, task := range tasks {
task.Return(demux.Get(i))
}
} else {
var resp = make(trinary.Trits, this.hashLength)
trits := tasks[0].Param(0).(ternary.Trits)
curl := NewCurl(this.hashLength, this.rounds)
curl.Absorb(collectedHashRequests[0].input, 0, len(collectedHashRequests[0].input))
curl.Absorb(trits, 0, len(trits))
curl.Squeeze(resp, 0, this.hashLength)
collectedHashRequests[0].output <- resp
close(collectedHashRequests[0].output)
tasks[0].Return(resp)
}
}
func (this *BatchHasher) Hash(trits trinary.Trits) chan trinary.Trits {
hashRequest := HashRequest{
input: trits,
output: make(chan trinary.Trits, 1),
}
this.hashRequests <- hashRequest
return hashRequest.output
}
const (
NUMBER_OF_WORKERS = 1000
)
package curl
import (
"sync"
"testing"
"github.com/iotaledger/goshimmer/packages/ternary"
)
func BenchmarkBatchHasher_Hash(b *testing.B) {
batchHasher := NewBatchHasher(243, 81)
tritsToHash := ternary.Trytes("A999999FF").ToTrits()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
batchHasher.Hash(tritsToHash)
wg.Done()
}()
}
wg.Wait()
}
......@@ -5,29 +5,53 @@ import (
)
var (
running bool
wg sync.WaitGroup
ShutdownSignal = make(chan int, 1)
backgroundWorkers = make([]func(), 0)
lock = sync.Mutex{}
running bool
wg sync.WaitGroup
ShutdownSignal = make(chan int, 1)
backgroundWorkers = make([]func(), 0)
backgroundWorkerNames = make([]string, 0)
runningBackgroundWorkers = make(map[string]bool)
lock = sync.Mutex{}
)
func runBackgroundWorker(backgroundWorker func()) {
func GetRunningBackgroundWorkers() []string {
lock.Lock()
result := make([]string, 0)
for runningBackgroundWorker := range runningBackgroundWorkers {
result = append(result, runningBackgroundWorker)
}
lock.Unlock()
return result
}
func runBackgroundWorker(name string, backgroundWorker func()) {
wg.Add(1)
go func() {
lock.Lock()
runningBackgroundWorkers[name] = true
lock.Unlock()
backgroundWorker()
lock.Lock()
delete(runningBackgroundWorkers, name)
lock.Unlock()
wg.Done()
}()
}
func BackgroundWorker(handler func()) {
func BackgroundWorker(name string, handler func()) {
lock.Lock()
if IsRunning() {
runBackgroundWorker(handler)
runBackgroundWorker(name, handler)
} else {
backgroundWorkerNames = append(backgroundWorkerNames, name)
backgroundWorkers = append(backgroundWorkers, handler)
}
......@@ -45,8 +69,8 @@ func Run() {
Events.Run.Trigger()
for _, backgroundWorker := range backgroundWorkers {
runBackgroundWorker(backgroundWorker)
for i, backgroundWorker := range backgroundWorkers {
runBackgroundWorker(backgroundWorkerNames[i], backgroundWorker)
}
}
......
......@@ -120,7 +120,7 @@ func (this *MetaTransaction) GetWeightMagnitude() (result int) {
// hashes the transaction using curl (without locking - internal usage)
func (this *MetaTransaction) parseHashRelatedDetails() {
hashTrits := <-curl.CURLP81.Hash(this.trits)
hashTrits := curl.CURLP81.Hash(this.trits)
hashTrytes := trinary.MustTritsToTrytes(hashTrits)
this.hash = &hashTrytes
......
......@@ -55,3 +55,16 @@ func (plugin *Plugin) LogFailure(message string) {
func (plugin *Plugin) LogDebug(message string) {
plugin.Node.LogDebug(plugin.Name, message)
}
var TestNode = &Node{
loggers: make([]*Logger, 0),
wg: &sync.WaitGroup{},
loadedPlugins: make([]*Plugin, 0),
}
func (plugin *Plugin) InitTest() {
plugin.Node = TestNode
plugin.Events.Configure.Trigger(plugin)
plugin.Events.Run.Trigger(plugin)
}
......@@ -23,7 +23,7 @@ func Start(tps int64) {
shutdownSignal = make(chan int, 1)
func(shutdownSignal chan int) {
daemon.BackgroundWorker(func() {
daemon.BackgroundWorker("Transaction Spammer", func() {
for {
start := time.Now()
sentCounter := int64(0)
......
package workerpool
import (
"runtime"
)
var DEFAULT_OPTIONS = &Options{
WorkerCount: 2 * runtime.NumCPU(),
QueueSize: 4 * runtime.NumCPU(),
}
func WorkerCount(workerCount int) Option {
return func(args *Options) {
args.WorkerCount = workerCount
}
}
func QueueSize(queueSize int) Option {
return func(args *Options) {
args.QueueSize = queueSize
}
}
type Options struct {
WorkerCount int
QueueSize int
}
func (options Options) Override(optionalOptions ...Option) *Options {
result := &options
for _, option := range optionalOptions {
option(result)
}
return result
}
type Option func(*Options)
package workerpool
type Task struct {
params []interface{}
resultChan chan interface{}
}
func (task *Task) Return(result interface{}) {
task.resultChan <- result
close(task.resultChan)
}
func (task *Task) Param(index int) interface{} {
return task.params[index]
}
package workerpool
import (
"sync"
)
type WorkerPool struct {
workerFnc func(Task)
options *Options
calls chan Task
terminate chan int
running bool
mutex sync.RWMutex
wait sync.WaitGroup
}
func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool) {
options := DEFAULT_OPTIONS.Override(optionalOptions...)
result = &WorkerPool{
workerFnc: workerFnc,
options: options,
}
result.resetChannels()
return
}
func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) {
result = make(chan interface{}, 1)
wp.mutex.RLock()
if wp.running {
wp.calls <- Task{
params: params,
resultChan: result,
}
} else {
close(result)
}
wp.mutex.RUnlock()
return
}
func (wp *WorkerPool) Start() {
wp.mutex.Lock()
if !wp.running {
wp.running = true
wp.startWorkers()
}
wp.mutex.Unlock()
}
func (wp *WorkerPool) Run() {
wp.Start()
wp.wait.Wait()
}
func (wp *WorkerPool) Stop() {
go wp.StopAndWait()
}
func (wp *WorkerPool) StopAndWait() {
wp.mutex.Lock()
if wp.running {
wp.running = false
close(wp.terminate)
wp.resetChannels()
}
wp.wait.Wait()
wp.mutex.Unlock()
}
func (wp *WorkerPool) resetChannels() {
wp.calls = make(chan Task, wp.options.QueueSize)
wp.terminate = make(chan int, 1)
}
func (wp *WorkerPool) startWorkers() {
calls := wp.calls
terminate := wp.terminate
for i := 0; i < wp.options.WorkerCount; i++ {
wp.wait.Add(1)
go func() {
aborted := false
for !aborted {
select {
case <-terminate:
aborted = true
case batchTask := <-calls:
wp.workerFnc(batchTask)
}
}
wp.wait.Done()
}()
}
}
package workerpool
import (
"sync"
"testing"
)
func Benchmark(b *testing.B) {
pool := New(func(task Task) {
task.Return(task.Param(0))
}, WorkerCount(10), QueueSize(2000))
pool.Start()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
<-pool.Submit(i)
wg.Done()
}()
}
wg.Wait()
}
......@@ -21,7 +21,7 @@ import (
)
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() {
daemon.BackgroundWorker("Analysis Client", func() {
shuttingDown := false
for !shuttingDown {
......
......@@ -36,7 +36,7 @@ func Configure(plugin *node.Plugin) {
}
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() {
daemon.BackgroundWorker("Analysis Server", func() {
plugin.LogInfo("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ...")
server.Listen(*SERVER_PORT.Value)
......
......@@ -32,7 +32,7 @@ func Configure(plugin *node.Plugin) {
}
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() {
daemon.BackgroundWorker("Analysis HTTP Server", func() {
httpServer.ListenAndServe()
})
}
......@@ -35,7 +35,7 @@ func Configure(plugin *node.Plugin) {
}
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(func() {
daemon.BackgroundWorker("Neighborhood Updater", func() {
timeutil.Ticker(updateNeighborHood, 1*time.Second)
})
}
......
......@@ -41,7 +41,11 @@ func createOutgoingRequestProcessor(plugin *node.Plugin) func() {
func sendOutgoingRequests(plugin *node.Plugin) {
for _, chosenNeighborCandidate := range chosenneighbors.CANDIDATES.Clone() {
time.Sleep(5 * time.Second)
select {
case <-daemon.ShutdownSignal:
return
case <-time.After(5 * time.Second):
}
if candidateShouldBeContacted(chosenNeighborCandidate) {
if dialed, err := chosenNeighborCandidate.Send(outgoingrequest.INSTANCE.Marshal(), types.PROTOCOL_TYPE_TCP, true); err != nil {
......
......@@ -20,8 +20,8 @@ func Configure(plugin *node.Plugin) {
}
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker(createChosenNeighborDropper(plugin))
daemon.BackgroundWorker(createAcceptedNeighborDropper(plugin))
daemon.BackgroundWorker(createOutgoingRequestProcessor(plugin))
daemon.BackgroundWorker(createOutgoingPingProcessor(plugin))
daemon.BackgroundWorker("Autopeering Chosen Neighbor Dropper", createChosenNeighborDropper(plugin))
daemon.BackgroundWorker("Autopeering Accepted Neighbor Dropper", createAcceptedNeighborDropper(plugin))
daemon.BackgroundWorker("Autopeering Outgoing Request Processor", createOutgoingRequestProcessor(plugin))
daemon.BackgroundWorker("Autopeering Outgoing Ping Processor", createOutgoingPingProcessor(plugin))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment