Skip to content
Snippets Groups Projects
Unverified Commit 8e62ab07 authored by Levente Pap's avatar Levente Pap
Browse files

OpinionEvent struct for calling Finalized and Failed vote events

parent dd464fb0
No related branches found
No related tags found
No related merge requests found
...@@ -89,8 +89,8 @@ func configure(_ *node.Plugin) { ...@@ -89,8 +89,8 @@ func configure(_ *node.Plugin) {
// configure FPC + link to consensus // configure FPC + link to consensus
configureFPC() configureFPC()
voter.Events().Finalized.Attach(events.NewClosure(FCOB.ProcessVoteResult)) voter.Events().Finalized.Attach(events.NewClosure(FCOB.ProcessVoteResult))
voter.Events().Failed.Attach(events.NewClosure(func(id string, lastOpinion vote.Opinion) { voter.Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
log.Errorf("FPC failed for transaction with id '%s' - last opinion: '%s'", id, lastOpinion) log.Errorf("FPC failed for transaction with id '%s' - last opinion: '%s'", ev.ID, ev.Opinion)
})) }))
// subscribe to message-layer // subscribe to message-layer
......
...@@ -185,6 +185,7 @@ func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opin ...@@ -185,6 +185,7 @@ func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opin
query := &votenet.QueryRequest{Id: ids} query := &votenet.QueryRequest{Id: ids}
reply, err := client.Opinion(ctx, query) reply, err := client.Opinion(ctx, query)
if err != nil { if err != nil {
// TODO: add an event QueryErrorEvent (metrics)
return nil, fmt.Errorf("unable to query opinions: %w", err) return nil, fmt.Errorf("unable to query opinions: %w", err)
} }
......
...@@ -41,15 +41,15 @@ func NewFCOB(tangle *tangle.Tangle, averageNetworkDelay time.Duration) (fcob *FC ...@@ -41,15 +41,15 @@ func NewFCOB(tangle *tangle.Tangle, averageNetworkDelay time.Duration) (fcob *FC
} }
// ProcessVoteResult allows an external voter to hand in the results of the voting process. // ProcessVoteResult allows an external voter to hand in the results of the voting process.
func (fcob *FCOB) ProcessVoteResult(id string, opinion vote.Opinion) { func (fcob *FCOB) ProcessVoteResult(ev *vote.OpinionEvent) {
transactionID, err := transaction.IDFromBase58(id) transactionID, err := transaction.IDFromBase58(ev.ID)
if err != nil { if err != nil {
fcob.Events.Error.Trigger(err) fcob.Events.Error.Trigger(err)
return return
} }
if _, err := fcob.tangle.SetTransactionPreferred(transactionID, opinion == vote.Like); err != nil { if _, err := fcob.tangle.SetTransactionPreferred(transactionID, ev.Opinion == vote.Like); err != nil {
fcob.Events.Error.Trigger(err) fcob.Events.Error.Trigger(err)
} }
} }
......
...@@ -170,12 +170,12 @@ func (f *FPC) finalizeOpinions() { ...@@ -170,12 +170,12 @@ func (f *FPC) finalizeOpinions() {
defer f.ctxsMu.Unlock() defer f.ctxsMu.Unlock()
for id, voteCtx := range f.ctxs { for id, voteCtx := range f.ctxs {
if voteCtx.IsFinalized(f.paras.CoolingOffPeriod, f.paras.FinalizationThreshold) { if voteCtx.IsFinalized(f.paras.CoolingOffPeriod, f.paras.FinalizationThreshold) {
f.events.Finalized.Trigger(id, voteCtx.LastOpinion()) f.events.Finalized.Trigger(&vote.OpinionEvent{ID: id, Opinion: voteCtx.LastOpinion(), Ctx: *voteCtx})
delete(f.ctxs, id) delete(f.ctxs, id)
continue continue
} }
if voteCtx.Rounds >= f.paras.MaxRoundsPerVoteContext { if voteCtx.Rounds >= f.paras.MaxRoundsPerVoteContext {
f.events.Failed.Trigger(id, voteCtx.LastOpinion()) f.events.Failed.Trigger(&vote.OpinionEvent{ID: id, Opinion: voteCtx.LastOpinion(), Ctx: *voteCtx})
delete(f.ctxs, id) delete(f.ctxs, id)
} }
} }
......
...@@ -96,8 +96,8 @@ func TestFPCFinalizedEvent(t *testing.T) { ...@@ -96,8 +96,8 @@ func TestFPCFinalizedEvent(t *testing.T) {
paras.QuerySampleSize = 1 paras.QuerySampleSize = 1
voter := fpc.New(opinionGiverFunc, paras) voter := fpc.New(opinionGiverFunc, paras)
var finalizedOpinion *vote.Opinion var finalizedOpinion *vote.Opinion
voter.Events().Finalized.Attach(events.NewClosure(func(id string, opinion vote.Opinion) { voter.Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
finalizedOpinion = &opinion finalizedOpinion = &ev.Opinion
})) }))
assert.NoError(t, voter.Vote(id, vote.Like)) assert.NoError(t, voter.Vote(id, vote.Like))
...@@ -129,8 +129,8 @@ func TestFPCFailedEvent(t *testing.T) { ...@@ -129,8 +129,8 @@ func TestFPCFailedEvent(t *testing.T) {
paras.FinalizationThreshold = 4 paras.FinalizationThreshold = 4
voter := fpc.New(opinionGiverFunc, paras) voter := fpc.New(opinionGiverFunc, paras)
var failedOpinion *vote.Opinion var failedOpinion *vote.Opinion
voter.Events().Failed.Attach(events.NewClosure(func(id string, opinion vote.Opinion) { voter.Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
failedOpinion = &opinion failedOpinion = &ev.Opinion
})) }))
assert.NoError(t, voter.Vote(id, vote.Like)) assert.NoError(t, voter.Vote(id, vote.Like))
...@@ -170,8 +170,8 @@ func TestFPCVotingMultipleOpinionGivers(t *testing.T) { ...@@ -170,8 +170,8 @@ func TestFPCVotingMultipleOpinionGivers(t *testing.T) {
paras.CoolingOffPeriod = 2 paras.CoolingOffPeriod = 2
voter := fpc.New(opinionGiverFunc, paras) voter := fpc.New(opinionGiverFunc, paras)
var finalOpinion *vote.Opinion var finalOpinion *vote.Opinion
voter.Events().Finalized.Attach(events.NewClosure(func(id string, finalizedOpinion vote.Opinion) { voter.Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
finalOpinion = &finalizedOpinion finalOpinion = &ev.Opinion
})) }))
assert.NoError(t, voter.Vote(test.id, test.initOpinion)) assert.NoError(t, voter.Vote(test.id, test.initOpinion))
......
...@@ -46,6 +46,7 @@ func (vs *VoterServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryRe ...@@ -46,6 +46,7 @@ func (vs *VoterServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryRe
reply.Opinion[i] = int32(vs.opnRetriever(id)) reply.Opinion[i] = int32(vs.opnRetriever(id))
} }
// trigger metrics event
metrics.Events().FPCInboundBytes.Trigger(proto.Size(req)) metrics.Events().FPCInboundBytes.Trigger(proto.Size(req))
metrics.Events().FPCOutboundBytes.Trigger(proto.Size(reply)) metrics.Events().FPCOutboundBytes.Trigger(proto.Size(reply))
return reply, nil return reply, nil
......
...@@ -57,9 +57,19 @@ type RoundStats struct { ...@@ -57,9 +57,19 @@ type RoundStats struct {
QueriedOpinions []QueriedOpinions `json:"queried_opinions"` QueriedOpinions []QueriedOpinions `json:"queried_opinions"`
} }
// OpinionCaller calls the given handler with an Opinion and its associated ID. // OpinionEvent is the struct containing data to be passed around with Finalized and Failed events.
type OpinionEvent struct {
// ID is the of the conflict
ID string
// Opinion is an opinion about a conflict
Opinion Opinion
// Ctx contains all relevant infos regarding the conflict.
Ctx Context
}
// OpinionCaller calls the given handler with an OpinionEvent (containing its opinions, its associated ID and context).
func OpinionCaller(handler interface{}, params ...interface{}) { func OpinionCaller(handler interface{}, params ...interface{}) {
handler.(func(id string, opinion Opinion))(params[0].(string), params[1].(Opinion)) handler.(func(ev *OpinionEvent))(params[0].(*OpinionEvent))
} }
// RoundStats calls the given handler with a RoundStats. // RoundStats calls the given handler with a RoundStats.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment