Skip to content

Commit

Permalink
Drop messages that are no longer useful for GPBFT progression
Browse files Browse the repository at this point in the history
As a GPBFT instance progresses some messages become irrelevant, in that
they do not effectively aid the progress of the instance for
participants. Instead, GPBFT offers other built-in mechanisms to aid
progress of lagging participants such as selective
rebroadcast, propagation of DECIDE messages from the previous instance
and finality certificate exchange.

The changes here introduce a dedicated error type returned as part of
message validation to signal that although a message is valid it is no
longer relevant. This error type is then checked by pubsub to avoid
further propagation of those messages.

This reduces the redundant pubsub traffic for the network participants.

Fixes #583
  • Loading branch information
masih committed Sep 23, 2024
1 parent 9fad427 commit 7975dfe
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 23 deletions.
3 changes: 3 additions & 0 deletions gpbft/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ var (
//
// See SupplementalData.
ErrValidationWrongSupplement = newValidationError("unexpected supplemental data")
// ErrValidationNotRelevant signals that a message is valid but not relevant at the current instance,
// and is not worth propagating to others.
ErrValidationNotRelevant = newValidationError("message is valid but not relevant")

// ErrReceivedWrongInstance signals that a message is received with mismatching instance ID.
ErrReceivedWrongInstance = errors.New("received message for wrong instance")
Expand Down
1 change: 1 addition & 0 deletions gpbft/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) {
{name: "ErrValidationInvalid", subject: ErrValidationInvalid},
{name: "ErrValidationWrongBase", subject: ErrValidationWrongBase},
{name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement},
{name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion gpbft/gpbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,7 +1609,8 @@ func TestGPBFT_DropOld(t *testing.T) {
}
driver.RequireDeliverMessage(newQuality)
driver.RequireDeliverMessage(newDecide0)
driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it
// No quorum of decides, should still accept it but be considered not relevant
driver.RequireErrOnDeliverMessage(newCommit0, gpbft.ErrValidationNotRelevant, "not relevant")
driver.RequireDeliverMessage(newDecide1)

// Once we've received two decides, we should reject messages from the "new" instance.
Expand Down
2 changes: 1 addition & 1 deletion gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Participant struct {
// if both are to be taken.
apiMutex sync.Mutex
// Mutex protecting currentInstance and committees cache for concurrent validation.
// Note that not every access need be protected:
// Note that not every access needs to be protected:
// - writes to currentInstance, and reads from it during validation,
// - reads from or writes to committees (which is written during validation).
instanceMutex sync.Mutex
Expand Down
26 changes: 22 additions & 4 deletions gpbft/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,33 @@ func (v *cachingValidator) ValidateMessage(msg *GMessage) (valid ValidatedMessag
}

// Infer whether to proceed validating the message relative to the current instance.
switch currentInstance := v.progress.Load().id; {
case msg.Vote.Instance >= currentInstance+v.committeeLookback:
switch current := v.progress.Load(); {
case msg.Vote.Instance >= current.id+v.committeeLookback:
// Message is beyond current + committee lookback.
return nil, ErrValidationNoCommittee
case msg.Vote.Instance >= currentInstance,
msg.Vote.Instance == currentInstance-1 && msg.Vote.Phase == DECIDE_PHASE:
case msg.Vote.Instance > current.id,
msg.Vote.Instance == current.id-1 && msg.Vote.Phase == DECIDE_PHASE:
// Only proceed to validate the message if it:
// * belongs to an instance within the range of current to current + committee lookback, or
// * is a DECIDE message belonging to previous instance.
case msg.Vote.Instance == current.id:
// Message belongs to current instance. Only validate messages that are relevant,
// i.e.:
// * When current instance is at DECIDE phase only validate DECIDE messages.
// * Otherwise, only validate messages that would be rebroadcasted, i.e. QUALITY,
// DECIDE, messages from previous round, and messages from current round.
// Anything else is not relevant.
switch {
case current.phase == DECIDE_PHASE && msg.Vote.Phase != DECIDE_PHASE:
return nil, ErrValidationNotRelevant
case msg.Vote.Phase == QUALITY_PHASE,
msg.Vote.Phase == DECIDE_PHASE,
msg.Vote.Round == current.round-1, // Use explicit case for previous round to avoid unt64 overflow.
msg.Vote.Round >= current.round:
// Message is relevant. Progress to further validation.
default:
return nil, ErrValidationNotRelevant
}
default:
// Message belongs to an instance older than the previous instance.
return nil, ErrValidationTooOld
Expand Down
4 changes: 4 additions & 0 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
case errors.Is(err, gpbft.ErrValidationTooOld):
// we got the message too late
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// The message is valid but will not effectively aid progress of GPBFT. Ignore it
// to stop its further propagation across the network.
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNoCommittee):
log.Debugf("commitee error during validation: %+v", err)
return pubsub.ValidationIgnore
Expand Down
24 changes: 15 additions & 9 deletions sim/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,24 +142,30 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) {
)
}

// HasMoreTicks checks whether there are any messages left to propagate across
// the network participants. See Tick.
func (n *Network) HasMoreTicks() bool {
return n.queue.Len() > 0
}

// Tick disseminates one message among participants and returns whether there are
// any more messages to process.
func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
func (n *Network) Tick(adv *adversary.Adversary) error {
msg := n.queue.Remove()
n.clock = msg.deliverAt

receiver, found := n.participants[msg.dest]
if !found {
return false, fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
return fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
}
switch payload := msg.payload.(type) {
case string:
if payload != "ALARM" {
return false, fmt.Errorf("unknwon string message payload: %s", payload)
return fmt.Errorf("unknwon string message payload: %s", payload)
}
n.log(TraceRecvd, "P%d %s", msg.source, payload)
if err := receiver.ReceiveAlarm(); err != nil {
return false, fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
}
case gpbft.GMessage:
// If GST has not elapsed, check if adversary allows the propagation of message.
Expand All @@ -170,7 +176,7 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
} else if !adv.AllowMessage(msg.source, msg.dest, payload) {
// GST has not passed and adversary blocks the delivery of message; proceed to
// next tick.
return n.queue.Len() > 0, nil
return nil
}
}
validated, err := receiver.ValidateMessage(&payload)
Expand All @@ -179,16 +185,16 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
// Silently drop old messages.
break
}
return false, fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
}
n.log(TraceRecvd, "P%d ← P%d: %v", msg.dest, msg.source, msg.payload)
if err := receiver.ReceiveMessage(validated); err != nil {
return false, fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
}
default:
return false, fmt.Errorf("unknown message payload: %v", payload)
return fmt.Errorf("unknown message payload: %v", payload)
}
return n.queue.Len() > 0, nil
return nil
}

