Skip to content
Snippets Groups Projects
Unverified Commit 2e07cdb3 authored by capossele's avatar capossele
Browse files

:heavy_minus_sign: Remove filter shutdown

parent c75a9cfa
Branches
No related tags found
No related merge requests found
Showing
with 12 additions and 42 deletions
...@@ -69,9 +69,6 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message, ...@@ -69,9 +69,6 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message,
filter.onRejectCallback = callback filter.onRejectCallback = callback
} }
// Shutdown shuts down the filter.
func (filter *SignatureFilter) Shutdown() {}
// getAcceptCallback returns the callback that is executed when a message passes the filter. // getAcceptCallback returns the callback that is executed when a message passes the filter.
func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) { func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) {
filter.onAcceptCallbackMutex.RLock() filter.onAcceptCallbackMutex.RLock()
......
...@@ -25,6 +25,8 @@ func NewMessageSignatureFilter() *MessageSignatureFilter { ...@@ -25,6 +25,8 @@ func NewMessageSignatureFilter() *MessageSignatureFilter {
return &MessageSignatureFilter{} return &MessageSignatureFilter{}
} }
// Filter filters up on the given bytes and peer and calls the acceptance callback
// if the input passes or the rejection callback if the input is rejected.
func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) { func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) {
if msg.VerifySignature() { if msg.VerifySignature() {
filter.getAcceptCallback()(msg, peer) filter.getAcceptCallback()(msg, peer)
...@@ -33,20 +35,20 @@ func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Pe ...@@ -33,20 +35,20 @@ func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Pe
filter.getRejectCallback()(msg, ErrInvalidSignature, peer) filter.getRejectCallback()(msg, ErrInvalidSignature, peer)
} }
// OnAccept registers the given callback as the acceptance function of the filter.
func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) { func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock() filter.onAcceptCallbackMutex.Unlock()
} }
// OnReject registers the given callback as the rejection function of the filter.
func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) { func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) {
filter.onRejectCallbackMutex.Lock() filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock() filter.onRejectCallbackMutex.Unlock()
} }
func (filter *MessageSignatureFilter) Shutdown() {}
func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) { func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.RLock() filter.onAcceptCallbackMutex.RLock()
result = filter.onAcceptCallback result = filter.onAcceptCallback
......
...@@ -58,9 +58,6 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) { ...@@ -58,9 +58,6 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) {
f.rejectCallback = callback f.rejectCallback = callback
} }
// Shutdown shuts down the filter.
func (f *PowFilter) Shutdown() {}
func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) { func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) {
f.mu.Lock() f.mu.Lock()
defer f.mu.Unlock() defer f.mu.Unlock()
......
...@@ -18,15 +18,14 @@ import ( ...@@ -18,15 +18,14 @@ import (
) )
var ( var (
testPayload = payload.NewData([]byte("test")) testPayload = payload.NewData([]byte("test"))
testPeer *peer.Peer = nil testPeer *peer.Peer
testWorker = pow.New(crypto.BLAKE2b_512, 1) testWorker = pow.New(crypto.BLAKE2b_512, 1)
testDifficulty = 10 testDifficulty = 10
) )
func TestPowFilter_Filter(t *testing.T) { func TestPowFilter_Filter(t *testing.T) {
filter := NewPowFilter(testWorker, testDifficulty) filter := NewPowFilter(testWorker, testDifficulty)
defer filter.Shutdown()
// set callbacks // set callbacks
m := &callbackMock{} m := &callbackMock{}
...@@ -61,7 +60,6 @@ func TestPowFilter_Filter(t *testing.T) { ...@@ -61,7 +60,6 @@ func TestPowFilter_Filter(t *testing.T) {
filter.Filter(msgPOWBytes, testPeer) filter.Filter(msgPOWBytes, testPeer)
}) })
filter.Shutdown()
m.AssertExpectations(t) m.AssertExpectations(t)
} }
......
...@@ -28,6 +28,8 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter { ...@@ -28,6 +28,8 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter {
} }
} }
// Filter filters up on the given bytes and peer and calls the acceptance callback
// if the input passes or the rejection callback if the input is rejected.
func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) {
if filter.bytesFilter.Add(bytes) { if filter.bytesFilter.Add(bytes) {
filter.getAcceptCallback()(bytes, peer) filter.getAcceptCallback()(bytes, peer)
...@@ -36,12 +38,14 @@ func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { ...@@ -36,12 +38,14 @@ func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) {
filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer) filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer)
} }
// OnAccept registers the given callback as the acceptance function of the filter.
func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) { func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock() filter.onAcceptCallbackMutex.Unlock()
} }
// OnReject registers the given callback as the rejection function of the filter.
func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) { func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) {
filter.onRejectCallbackMutex.Lock() filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback filter.onRejectCallback = callback
...@@ -61,5 +65,3 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes [] ...@@ -61,5 +65,3 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []
filter.onRejectCallbackMutex.Unlock() filter.onRejectCallbackMutex.Unlock()
return return
} }
func (filter *RecentlySeenBytesFilter) Shutdown() {}
...@@ -13,6 +13,4 @@ type BytesFilter interface { ...@@ -13,6 +13,4 @@ type BytesFilter interface {
OnAccept(callback func(bytes []byte, peer *peer.Peer)) OnAccept(callback func(bytes []byte, peer *peer.Peer))
// OnReject registers the given callback as the rejection function of the filter. // OnReject registers the given callback as the rejection function of the filter.
OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) OnReject(callback func(bytes []byte, err error, peer *peer.Peer))
// Shutdown shuts down the filter.
Shutdown()
} }
...@@ -15,6 +15,4 @@ type MessageFilter interface { ...@@ -15,6 +15,4 @@ type MessageFilter interface {
OnAccept(callback func(msg *message.Message, peer *peer.Peer)) OnAccept(callback func(msg *message.Message, peer *peer.Peer))
// OnAccept registers the given callback as the rejection function of the filter. // OnAccept registers the given callback as the rejection function of the filter.
OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer))
// Shutdown shuts down the filter.
Shutdown()
} }
...@@ -70,21 +70,6 @@ func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) { ...@@ -70,21 +70,6 @@ func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) {
messageParser.messageFiltersModified.Set() messageParser.messageFiltersModified.Set()
} }
// Shutdown shut downs the message parser and its corresponding registered filters.
func (messageParser *MessageParser) Shutdown() {
messageParser.bytesFiltersMutex.Lock()
for _, bytesFilter := range messageParser.bytesFilters {
bytesFilter.Shutdown()
}
messageParser.bytesFiltersMutex.Unlock()
messageParser.messageFiltersMutex.Lock()
for _, messageFilter := range messageParser.messageFilters {
messageFilter.Shutdown()
}
messageParser.messageFiltersMutex.Unlock()
}
// sets up the byte filter data flow chain. // sets up the byte filter data flow chain.
func (messageParser *MessageParser) setupBytesFilterDataFlow() { func (messageParser *MessageParser) setupBytesFilterDataFlow() {
if !messageParser.byteFiltersModified.IsSet() { if !messageParser.byteFiltersModified.IsSet() {
......
...@@ -22,8 +22,6 @@ func BenchmarkMessageParser_ParseBytesSame(b *testing.B) { ...@@ -22,8 +22,6 @@ func BenchmarkMessageParser_ParseBytesSame(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
msgParser.Parse(msgBytes, nil) msgParser.Parse(msgBytes, nil)
} }
msgParser.Shutdown()
} }
func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) { func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {
...@@ -39,8 +37,6 @@ func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) { ...@@ -39,8 +37,6 @@ func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
msgParser.Parse(messageBytes[i], nil) msgParser.Parse(messageBytes[i], nil)
} }
msgParser.Shutdown()
} }
func TestMessageParser_ParseMessage(t *testing.T) { func TestMessageParser_ParseMessage(t *testing.T) {
...@@ -52,8 +48,6 @@ func TestMessageParser_ParseMessage(t *testing.T) { ...@@ -52,8 +48,6 @@ func TestMessageParser_ParseMessage(t *testing.T) {
msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msg *message.Message) { msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msg *message.Message) {
log.Infof("parsed message") log.Infof("parsed message")
})) }))
msgParser.Shutdown()
} }
func newTestMessage(payloadString string) *message.Message { func newTestMessage(payloadString string) *message.Message {
......
...@@ -132,7 +132,6 @@ func run(*node.Plugin) { ...@@ -132,7 +132,6 @@ func run(*node.Plugin) {
if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal <-shutdownSignal
messageFactory.Shutdown() messageFactory.Shutdown()
messageParser.Shutdown()
_tangle.Shutdown() _tangle.Shutdown()
}, shutdown.PriorityTangle); err != nil { }, shutdown.PriorityTangle); err != nil {
log.Panicf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment