Skip to content
Snippets Groups Projects
Select Git revision
  • 7548e4bc71c8e59ce18974955cebb29c78727b03
  • without_tipselection default
  • develop protected
  • fix/grafana-local-dashboard
  • wasp
  • fix/dashboard-explorer-freeze
  • master
  • feat/timerqueue
  • test/sync_debug_and_650
  • feat/sync_revamp_inv
  • wip/sync
  • tool/db-recovery
  • portcheck/fix
  • fix/synchronization
  • feat/new-dashboard-analysis
  • feat/refactored-analysis-dashboard
  • feat/new-analysis-dashboard
  • test/demo-prometheus-fpc
  • prometheus_metrics
  • wip/analysis-server
  • merge/fpc-test-value-transfer
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.3
  • v0.1.2
  • v0.1.1
  • v0.1.0
28 results

tipselection.go

Blame
  • message_parser.go 4.76 KiB
    package messageparser
    
    import (
    	"sync"
    
    	"gitlab.imt-atlantique.fr/i18colle/goshimmer_without_tipselection/packages/binary/messagelayer/message"
    	"gitlab.imt-atlantique.fr/i18colle/goshimmer_without_tipselection/packages/binary/messagelayer/messageparser/builtinfilters"
    
    	"github.com/iotaledger/hive.go/autopeering/peer"
    	"github.com/iotaledger/hive.go/events"
    	"github.com/iotaledger/hive.go/typeutils"
    )
    
    // MessageParser parses messages and bytes and emits corresponding events for parsed and rejected messages.
    type MessageParser struct {
    	bytesFilters   []BytesFilter
    	messageFilters []MessageFilter
    	Events         Events
    
    	byteFiltersModified    typeutils.AtomicBool
    	messageFiltersModified typeutils.AtomicBool
    	bytesFiltersMutex      sync.Mutex
    	messageFiltersMutex    sync.Mutex
    }
    
    // New creates a new message parser.
    func New() (result *MessageParser) {
    	result = &MessageParser{
    		bytesFilters:   make([]BytesFilter, 0),
    		messageFilters: make([]MessageFilter, 0),
    		Events: Events{
    			MessageParsed: events.NewEvent(func(handler interface{}, params ...interface{}) {
    				handler.(func(*message.Message, *peer.Peer))(params[0].(*message.Message), params[1].(*peer.Peer))
    			}),
    			BytesRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
    				handler.(func([]byte, error, *peer.Peer))(params[0].([]byte), params[1].(error), params[2].(*peer.Peer))
    			}),
    			MessageRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
    				handler.(func(*message.Message, error, *peer.Peer))(params[0].(*message.Message), params[1].(error), params[2].(*peer.Peer))
    			}),
    		},
    	}
    
    	// add builtin filters
    	result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter())
    	result.AddMessageFilter(builtinfilters.NewMessageSignatureFilter())
    	return
    }
    
    // Parse parses the given message bytes.
    func (messageParser *MessageParser) Parse(messageBytes []byte, peer *peer.Peer) {
    	messageParser.setupBytesFilterDataFlow()
    	messageParser.setupMessageFilterDataFlow()
    	messageParser.bytesFilters[0].Filter(messageBytes, peer)
    }
    
    // AddBytesFilter adds the given bytes filter to the parser.
    func (messageParser *MessageParser) AddBytesFilter(filter BytesFilter) {
    	messageParser.bytesFiltersMutex.Lock()
    	messageParser.bytesFilters = append(messageParser.bytesFilters, filter)
    	messageParser.bytesFiltersMutex.Unlock()
    	messageParser.byteFiltersModified.Set()
    }
    
    // AddMessageFilter adds a new message filter to the parser.
    func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) {
    	messageParser.messageFiltersMutex.Lock()
    	messageParser.messageFilters = append(messageParser.messageFilters, filter)
    	messageParser.messageFiltersMutex.Unlock()
    	messageParser.messageFiltersModified.Set()
    }
    
    // sets up the byte filter data flow chain.
    func (messageParser *MessageParser) setupBytesFilterDataFlow() {
    	if !messageParser.byteFiltersModified.IsSet() {
    		return
    	}
    
    	messageParser.bytesFiltersMutex.Lock()
    	if messageParser.byteFiltersModified.IsSet() {
    		messageParser.byteFiltersModified.SetTo(false)
    
    		numberOfBytesFilters := len(messageParser.bytesFilters)
    		for i := 0; i < numberOfBytesFilters; i++ {
    			if i == numberOfBytesFilters-1 {
    				messageParser.bytesFilters[i].OnAccept(messageParser.parseMessage)
    			} else {
    				messageParser.bytesFilters[i].OnAccept(messageParser.bytesFilters[i+1].Filter)
    			}
    			messageParser.bytesFilters[i].OnReject(func(bytes []byte, err error, peer *peer.Peer) {
    				messageParser.Events.BytesRejected.Trigger(bytes, err, peer)
    			})
    		}
    	}
    	messageParser.bytesFiltersMutex.Unlock()
    }
    
    // sets up the message filter data flow chain.
    func (messageParser *MessageParser) setupMessageFilterDataFlow() {
    	if !messageParser.messageFiltersModified.IsSet() {
    		return
    	}
    
    	messageParser.messageFiltersMutex.Lock()
    	if messageParser.messageFiltersModified.IsSet() {
    		messageParser.messageFiltersModified.SetTo(false)
    
    		numberOfMessageFilters := len(messageParser.messageFilters)
    		for i := 0; i < numberOfMessageFilters; i++ {
    			if i == numberOfMessageFilters-1 {
    				messageParser.messageFilters[i].OnAccept(func(msg *message.Message, peer *peer.Peer) {
    					messageParser.Events.MessageParsed.Trigger(msg, peer)
    				})
    			} else {
    				messageParser.messageFilters[i].OnAccept(messageParser.messageFilters[i+1].Filter)
    			}
    			messageParser.messageFilters[i].OnReject(func(msg *message.Message, err error, peer *peer.Peer) {
    				messageParser.Events.MessageRejected.Trigger(msg, err, peer)
    			})
    		}
    	}
    	messageParser.messageFiltersMutex.Unlock()
    }
    
    // parses the given message and emits
    func (messageParser *MessageParser) parseMessage(bytes []byte, peer *peer.Peer) {
    	if parsedMessage, err, _ := message.FromBytes(bytes); err != nil {
    		messageParser.Events.BytesRejected.Trigger(bytes, err, peer)
    	} else {
    		messageParser.messageFilters[0].Filter(parsedMessage, peer)
    	}
    }