func (n *Network) log(level int, format string, args ...interface{}) {
Expand Down
16 changes: 11 additions & 5 deletions sim/sim.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sim

import (
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -71,8 +72,7 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
}

// Run until there are no more messages, meaning termination or deadlock.
moreTicks := true
for moreTicks {
for s.network.HasMoreTicks() {
if err := s.ec.Err(); err != nil {
return fmt.Errorf("error in decision: %w", err)
}
Expand Down Expand Up @@ -129,9 +129,15 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
break
}
}
var err error
moreTicks, err = s.network.Tick(s.adversary)
if err != nil {

switch err := s.network.Tick(s.adversary); {
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// Ignore error signalling valid messages that are no longer useful for the
// progress of GPBFT. This can occur in normal operation depending on the order
// of delivered messages. In production, deployment this error is used to signal
// that the message does not need to be propagated among participants. In
// simulation, we simply ignore it.
case err != nil:
return fmt.Errorf("error performing simulation phase: %w", err)
}
}
Expand Down
8 changes: 5 additions & 3 deletions test/withhold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ func TestWitholdCommitAdversary(t *testing.T) {
}
// The adversary could convince the victim to decide a, so all must decide a.
require.NoError(t, err)
decision := sm.GetInstance(0).GetDecision(0)
require.NotNil(t, decision)
require.Equal(t, &a, decision)
for _, victim := range victims {
decision := sm.GetInstance(0).GetDecision(victim)
require.NotNil(t, decision)
require.Equal(t, &a, decision)
}
})
}
}

0 comments on commit 7975dfe

Please sign in to comment.