diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go index de79193ae32dd98288bf844946072903becf00d4..a7e7e2cc0c254f4248237d4d44a9d5f39bd8d065 100644 --- a/dapps/valuetransfers/packages/tangle/signature_filter.go +++ b/dapps/valuetransfers/packages/tangle/signature_filter.go @@ -69,9 +69,6 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message, filter.onRejectCallback = callback } -// Shutdown shuts down the filter. -func (filter *SignatureFilter) Shutdown() {} - // getAcceptCallback returns the callback that is executed when a message passes the filter. func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) { filter.onAcceptCallbackMutex.RLock() diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go index 554cc4ccd6651d637005279070b37a968131cc06..02ba960c4e183af9dd7b084c798accaf1d14e458 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go @@ -25,6 +25,8 @@ func NewMessageSignatureFilter() *MessageSignatureFilter { return &MessageSignatureFilter{} } +// Filter filters up on the given bytes and peer and calls the acceptance callback +// if the input passes or the rejection callback if the input is rejected. func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) { if msg.VerifySignature() { filter.getAcceptCallback()(msg, peer) @@ -33,20 +35,20 @@ func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Pe filter.getRejectCallback()(msg, ErrInvalidSignature, peer) } +// OnAccept registers the given callback as the acceptance function of the filter. func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) { filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallback = callback filter.onAcceptCallbackMutex.Unlock() } +// OnReject registers the given callback as the rejection function of the filter. func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) { filter.onRejectCallbackMutex.Lock() filter.onRejectCallback = callback filter.onRejectCallbackMutex.Unlock() } -func (filter *MessageSignatureFilter) Shutdown() {} - func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) { filter.onAcceptCallbackMutex.RLock() result = filter.onAcceptCallback diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go index 5de27b283457db365f8803571a1f487075ac7f7b..0147ec3730374158de774f3623c90ac24c8faeca 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go @@ -58,9 +58,6 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) { f.rejectCallback = callback } -// Shutdown shuts down the filter. -func (f *PowFilter) Shutdown() {} - func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) { f.mu.Lock() defer f.mu.Unlock() diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go index fe53a9d71cb715dec6a9ad5e26fc87365a680493..b348247bb28c6c5b4f7cf533492b5fa23b21af54 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go @@ -18,15 +18,14 @@ import ( ) var ( - testPayload = payload.NewData([]byte("test")) - testPeer *peer.Peer = nil - testWorker = pow.New(crypto.BLAKE2b_512, 1) - testDifficulty = 10 + testPayload = payload.NewData([]byte("test")) + testPeer *peer.Peer + testWorker = pow.New(crypto.BLAKE2b_512, 1) + testDifficulty = 10 ) func TestPowFilter_Filter(t *testing.T) { filter := NewPowFilter(testWorker, testDifficulty) - defer filter.Shutdown() // set callbacks m := &callbackMock{} @@ -61,7 +60,6 @@ func TestPowFilter_Filter(t *testing.T) { filter.Filter(msgPOWBytes, testPeer) }) - filter.Shutdown() m.AssertExpectations(t) } diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go index 6cbeec505062967ff21f79e82f7f9813f94c65fd..855d79426db2487b2badb2c2eab690db85fbf81b 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go @@ -28,6 +28,8 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter { } } +// Filter filters up on the given bytes and peer and calls the acceptance callback +// if the input passes or the rejection callback if the input is rejected. func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { if filter.bytesFilter.Add(bytes) { filter.getAcceptCallback()(bytes, peer) @@ -36,12 +38,14 @@ func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer) } +// OnAccept registers the given callback as the acceptance function of the filter. func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) { filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallback = callback filter.onAcceptCallbackMutex.Unlock() } +// OnReject registers the given callback as the rejection function of the filter. func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) { filter.onRejectCallbackMutex.Lock() filter.onRejectCallback = callback @@ -61,5 +65,3 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes [] filter.onRejectCallbackMutex.Unlock() return } - -func (filter *RecentlySeenBytesFilter) Shutdown() {} diff --git a/packages/binary/messagelayer/messageparser/bytes_filter.go b/packages/binary/messagelayer/messageparser/bytes_filter.go index 96d928e582ee330dfc732c002b4119c561e6f19b..7c62a45982127604f7e4bfb84ca94fda93e4d10c 100644 --- a/packages/binary/messagelayer/messageparser/bytes_filter.go +++ b/packages/binary/messagelayer/messageparser/bytes_filter.go @@ -13,6 +13,4 @@ type BytesFilter interface { OnAccept(callback func(bytes []byte, peer *peer.Peer)) // OnReject registers the given callback as the rejection function of the filter. OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) - // Shutdown shuts down the filter. - Shutdown() } diff --git a/packages/binary/messagelayer/messageparser/message_filter.go b/packages/binary/messagelayer/messageparser/message_filter.go index f303a5b6ffc3833391377f69dc8b37224ceefeda..b883f3d12025fb5b2b3c6d54cc7c502b28e291c0 100644 --- a/packages/binary/messagelayer/messageparser/message_filter.go +++ b/packages/binary/messagelayer/messageparser/message_filter.go @@ -15,6 +15,4 @@ type MessageFilter interface { OnAccept(callback func(msg *message.Message, peer *peer.Peer)) // OnAccept registers the given callback as the rejection function of the filter. OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) - // Shutdown shuts down the filter. - Shutdown() } diff --git a/packages/binary/messagelayer/messageparser/message_parser.go b/packages/binary/messagelayer/messageparser/message_parser.go index 8e491be4949927c1eb26e1bb05b1a6632b740108..9c590e63ff6fe81dcb30925ee0abd7498bc58d0f 100644 --- a/packages/binary/messagelayer/messageparser/message_parser.go +++ b/packages/binary/messagelayer/messageparser/message_parser.go @@ -70,21 +70,6 @@ func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) { messageParser.messageFiltersModified.Set() } -// Shutdown shut downs the message parser and its corresponding registered filters. -func (messageParser *MessageParser) Shutdown() { - messageParser.bytesFiltersMutex.Lock() - for _, bytesFilter := range messageParser.bytesFilters { - bytesFilter.Shutdown() - } - messageParser.bytesFiltersMutex.Unlock() - - messageParser.messageFiltersMutex.Lock() - for _, messageFilter := range messageParser.messageFilters { - messageFilter.Shutdown() - } - messageParser.messageFiltersMutex.Unlock() -} - // sets up the byte filter data flow chain. func (messageParser *MessageParser) setupBytesFilterDataFlow() { if !messageParser.byteFiltersModified.IsSet() { diff --git a/packages/binary/messagelayer/messageparser/message_parser_test.go b/packages/binary/messagelayer/messageparser/message_parser_test.go index 2e33aad3d231fb15852b182fbdbe7453891052b4..e85cf4cee9881f6902a2ff6f4de3e0e3c6e2a060 100644 --- a/packages/binary/messagelayer/messageparser/message_parser_test.go +++ b/packages/binary/messagelayer/messageparser/message_parser_test.go @@ -22,8 +22,6 @@ func BenchmarkMessageParser_ParseBytesSame(b *testing.B) { for i := 0; i < b.N; i++ { msgParser.Parse(msgBytes, nil) } - - msgParser.Shutdown() } func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) { @@ -39,8 +37,6 @@ func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) { for i := 0; i < b.N; i++ { msgParser.Parse(messageBytes[i], nil) } - - msgParser.Shutdown() } func TestMessageParser_ParseMessage(t *testing.T) { @@ -52,8 +48,6 @@ func TestMessageParser_ParseMessage(t *testing.T) { msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msg *message.Message) { log.Infof("parsed message") })) - - msgParser.Shutdown() } func newTestMessage(payloadString string) *message.Message { diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go index 084df96ce53f45d156a0122cb868eac6ea1cc871..23967033e30475adf6cfb8bb3c09e80f3a97a0f4 100644 --- a/plugins/messagelayer/plugin.go +++ b/plugins/messagelayer/plugin.go @@ -132,7 +132,6 @@ func run(*node.Plugin) { if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { <-shutdownSignal messageFactory.Shutdown() - messageParser.Shutdown() _tangle.Shutdown() }, shutdown.PriorityTangle); err != nil { log.Panicf("Failed to start as daemon: %s", err)