From 7975dfed961d16a32ffa1970d348bfe7fe139be8 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Fri, 20 Sep 2024 15:34:28 +0200 Subject: [PATCH] Drop messages that are no longer useful for GPBFT progression As a GPBFT instance progresses some messages become irrelevant, in that they do not effectively aid the progress of the instance for participants. Instead, GPBFT offers other built-in mechanisms to aid progress of lagging participants such as selective rebroadcast, propagation of DECIDE messages from the previous instance and finality certificate exchange. The changes here introduce a dedicated error type returned as part of message validation to signal that although a message is valid it is no longer relevant. This error type is then checked by pubsub to avoid further propagation of those messages. This reduces the redundant pubsub traffic for the network participants. Fixes #583 --- gpbft/errors.go | 3 +++ gpbft/errors_test.go | 1 + gpbft/gpbft_test.go | 3 ++- gpbft/participant.go | 2 +- gpbft/validator.go | 26 ++++++++++++++++++++++---- host.go | 4 ++++ sim/network.go | 24 +++++++++++++++--------- sim/sim.go | 16 +++++++++++----- test/withhold_test.go | 8 +++++--- 9 files changed, 64 insertions(+), 23 deletions(-) diff --git a/gpbft/errors.go b/gpbft/errors.go index e0ccb923..19aa99e4 100644 --- a/gpbft/errors.go +++ b/gpbft/errors.go @@ -25,6 +25,9 @@ var ( // // See SupplementalData. ErrValidationWrongSupplement = newValidationError("unexpected supplemental data") + // ErrValidationNotRelevant signals that a message is valid but not relevant at the current instance, + // and is not worth propagating to others. + ErrValidationNotRelevant = newValidationError("message is valid but not relevant") // ErrReceivedWrongInstance signals that a message is received with mismatching instance ID. 
ErrReceivedWrongInstance = errors.New("received message for wrong instance") diff --git a/gpbft/errors_test.go b/gpbft/errors_test.go index fc8ff105..d37ea582 100644 --- a/gpbft/errors_test.go +++ b/gpbft/errors_test.go @@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) { {name: "ErrValidationInvalid", subject: ErrValidationInvalid}, {name: "ErrValidationWrongBase", subject: ErrValidationWrongBase}, {name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement}, + {name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { diff --git a/gpbft/gpbft_test.go b/gpbft/gpbft_test.go index 9fdd0458..f1fe2e8b 100644 --- a/gpbft/gpbft_test.go +++ b/gpbft/gpbft_test.go @@ -1609,7 +1609,8 @@ func TestGPBFT_DropOld(t *testing.T) { } driver.RequireDeliverMessage(newQuality) driver.RequireDeliverMessage(newDecide0) - driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it + // No quorum of decides, should still accept it but be considered not relevant + driver.RequireErrOnDeliverMessage(newCommit0, gpbft.ErrValidationNotRelevant, "not relevant") driver.RequireDeliverMessage(newDecide1) // Once we've received two decides, we should reject messages from the "new" instance. diff --git a/gpbft/participant.go b/gpbft/participant.go index 2433cfe0..afd45428 100644 --- a/gpbft/participant.go +++ b/gpbft/participant.go @@ -30,7 +30,7 @@ type Participant struct { // if both are to be taken. apiMutex sync.Mutex // Mutex protecting currentInstance and committees cache for concurrent validation. - // Note that not every access need be protected: + // Note that not every access needs to be protected: // - writes to currentInstance, and reads from it during validation, // - reads from or writes to committees (which is written during validation). 
instanceMutex sync.Mutex diff --git a/gpbft/validator.go b/gpbft/validator.go index 07d22156..52787e7c 100644 --- a/gpbft/validator.go +++ b/gpbft/validator.go @@ -77,15 +77,33 @@ func (v *cachingValidator) ValidateMessage(msg *GMessage) (valid ValidatedMessag } // Infer whether to proceed validating the message relative to the current instance. - switch currentInstance := v.progress.Load().id; { - case msg.Vote.Instance >= currentInstance+v.committeeLookback: + switch current := v.progress.Load(); { + case msg.Vote.Instance >= current.id+v.committeeLookback: // Message is beyond current + committee lookback. return nil, ErrValidationNoCommittee - case msg.Vote.Instance >= currentInstance, - msg.Vote.Instance == currentInstance-1 && msg.Vote.Phase == DECIDE_PHASE: + case msg.Vote.Instance > current.id, + msg.Vote.Instance == current.id-1 && msg.Vote.Phase == DECIDE_PHASE: // Only proceed to validate the message if it: // * belongs to an instance within the range of current to current + committee lookback, or // * is a DECIDE message belonging to previous instance. + case msg.Vote.Instance == current.id: + // Message belongs to current instance. Only validate messages that are relevant, + // i.e.: + // * When current instance is at DECIDE phase only validate DECIDE messages. + // * Otherwise, only validate messages that would be rebroadcasted, i.e. QUALITY, + // DECIDE, messages from previous round, and messages from current round. + // Anything else is not relevant. + switch { + case current.phase == DECIDE_PHASE && msg.Vote.Phase != DECIDE_PHASE: + return nil, ErrValidationNotRelevant + case msg.Vote.Phase == QUALITY_PHASE, + msg.Vote.Phase == DECIDE_PHASE, + msg.Vote.Round == current.round-1, // Use explicit case for previous round to avoid uint64 overflow. + msg.Vote.Round >= current.round: + // Message is relevant. Progress to further validation. 
+ default: + return nil, ErrValidationNotRelevant + } default: // Message belongs to an instance older than the previous instance. return nil, ErrValidationTooOld diff --git a/host.go b/host.go index e24c29d3..a7961f8d 100644 --- a/host.go +++ b/host.go @@ -335,6 +335,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg case errors.Is(err, gpbft.ErrValidationTooOld): // we got the message too late return pubsub.ValidationIgnore + case errors.Is(err, gpbft.ErrValidationNotRelevant): + // The message is valid but will not effectively aid progress of GPBFT. Ignore it + // to stop its further propagation across the network. + return pubsub.ValidationIgnore case errors.Is(err, gpbft.ErrValidationNoCommittee): log.Debugf("commitee error during validation: %+v", err) return pubsub.ValidationIgnore diff --git a/sim/network.go b/sim/network.go index 618e0faf..c1a50799 100644 --- a/sim/network.go +++ b/sim/network.go @@ -142,24 +142,30 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) { ) } +// HasMoreTicks checks whether there are any messages left to propagate across +// the network participants. See Tick. +func (n *Network) HasMoreTicks() bool { + return n.queue.Len() > 0 +} + // Tick disseminates one message among participants and returns whether there are // any more messages to process. 
-func (n *Network) Tick(adv *adversary.Adversary) (bool, error) { +func (n *Network) Tick(adv *adversary.Adversary) error { msg := n.queue.Remove() n.clock = msg.deliverAt receiver, found := n.participants[msg.dest] if !found { - return false, fmt.Errorf("message destined to unknown participant ID: %d", msg.dest) + return fmt.Errorf("message destined to unknown participant ID: %d", msg.dest) } switch payload := msg.payload.(type) { case string: if payload != "ALARM" { - return false, fmt.Errorf("unknwon string message payload: %s", payload) + return fmt.Errorf("unknwon string message payload: %s", payload) } n.log(TraceRecvd, "P%d %s", msg.source, payload) if err := receiver.ReceiveAlarm(); err != nil { - return false, fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err) + return fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err) } case gpbft.GMessage: // If GST has not elapsed, check if adversary allows the propagation of message. @@ -170,7 +176,7 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) { } else if !adv.AllowMessage(msg.source, msg.dest, payload) { // GST has not passed and adversary blocks the delivery of message; proceed to // next tick. - return n.queue.Len() > 0, nil + return nil } } validated, err := receiver.ValidateMessage(&payload) @@ -179,16 +185,16 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) { // Silently drop old messages. 
break } - return false, fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err) + return fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err) } n.log(TraceRecvd, "P%d ← P%d: %v", msg.dest, msg.source, msg.payload) if err := receiver.ReceiveMessage(validated); err != nil { - return false, fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err) + return fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err) } default: - return false, fmt.Errorf("unknown message payload: %v", payload) + return fmt.Errorf("unknown message payload: %v", payload) } - return n.queue.Len() > 0, nil + return nil } func (n *Network) log(level int, format string, args ...interface{}) { diff --git a/sim/sim.go b/sim/sim.go index 8faa9724..9c8d5ecf 100644 --- a/sim/sim.go +++ b/sim/sim.go @@ -1,6 +1,7 @@ package sim import ( + "errors" "fmt" "strings" @@ -71,8 +72,7 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error { } // Run until there are no more messages, meaning termination or deadlock. - moreTicks := true - for moreTicks { + for s.network.HasMoreTicks() { if err := s.ec.Err(); err != nil { return fmt.Errorf("error in decision: %w", err) } @@ -129,9 +129,15 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error { break } } - var err error - moreTicks, err = s.network.Tick(s.adversary) - if err != nil { + + switch err := s.network.Tick(s.adversary); { + case errors.Is(err, gpbft.ErrValidationNotRelevant): + // Ignore error signalling valid messages that are no longer useful for the + // progress of GPBFT. This can occur in normal operation depending on the order + // of delivered messages. In a production deployment, this error is used to signal + // that the message does not need to be propagated among participants. In + // simulation, we simply ignore it. 
+ case err != nil: return fmt.Errorf("error performing simulation phase: %w", err) } } diff --git a/test/withhold_test.go b/test/withhold_test.go index 709944ac..fe20ebed 100644 --- a/test/withhold_test.go +++ b/test/withhold_test.go @@ -73,9 +73,11 @@ func TestWitholdCommitAdversary(t *testing.T) { } // The adversary could convince the victim to decide a, so all must decide a. require.NoError(t, err) - decision := sm.GetInstance(0).GetDecision(0) - require.NotNil(t, decision) - require.Equal(t, &a, decision) + for _, victim := range victims { + decision := sm.GetInstance(0).GetDecision(victim) + require.NotNil(t, decision) + require.Equal(t, &a, decision) + } }) } }