From a0e155b842e126eb049f51d72d354d7411fca81b Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Fri, 20 Sep 2024 15:34:28 +0200 Subject: [PATCH] Drop messages that are no longer useful for GPBFT progression As a GPBFT instance progresses some messages become irrelevant, in that they do not effectively aid the progress of the instance for participants. Instead, GPBFT offers other built-in mechanisms to aid progress of lagging participants such as selective rebroadcast, propagation of DECIDE messages from the previous instance and finality certificate exchange. The changes here introduce a dedicated error type returned as part of message validation to signal that although a message is valid it is no longer relevant. This error type is then checked by pubsub to avoid further propagation of those messages. This reduces the redundant pubsub traffic for the network participants. Fixes #583 --- gpbft/errors.go | 3 + gpbft/errors_test.go | 1 + gpbft/gpbft.go | 183 ++++++++++++++++++++++++++---------------- gpbft/gpbft_test.go | 3 +- gpbft/participant.go | 104 ++++++++++++++++++------ host.go | 4 + sim/network.go | 24 +++--- sim/sim.go | 16 ++-- test/withhold_test.go | 8 +- 9 files changed, 235 insertions(+), 111 deletions(-) diff --git a/gpbft/errors.go b/gpbft/errors.go index 3ec3dff7..5b37a7e6 100644 --- a/gpbft/errors.go +++ b/gpbft/errors.go @@ -25,6 +25,9 @@ var ( // // See SupplementalData. ErrValidationWrongSupplement = newValidationError("unexpected supplemental data") + // ErrValidationNotRelevant signals that a message is valid but not relevant at the current instance, + // and is not worth propagating to others. + ErrValidationNotRelevant = newValidationError("message is valid but not relevant") // ErrReceivedWrongInstance signals that a message is received with mismatching instance ID. ErrReceivedWrongInstance = errors.New("received message for wrong instance") diff --git a/gpbft/errors_test.go b/gpbft/errors_test.go index fc8ff105..d37ea582 100644 --- a/gpbft/errors_test.go +++ b/gpbft/errors_test.go @@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) { {name: "ErrValidationInvalid", subject: ErrValidationInvalid}, {name: "ErrValidationWrongBase", subject: ErrValidationWrongBase}, {name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement}, + {name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { diff --git a/gpbft/gpbft.go b/gpbft/gpbft.go index c024d8ba..3d65da33 100644 --- a/gpbft/gpbft.go +++ b/gpbft/gpbft.go @@ -9,6 +9,7 @@ import ( "math" "slices" "sort" + "sync" "time" "github.com/filecoin-project/go-bitfield" @@ -158,9 +159,13 @@ type instance struct { powerTable PowerTable // The beacon value from the base chain, used for tickets in this instance. beacon []byte - // Current round number. + // roundPhaseMutex guards access to round and phase. + roundPhaseMutex sync.RWMutex + // round represents the current round number. This field must not be accessed + // directly. See: getCurrentRound, setCurrentRound, IncrementAndGetCurrentRound. round uint64 - // Current phase in the round. + // phase represents the current phase in the round. This field must not be + // accessed directly. See: getCurrentPhase, setCurrentPhase. phase Phase // Time at which the current phase can or must end. // For QUALITY, PREPARE, and COMMIT, this is the latest time (the phase can end sooner). @@ -222,19 +227,12 @@ func newInstance( if input.IsZero() { return nil, fmt.Errorf("input is empty") } - metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrInitialPhase)) - metrics.currentInstance.Record(context.TODO(), int64(instanceID)) - metrics.currentPhase.Record(context.TODO(), int64(INITIAL_PHASE)) - metrics.currentRound.Record(context.TODO(), 0) - - return &instance{ + i := instance{ participant: participant, instanceID: instanceID, input: input, powerTable: powerTable, beacon: beacon, - round: 0, - phase: INITIAL_PHASE, supplementalData: data, proposal: input, value: ECChain{}, @@ -247,7 +245,16 @@ func newInstance( }, decision: newQuorumState(powerTable), tracer: participant.tracer, - }, nil + } + i.setCurrentRound(0) + i.setCurrentPhase(INITIAL_PHASE) + + metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrInitialPhase)) + metrics.currentInstance.Record(context.TODO(), int64(instanceID)) + metrics.currentPhase.Record(context.TODO(), int64(INITIAL_PHASE)) + metrics.currentRound.Record(context.TODO(), 0) + + return &i, nil } type roundState struct { @@ -327,7 +334,7 @@ func (i *instance) ReceiveAlarm() error { } func (i *instance) Describe() string { - return fmt.Sprintf("{%d}, round %d, phase %s", i.instanceID, i.round, i.phase) + return fmt.Sprintf("{%d}, round %d, phase %s", i.instanceID, i.getCurrentRound(), i.getCurrentPhase()) } // Processes a single message. @@ -350,11 +357,12 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { ErrValidationWrongBase, &msg.Vote.Value, i.input.Base()) } - if i.phase == TERMINATED_PHASE { + phase := i.getCurrentPhase() + if phase == TERMINATED_PHASE { return false, nil // No-op } // Ignore CONVERGE and PREPARE messages for prior rounds. - forPriorRound := msg.Vote.Round < i.round + forPriorRound := msg.Vote.Round < i.getCurrentRound() if (forPriorRound && msg.Vote.Phase == CONVERGE_PHASE) || (forPriorRound && msg.Vote.Phase == PREPARE_PHASE) { return false, nil @@ -363,7 +371,7 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { // Drop message that: // * belong to future rounds, beyond the configured max lookahead threshold, and // * carry no justification, i.e. are spammable. - beyondMaxLookaheadRounds := msg.Vote.Round > i.round+i.participant.maxLookaheadRounds + beyondMaxLookaheadRounds := msg.Vote.Round > i.getCurrentRound()+i.participant.maxLookaheadRounds if beyondMaxLookaheadRounds && isSpammable(msg) { return false, nil } @@ -378,7 +386,7 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { i.quality.ReceiveEachPrefix(msg.Sender, msg.Vote.Value) // If the instance has surpassed QUALITY phase, update the candidates based // on possible quorum of input prefixes. - if i.phase != QUALITY_PHASE { + if phase != QUALITY_PHASE { return true, i.updateCandidatesFromQuality() } case CONVERGE_PHASE: @@ -399,12 +407,12 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { // to a new round. Late-arriving COMMITs can still (must) cause a local decision, // *in that round*. Try to complete the COMMIT phase for the round specified by // the message. - if i.phase != DECIDE_PHASE { + if phase != DECIDE_PHASE { return true, i.tryCommit(msg.Vote.Round) } case DECIDE_PHASE: i.decision.Receive(msg.Sender, msg.Vote.Value, msg.Signature) - if i.phase != DECIDE_PHASE { + if phase != DECIDE_PHASE { i.skipToDecide(msg.Vote.Value, msg.Justification) } default: @@ -435,7 +443,7 @@ func (i *instance) postReceive(roundsReceived ...uint64) { func (i *instance) shouldSkipToRound(round uint64, state *roundState) (ECChain, *Justification, bool) { // Check if the given round is ahead of current round and this instance is not in // DECIDE phase. - if round <= i.round || i.phase == DECIDE_PHASE { + if round <= i.getCurrentRound() || i.getCurrentPhase() == DECIDE_PHASE { return nil, nil, false } if !state.prepared.ReceivedFromWeakQuorum() { @@ -453,8 +461,9 @@ func (i *instance) shouldSkipToRound(round uint64, state *roundState) (ECChain, // Attempts to complete the current phase and round. func (i *instance) tryCurrentPhase() error { - i.log("try phase %s", i.phase) - switch i.phase { + phase := i.getCurrentPhase() + i.log("try phase %s", phase) + switch phase { case QUALITY_PHASE: return i.tryQuality() case CONVERGE_PHASE: @@ -462,26 +471,26 @@ func (i *instance) tryCurrentPhase() error { case PREPARE_PHASE: return i.tryPrepare() case COMMIT_PHASE: - return i.tryCommit(i.round) + return i.tryCommit(i.getCurrentRound()) case DECIDE_PHASE: return i.tryDecide() case TERMINATED_PHASE: return nil // No-op default: - return fmt.Errorf("unexpected phase %s", i.phase) + return fmt.Errorf("unexpected phase %s", phase) } } // Sends this node's QUALITY message and begins the QUALITY phase. func (i *instance) beginQuality() error { - if i.phase != INITIAL_PHASE { - return fmt.Errorf("cannot transition from %s to %s", i.phase, QUALITY_PHASE) + if phase := i.getCurrentPhase(); phase != INITIAL_PHASE { + return fmt.Errorf("cannot transition from %s to %s", phase, QUALITY_PHASE) } // Broadcast input value and wait to receive from others. - i.phase = QUALITY_PHASE + i.setCurrentPhase(QUALITY_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() - i.broadcast(i.round, QUALITY_PHASE, i.proposal, false, nil) + i.broadcast(i.getCurrentRound(), QUALITY_PHASE, i.proposal, false, nil) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrQualityPhase)) metrics.currentPhase.Record(context.TODO(), int64(QUALITY_PHASE)) return nil @@ -489,8 +498,8 @@ func (i *instance) beginQuality() error { // Attempts to end the QUALITY phase and begin PREPARE based on current state. func (i *instance) tryQuality() error { - if i.phase != QUALITY_PHASE { - return fmt.Errorf("unexpected phase %s, expected %s", i.phase, QUALITY_PHASE) + if phase := i.getCurrentPhase(); phase != QUALITY_PHASE { + return fmt.Errorf("unexpected phase %s, expected %s", phase, QUALITY_PHASE) } // Wait either for a strong quorum that agree on our proposal, or for the timeout @@ -527,36 +536,38 @@ func (i *instance) updateCandidatesFromQuality() error { // beginConverge initiates CONVERGE_PHASE justified by the given justification. func (i *instance) beginConverge(justification *Justification) { - if justification.Vote.Round != i.round-1 { + round := i.getCurrentRound() + if justification.Vote.Round != round-1 { // For safety assert that the justification given belongs to the right round. panic("justification for which to begin converge does not belong to expected round") } - i.phase = CONVERGE_PHASE + i.setCurrentPhase(CONVERGE_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() // Notify the round's convergeState that the self participant has begun the // CONVERGE phase. Because, we cannot guarantee that the CONVERGE message // broadcasts are delivered to self synchronously. - i.getRound(i.round).converged.SetSelfValue(i.proposal, justification) + i.getRound(round).converged.SetSelfValue(i.proposal, justification) - i.broadcast(i.round, CONVERGE_PHASE, i.proposal, true, justification) + i.broadcast(round, CONVERGE_PHASE, i.proposal, true, justification) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrConvergePhase)) metrics.currentPhase.Record(context.TODO(), int64(CONVERGE_PHASE)) } // Attempts to end the CONVERGE phase and begin PREPARE based on current state. func (i *instance) tryConverge() error { - if i.phase != CONVERGE_PHASE { - return fmt.Errorf("unexpected phase %s, expected %s", i.phase, CONVERGE_PHASE) + if phase := i.getCurrentPhase(); phase != CONVERGE_PHASE { + return fmt.Errorf("unexpected phase %s, expected %s", phase, CONVERGE_PHASE) } // The CONVERGE phase timeout doesn't wait to hear from >⅔ of power. timeoutExpired := atOrAfter(i.participant.host.Time(), i.phaseTimeout) if !timeoutExpired { return nil } - commitRoundState := i.getRound(i.round - 1).committed + round := i.getCurrentRound() + commitRoundState := i.getRound(round - 1).committed isValidConvergeValue := func(cv ConvergeValue) bool { // If it is in candidate set @@ -572,7 +583,7 @@ func (i *instance) tryConverge() error { return possibleDecision } - winner := i.getRound(i.round).converged.FindBestTicketProposal(isValidConvergeValue) + winner := i.getRound(round).converged.FindBestTicketProposal(isValidConvergeValue) if !winner.IsValid() { return fmt.Errorf("no values at CONVERGE") } @@ -594,22 +605,23 @@ func (i *instance) tryConverge() error { // Sends this node's PREPARE message and begins the PREPARE phase. func (i *instance) beginPrepare(justification *Justification) { // Broadcast preparation of value and wait for everyone to respond. - i.phase = PREPARE_PHASE + i.setCurrentPhase(PREPARE_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() - i.broadcast(i.round, PREPARE_PHASE, i.value, false, justification) + i.broadcast(i.getCurrentRound(), PREPARE_PHASE, i.value, false, justification) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrPreparePhase)) metrics.currentPhase.Record(context.TODO(), int64(PREPARE_PHASE)) } // Attempts to end the PREPARE phase and begin COMMIT based on current state. func (i *instance) tryPrepare() error { - if i.phase != PREPARE_PHASE { - return fmt.Errorf("unexpected phase %s, expected %s", i.phase, PREPARE_PHASE) + + if phase := i.getCurrentPhase(); phase != PREPARE_PHASE { + return fmt.Errorf("unexpected phase %s, expected %s", phase, PREPARE_PHASE) } - prepared := i.getRound(i.round).prepared + prepared := i.getRound(i.getCurrentRound()).prepared proposalKey := i.proposal.Key() foundQuorum := prepared.HasStrongQuorumFor(proposalKey) timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout) @@ -626,7 +638,7 @@ func (i *instance) tryPrepare() error { i.beginCommit() } else if timedOut { if err := i.tryRebroadcast(); err != nil { - return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.phase, err) + return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.getCurrentPhase(), err) } } @@ -634,7 +646,8 @@ func (i *instance) tryPrepare() error { } func (i *instance) beginCommit() { - i.phase = COMMIT_PHASE + round := i.getCurrentRound() + i.setCurrentPhase(COMMIT_PHASE) i.phaseTimeout = i.alarmAfterSynchrony() i.resetRebroadcastParams() @@ -643,15 +656,15 @@ func (i *instance) beginCommit() { // No justification is required for committing bottom. var justification *Justification if !i.value.IsZero() { - if quorum, ok := i.getRound(i.round).prepared.FindStrongQuorumFor(i.value.Key()); ok { + if quorum, ok := i.getRound(round).prepared.FindStrongQuorumFor(i.value.Key()); ok { // Found a strong quorum of PREPARE, build the justification for it. - justification = i.buildJustification(quorum, i.round, PREPARE_PHASE, i.value) + justification = i.buildJustification(quorum, round, PREPARE_PHASE, i.value) } else { panic("beginCommit with no strong quorum for non-bottom value") } } - i.broadcast(i.round, COMMIT_PHASE, i.value, false, justification) + i.broadcast(round, COMMIT_PHASE, i.value, false, justification) metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrCommitPhase)) metrics.currentPhase.Record(context.TODO(), int64(COMMIT_PHASE)) } @@ -673,7 +686,7 @@ func (i *instance) tryCommit(round uint64) error { // influencing that decision against their interest, just accepting it. i.value = quorumValue i.beginDecide(round) - case i.round != round, i.phase != COMMIT_PHASE: + case i.getCurrentRound() != round, i.getCurrentPhase() != COMMIT_PHASE: // We are at a phase other than COMMIT or round does not match the current one; // nothing else to do. case foundStrongQuorum: @@ -703,14 +716,14 @@ func (i *instance) tryCommit(round uint64) error { case timedOut: // The phase has timed out. Attempt to re-broadcast messages. if err := i.tryRebroadcast(); err != nil { - return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.phase, err) + return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.getCurrentPhase(), err) } } return nil } func (i *instance) beginDecide(round uint64) { - i.phase = DECIDE_PHASE + i.setCurrentPhase(DECIDE_PHASE) i.resetRebroadcastParams() var justification *Justification // Value cannot be empty here. @@ -735,7 +748,7 @@ func (i *instance) beginDecide(round uint64) { // without waiting for a strong quorum of COMMITs in any round. // The provided justification must justify the value being decided. func (i *instance) skipToDecide(value ECChain, justification *Justification) { - i.phase = DECIDE_PHASE + i.setCurrentPhase(DECIDE_PHASE) i.proposal = value i.value = i.proposal i.resetRebroadcastParams() @@ -756,13 +769,46 @@ func (i *instance) tryDecide() error { } } else { if err := i.tryRebroadcast(); err != nil { - return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.phase, err) + return fmt.Errorf("failed to rebroadcast at %s phase: %w", i.getCurrentPhase(), err) } } return nil } +func (i *instance) getCurrentRound() uint64 { + i.roundPhaseMutex.RLock() + defer i.roundPhaseMutex.RUnlock() + return i.round +} + +func (i *instance) setCurrentRound(r uint64) { + i.roundPhaseMutex.Lock() + defer i.roundPhaseMutex.Unlock() + i.round = r +} + +// IncrementAndGetCurrentRound increments the current round by one amount and returns +// the resulting round. +func (i *instance) IncrementAndGetCurrentRound() uint64 { + i.roundPhaseMutex.Lock() + defer i.roundPhaseMutex.Unlock() + i.round += 1 + return i.round +} + +func (i *instance) getCurrentPhase() Phase { + i.roundPhaseMutex.RLock() + defer i.roundPhaseMutex.RUnlock() + return i.phase +} + +func (i *instance) setCurrentPhase(p Phase) { + i.roundPhaseMutex.Lock() + defer i.roundPhaseMutex.Unlock() + i.phase = p +} + func (i *instance) getRound(r uint64) *roundState { round, ok := i.rounds[r] if !ok { @@ -773,18 +819,18 @@ func (i *instance) getRound(r uint64) *roundState { } func (i *instance) beginNextRound() { - i.log("moving to round %d with %s", i.round+1, i.proposal.String()) - i.round += 1 - metrics.currentRound.Record(context.TODO(), int64(i.round)) + nextRound := i.IncrementAndGetCurrentRound() + i.log("moving to round %d with %s", nextRound, i.proposal.String()) + metrics.currentRound.Record(context.TODO(), int64(nextRound)) - prevRound := i.getRound(i.round - 1) + prevRound := i.getRound(nextRound - 1) // Proposal was updated at the end of COMMIT phase to be some value for which // this node received a COMMIT message (bearing justification), if there were any. // If there were none, there must have been a strong quorum for bottom instead. var justification *Justification if quorum, ok := prevRound.committed.FindStrongQuorumFor(""); ok { // Build justification for strong quorum of COMMITs for bottom in the previous round. - justification = i.buildJustification(quorum, i.round-1, COMMIT_PHASE, ECChain{}) + justification = i.buildJustification(quorum, nextRound-1, COMMIT_PHASE, ECChain{}) } else { // Extract the justification received from some participant (possibly this node itself). justification, ok = prevRound.committed.receivedJustification[i.proposal.Key()] @@ -800,13 +846,13 @@ func (i *instance) beginNextRound() { // // See shouldSkipToRound. func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justification) { - i.log("skipping from round %d to round %d with %s", i.round, round, i.proposal.String()) - i.round = round - metrics.currentRound.Record(context.TODO(), int64(i.round)) + i.log("skipping from round %d to round %d with %s", i.getCurrentRound(), round, i.proposal.String()) + i.setCurrentRound(round) + metrics.currentRound.Record(context.TODO(), int64(round)) metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToRound)) if justification.Vote.Phase == PREPARE_PHASE { - i.log("⚠️ swaying from %s to %s by skip to round %d", &i.proposal, chain, i.round) + i.log("⚠️ swaying from %s to %s by skip to round %d", &i.proposal, chain, round) i.addCandidate(chain) i.proposal = chain } @@ -838,18 +884,19 @@ func (i *instance) addCandidate(c ECChain) bool { } func (i *instance) terminate(decision *Justification) { - i.log("✅ terminated %s during round %d", &i.value, i.round) - i.phase = TERMINATED_PHASE + round := i.getCurrentRound() + i.log("✅ terminated %s during round %d", &i.value, round) + i.setCurrentPhase(TERMINATED_PHASE) i.value = decision.Vote.Value i.terminationValue = decision i.resetRebroadcastParams() metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrTerminatedPhase)) - metrics.roundHistogram.Record(context.TODO(), int64(i.round)) + metrics.roundHistogram.Record(context.TODO(), int64(round)) metrics.currentPhase.Record(context.TODO(), int64(TERMINATED_PHASE)) } func (i *instance) terminated() bool { - return i.phase == TERMINATED_PHASE + return i.getCurrentPhase() == TERMINATED_PHASE } func (i *instance) broadcast(round uint64, phase Phase, value ECChain, createTicket bool, justification *Justification) { @@ -895,7 +942,7 @@ func (i *instance) tryRebroadcast() error { // not have any phase timeout and may be too far in the past. // * Otherwise, use the phase timeout. var rebroadcastTimeoutOffset time.Time - if i.phase == DECIDE_PHASE { + if i.getCurrentPhase() == DECIDE_PHASE { rebroadcastTimeoutOffset = i.participant.host.Time() } else { rebroadcastTimeoutOffset = i.phaseTimeout @@ -978,7 +1025,7 @@ func (i *instance) rebroadcast() error { // Returns the absolute time at which the alarm will fire. func (i *instance) alarmAfterSynchrony() time.Time { delta := time.Duration(float64(i.participant.delta) * - math.Pow(i.participant.deltaBackOffExponent, float64(i.round))) + math.Pow(i.participant.deltaBackOffExponent, float64(i.getCurrentRound()))) timeout := i.participant.host.Time().Add(2 * delta) i.participant.host.SetAlarm(timeout) return timeout @@ -1007,7 +1054,7 @@ func (i *instance) log(format string, args ...any) { if i.tracer != nil { msg := fmt.Sprintf(format, args...) i.tracer.Log("{%d}: %s (round %d, phase %s, proposal %s, value %s)", i.instanceID, msg, - i.round, i.phase, &i.proposal, &i.value) + i.getCurrentRound(), i.getCurrentPhase(), &i.proposal, &i.value) } } diff --git a/gpbft/gpbft_test.go b/gpbft/gpbft_test.go index 4dd9d5f1..506eb18d 100644 --- a/gpbft/gpbft_test.go +++ b/gpbft/gpbft_test.go @@ -1609,7 +1609,8 @@ func TestGPBFT_DropOld(t *testing.T) { } driver.RequireDeliverMessage(newQuality) driver.RequireDeliverMessage(newDecide0) - driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it + // No quorum of decides, should still accept it but be considered not relevant + driver.RequireErrOnDeliverMessage(newCommit0, gpbft.ErrValidationNotRelevant, "not relevant") driver.RequireDeliverMessage(newDecide1) // Once we've received two decides, we should reject messages from the "new" instance. diff --git a/gpbft/participant.go b/gpbft/participant.go index 07a1d8ee..3adca045 100644 --- a/gpbft/participant.go +++ b/gpbft/participant.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "sort" "sync" + "sync/atomic" "time" "github.com/filecoin-project/go-f3/internal/caching" @@ -33,7 +34,7 @@ type Participant struct { // if both are to be taken. apiMutex sync.Mutex // Mutex protecting currentInstance and committees cache for concurrent validation. - // Note that not every access need be protected: + // Note that not every access needs to be protected: // - writes to currentInstance, and reads from it during validation, // - reads from or writes to committees (which is written during validation). instanceMutex sync.Mutex @@ -41,9 +42,8 @@ type Participant struct { currentInstance uint64 // Cache of committees for the current or future instances. committees map[uint64]*committee - - // Current Granite instance. - gpbft *instance + // gpbft represents the current GPBFT instance. + gpbft atomic.Pointer[instance] // Messages queued for future instances. mqueue *messageQueue // The round number during which the last instance was terminated. @@ -127,10 +127,11 @@ func (p *Participant) CurrentRound() uint64 { panic("concurrent API method invocation") } defer p.apiMutex.Unlock() - if p.gpbft == nil { + gpbft := p.gpbft.Load() + if gpbft == nil { return 0 } - return p.gpbft.round + return gpbft.getCurrentRound() } func (p *Participant) CurrentInstance() uint64 { @@ -174,6 +175,9 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er } else if alreadyValidated, err := p.validationCache.Contains(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil { log.Errorw("failed to check already validated messages", "err", err) } else if alreadyValidated { + if !p.isMessageRelevant(msg) { + return nil, ErrValidationNotRelevant + } metrics.validationCache.Add(context.TODO(), 1, metric.WithAttributes(attrCacheHit, attrCacheKindMessage)) return &validatedMessage{msg: msg}, nil } else { @@ -243,6 +247,11 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er return nil, fmt.Errorf("message %v has unexpected justification: %w", msg, ErrValidationInvalid) } + if !p.isMessageRelevant(msg) { + // Message is valid but no longer relevant in the context of progressing GPBFT. + return nil, ErrValidationNotRelevant + } + if cacheMessage { if _, err := p.validationCache.Add(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil { log.Warnw("failed to cache to already validated message", "err", err) @@ -366,6 +375,47 @@ func (p *Participant) validateJustification(msg *GMessage, comt *committee) erro return nil } +// isMessageRelevant determines whether a given message is useful in terms of +// aiding the progress of an instance to the best of our knowledge. +func (p *Participant) isMessageRelevant(msg *GMessage) bool { + var currentRound uint64 + var currentPhase Phase + if gpbft := p.gpbft.Load(); gpbft != nil { + currentRound = gpbft.getCurrentRound() + currentPhase = gpbft.getCurrentPhase() + } + + p.instanceMutex.Lock() + defer p.instanceMutex.Unlock() + // Relevant messages are: + // 1. DECIDE messages from previous instance, + // 2. some messages from current instance (pending further checks), or + // 3. any message from future instance. + switch msg.Vote.Instance { + case p.currentInstance - 1: + return msg.Vote.Phase == DECIDE_PHASE + case p.currentInstance: + // Message is from the current round; proceed to check other conditions. + default: + return msg.Vote.Instance > p.currentInstance + } + + // When we are at DECIDE phase the only relevant messages are DECIDE messages. + // Otherwise, relevant messages are either QUALITY, DECIDE, messages from + // previous round, current round or future rounds. + switch { + case currentPhase == DECIDE_PHASE: + return msg.Vote.Phase == DECIDE_PHASE + case msg.Vote.Phase == QUALITY_PHASE, + msg.Vote.Phase == DECIDE_PHASE, + msg.Vote.Round == currentRound-1, // Use explicit case for previous round to avoid unt64 overflow. + msg.Vote.Round >= currentRound: + return true + default: + return false + } +} + // Receives a validated Granite message from some other participant. func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) { if !p.apiMutex.TryLock() { @@ -390,8 +440,8 @@ func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) { } // If the message is for the current instance, deliver immediately. - if p.gpbft != nil && msg.Vote.Instance == p.currentInstance { - if err := p.gpbft.Receive(msg); err != nil { + if gpbft := p.gpbft.Load(); gpbft != nil && msg.Vote.Instance == p.currentInstance { + if err := gpbft.Receive(msg); err != nil { return fmt.Errorf("%w: %w", ErrReceivedInternalError, err) } p.handleDecision() @@ -416,11 +466,11 @@ func (p *Participant) ReceiveAlarm() (err error) { } }() - if p.gpbft == nil { + if gpbft := p.gpbft.Load(); gpbft == nil { // The alarm is for fetching the next chain and beginning a new instance. - return p.beginInstance() - } - if err := p.gpbft.ReceiveAlarm(); err != nil { + err := p.beginInstance() + return err + } else if err := gpbft.ReceiveAlarm(); err != nil { return fmt.Errorf("failed receiving alarm: %w", err) } p.handleDecision() @@ -445,20 +495,22 @@ func (p *Participant) beginInstance() error { if err != nil { return err } - if p.gpbft, err = newInstance(p, p.currentInstance, chain, data, *comt.power, comt.beacon); err != nil { + gpbft, err := newInstance(p, p.currentInstance, chain, data, *comt.power, comt.beacon) + if err != nil { return fmt.Errorf("failed creating new gpbft instance: %w", err) } - if err := p.gpbft.Start(); err != nil { + p.gpbft.Store(gpbft) + if err := gpbft.Start(); err != nil { return fmt.Errorf("failed starting gpbft instance: %w", err) } // Deliver any queued messages for the new instance. - queued := p.mqueue.Drain(p.gpbft.instanceID) + queued := p.mqueue.Drain(gpbft.instanceID) if p.tracingEnabled() { for _, msg := range queued { - p.trace("Delivering queued {%d} ← P%d: %v", p.gpbft.instanceID, msg.Sender, msg) + p.trace("Delivering queued {%d} ← P%d: %v", gpbft.instanceID, msg.Sender, msg) } } - if err := p.gpbft.ReceiveMany(queued); err != nil { + if err := gpbft.ReceiveMany(queued); err != nil { return fmt.Errorf("delivering queued messages: %w", err) } p.handleDecision() @@ -513,12 +565,12 @@ func (p *Participant) handleDecision() { func (p *Participant) finishCurrentInstance() *Justification { var decision *Justification - if p.gpbft != nil { - decision = p.gpbft.terminationValue - p.terminatedDuringRound = p.gpbft.round - p.validationCache.RemoveGroup(p.gpbft.instanceID) + if gpbft := p.gpbft.Load(); gpbft != nil { + decision = gpbft.terminationValue + p.terminatedDuringRound = gpbft.getCurrentRound() + p.validationCache.RemoveGroup(gpbft.instanceID) } - p.gpbft = nil + p.gpbft.Store(nil) return decision } @@ -542,14 +594,16 @@ func (p *Participant) beginNextInstance(nextInstance uint64) { } func (p *Participant) terminated() bool { - return p.gpbft != nil && p.gpbft.phase == TERMINATED_PHASE + gpbft := p.gpbft.Load() + return gpbft != nil && gpbft.getCurrentPhase() == TERMINATED_PHASE } func (p *Participant) Describe() string { - if p.gpbft == nil { + gpbft := p.gpbft.Load() + if gpbft == nil { return "nil" } - return p.gpbft.Describe() + return gpbft.Describe() } func (p *Participant) tracingEnabled() bool { diff --git a/host.go b/host.go index 1d5bdf87..ab1bf5f3 100644 --- a/host.go +++ b/host.go @@ -333,6 +333,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg case errors.Is(err, gpbft.ErrValidationTooOld): // we got the message too late return pubsub.ValidationIgnore + case errors.Is(err, gpbft.ErrValidationNotRelevant): + // The message is valid but will not effectively aid progress of GPBFT. Ignore it + // to stop its further propagation across the network. + return pubsub.ValidationIgnore case errors.Is(err, gpbft.ErrValidationNoCommittee): log.Debugf("commitee error during validation: %+v", err) return pubsub.ValidationIgnore diff --git a/sim/network.go b/sim/network.go index 618e0faf..c1a50799 100644 --- a/sim/network.go +++ b/sim/network.go @@ -142,24 +142,30 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) { ) } +// HasMoreTicks checks whether there are any messages left to propagate across +// the network participants. See Tick. +func (n *Network) HasMoreTicks() bool { + return n.queue.Len() > 0 +} + // Tick disseminates one message among participants and returns whether there are // any more messages to process. -func (n *Network) Tick(adv *adversary.Adversary) (bool, error) { +func (n *Network) Tick(adv *adversary.Adversary) error { msg := n.queue.Remove() n.clock = msg.deliverAt receiver, found := n.participants[msg.dest] if !found { - return false, fmt.Errorf("message destined to unknown participant ID: %d", msg.dest) + return fmt.Errorf("message destined to unknown participant ID: %d", msg.dest) } switch payload := msg.payload.(type) { case string: if payload != "ALARM" { - return false, fmt.Errorf("unknwon string message payload: %s", payload) + return fmt.Errorf("unknwon string message payload: %s", payload) } n.log(TraceRecvd, "P%d %s", msg.source, payload) if err := receiver.ReceiveAlarm(); err != nil { - return false, fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err) + return fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err) } case gpbft.GMessage: // If GST has not elapsed, check if adversary allows the propagation of message. @@ -170,7 +176,7 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) { } else if !adv.AllowMessage(msg.source, msg.dest, payload) { // GST has not passed and adversary blocks the delivery of message; proceed to // next tick. - return n.queue.Len() > 0, nil + return nil } } validated, err := receiver.ValidateMessage(&payload) @@ -179,16 +185,16 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) { // Silently drop old messages. break } - return false, fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err) + return fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err) } n.log(TraceRecvd, "P%d ← P%d: %v", msg.dest, msg.source, msg.payload) if err := receiver.ReceiveMessage(validated); err != nil { - return false, fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err) + return fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err) } default: - return false, fmt.Errorf("unknown message payload: %v", payload) + return fmt.Errorf("unknown message payload: %v", payload) } - return n.queue.Len() > 0, nil + return nil } func (n *Network) log(level int, format string, args ...interface{}) { diff --git a/sim/sim.go b/sim/sim.go index 80516435..6e9b0d4a 100644 --- a/sim/sim.go +++ b/sim/sim.go @@ -1,6 +1,7 @@ package sim import ( + "errors" "fmt" "strings" @@ -71,8 +72,7 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error { } // Run until there are no more messages, meaning termination or deadlock. - moreTicks := true - for moreTicks { + for s.network.HasMoreTicks() { if err := s.ec.Err(); err != nil { return fmt.Errorf("error in decision: %w", err) } @@ -129,9 +129,15 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error { break } } - var err error - moreTicks, err = s.network.Tick(s.adversary) - if err != nil { + + switch err := s.network.Tick(s.adversary); { + case errors.Is(err, gpbft.ErrValidationNotRelevant): + // Ignore error signalling valid messages that are no longer useful for the + // progress of GPBFT. This can occur in normal operation depending on the order + // of delivered messages. In production, deployment this error is used to signal + // that the message does not need to be propagated among participants. In + // simulation, we simply ignore it. + case err != nil: return fmt.Errorf("error performing simulation phase: %w", err) } } diff --git a/test/withhold_test.go b/test/withhold_test.go index 709944ac..fe20ebed 100644 --- a/test/withhold_test.go +++ b/test/withhold_test.go @@ -73,9 +73,11 @@ func TestWitholdCommitAdversary(t *testing.T) { } // The adversary could convince the victim to decide a, so all must decide a. require.NoError(t, err) - decision := sm.GetInstance(0).GetDecision(0) - require.NotNil(t, decision) - require.Equal(t, &a, decision) + for _, victim := range victims { + decision := sm.GetInstance(0).GetDecision(victim) + require.NotNil(t, decision) + require.Equal(t, &a, decision) + } }) } }