Skip to content

Commit

Permalink
Drop messages that are no longer useful for GPBFT progression
Browse files Browse the repository at this point in the history
As a GPBFT instance progresses some messages become irrelevant, in that
they do not effectively aid the progress of the instance for
participants. Instead, GPBFT offers other built-in mechanisms to aid
progress of lagging participants such as selective
rebroadcast, propagation of DECIDE messages from the previous instance
and finality certificate exchange.

The changes here introduce a dedicated error type returned as part of
message validation to signal that although a message is valid it is no
longer relevant. This error type is then checked by pubsub to avoid
further propagation of those messages.

This reduces the redundant pubsub traffic for the network participants.

Fixes #583
  • Loading branch information
masih committed Sep 20, 2024
1 parent b802002 commit 76bc018
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 18 deletions.
3 changes: 3 additions & 0 deletions gpbft/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ var (
//
// See SupplementalData.
ErrValidationWrongSupplement = newValidationError("unexpected supplemental data")
// ErrValidationNotRelevant signals that a message is valid but not relevant at the current instance,
// and is not worth propagating to others.
ErrValidationNotRelevant = newValidationError("message is valid but not relevant")

// ErrReceivedWrongInstance signals that a message is received with mismatching instance ID.
ErrReceivedWrongInstance = errors.New("received message for wrong instance")
Expand Down
1 change: 1 addition & 0 deletions gpbft/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) {
{name: "ErrValidationInvalid", subject: ErrValidationInvalid},
{name: "ErrValidationWrongBase", subject: ErrValidationWrongBase},
{name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement},
{name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion gpbft/gpbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,7 +1609,8 @@ func TestGPBFT_DropOld(t *testing.T) {
}
driver.RequireDeliverMessage(newQuality)
driver.RequireDeliverMessage(newDecide0)
driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it
// No quorum of decides, should still accept it but be considered not relevant
driver.RequireErrOnDeliverMessage(newCommit0, gpbft.ErrValidationNotRelevant, "not relevant")
driver.RequireDeliverMessage(newDecide1)

// Once we've received two decides, we should reject messages from the "new" instance.
Expand Down
49 changes: 49 additions & 0 deletions gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
} else if alreadyValidated, err := p.validationCache.Contains(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil {
log.Errorw("failed to check already validated messages", "err", err)
} else if alreadyValidated {
if !p.isMessageRelevant(msg) {
return nil, ErrValidationNotRelevant
}
metrics.validationCache.Add(context.TODO(), 1, metric.WithAttributes(attrCacheHit, attrCacheKindMessage))
return &validatedMessage{msg: msg}, nil
} else {
Expand Down Expand Up @@ -243,6 +246,11 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
return nil, fmt.Errorf("message %v has unexpected justification: %w", msg, ErrValidationInvalid)
}

if !p.isMessageRelevant(msg) {
// Message is valid but no longer relevant in the context of progressing GPBFT.
return nil, ErrValidationNotRelevant
}

if cacheMessage {
if _, err := p.validationCache.Add(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil {
log.Warnw("failed to cache to already validated message", "err", err)
Expand Down Expand Up @@ -366,6 +374,47 @@ func (p *Participant) validateJustification(msg *GMessage, comt *committee) erro
return nil
}

// isMessageRelevant determines whether a given message is useful in terms of
// aiding the progress of an instance to the best of our knowledge.
func (p *Participant) isMessageRelevant(msg *GMessage) bool {
p.instanceMutex.Lock()
defer p.instanceMutex.Unlock()
var currentRound uint64
var currentPhase Phase
if p.gpbft != nil {
currentRound = p.gpbft.round
currentPhase = p.gpbft.phase
}

// Relevant messages are:
// 1. DECIDE messages from previous instance,
// 2. some messages from current instance (pending further checks), or
// 3. any message from future instance.
switch msg.Vote.Instance {
case p.currentInstance - 1:
return msg.Vote.Phase == DECIDE_PHASE
case p.currentInstance:
// Message is from the current round; proceed to check other conditions.
default:
return msg.Vote.Instance > p.currentInstance
}

// When we are at DECIDE phase the only relevant messages are DECIDE messages.
// Otherwise, relevant messages are either QUALITY, DECIDE, messages from
// previous round, current round or future rounds.
switch {
case currentPhase == DECIDE_PHASE:
return msg.Vote.Phase == DECIDE_PHASE
case msg.Vote.Phase == QUALITY_PHASE,
msg.Vote.Phase == DECIDE_PHASE,
msg.Vote.Round == currentRound-1, // Use explicit case for previous round to avoid unt64 overflow.
msg.Vote.Round >= currentRound:
return true
default:
return false
}
}

// Receives a validated Granite message from some other participant.
func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) {
if !p.apiMutex.TryLock() {
Expand Down
4 changes: 4 additions & 0 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
case errors.Is(err, gpbft.ErrValidationTooOld):
// we got the message too late
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// The message is valid but will not effectively aid progress of GPBFT. Ignore it
// to stop its further propagation across the network.
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNoCommittee):
log.Debugf("commitee error during validation: %+v", err)
return pubsub.ValidationIgnore
Expand Down
24 changes: 15 additions & 9 deletions sim/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,24 +142,30 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) {
)
}

// HasMoreTicks checks whether there are any messages left to propagate across
// the network participants. See Tick.
func (n *Network) HasMoreTicks() bool {
return n.queue.Len() > 0
}

// Tick disseminates one message among participants and returns whether there are
// any more messages to process.
func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
func (n *Network) Tick(adv *adversary.Adversary) error {
msg := n.queue.Remove()
n.clock = msg.deliverAt

receiver, found := n.participants[msg.dest]
if !found {
return false, fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
return fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
}
switch payload := msg.payload.(type) {
case string:
if payload != "ALARM" {
return false, fmt.Errorf("unknwon string message payload: %s", payload)
return fmt.Errorf("unknwon string message payload: %s", payload)
}
n.log(TraceRecvd, "P%d %s", msg.source, payload)
if err := receiver.ReceiveAlarm(); err != nil {
return false, fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
}
case gpbft.GMessage:
// If GST has not elapsed, check if adversary allows the propagation of message.
Expand All @@ -170,7 +176,7 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
} else if !adv.AllowMessage(msg.source, msg.dest, payload) {
// GST has not passed and adversary blocks the delivery of message; proceed to
// next tick.
return n.queue.Len() > 0, nil
return nil
}
}
validated, err := receiver.ValidateMessage(&payload)
Expand All @@ -179,16 +185,16 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
// Silently drop old messages.
break
}
return false, fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
}
n.log(TraceRecvd, "P%d ← P%d: %v", msg.dest, msg.source, msg.payload)
if err := receiver.ReceiveMessage(validated); err != nil {
return false, fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
}
default:
return false, fmt.Errorf("unknown message payload: %v", payload)
return fmt.Errorf("unknown message payload: %v", payload)
}
return n.queue.Len() > 0, nil
return nil
}

func (n *Network) log(level int, format string, args ...interface{}) {
Expand Down
16 changes: 11 additions & 5 deletions sim/sim.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sim

import (
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -71,8 +72,7 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
}

// Run until there are no more messages, meaning termination or deadlock.
moreTicks := true
for moreTicks {
for s.network.HasMoreTicks() {
if err := s.ec.Err(); err != nil {
return fmt.Errorf("error in decision: %w", err)
}
Expand Down Expand Up @@ -129,9 +129,15 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
break
}
}
var err error
moreTicks, err = s.network.Tick(s.adversary)
if err != nil {

switch err := s.network.Tick(s.adversary); {
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// Ignore error signalling valid messages that are no longer useful for the
// progress of GPBFT. This can occur in normal operation depending on the order
// of delivered messages. In production, deployment this error is used to signal
// that the message does not need to be propagated among participants. In
// simulation, we simply ignore it.
case err != nil:
return fmt.Errorf("error performing simulation phase: %w", err)
}
}
Expand Down
8 changes: 5 additions & 3 deletions test/withhold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ func TestWitholdCommitAdversary(t *testing.T) {
}
// The adversary could convince the victim to decide a, so all must decide a.
require.NoError(t, err)
decision := sm.GetInstance(0).GetDecision(0)
require.NotNil(t, decision)
require.Equal(t, &a, decision)
for _, victim := range victims {
decision := sm.GetInstance(0).GetDecision(victim)
require.NotNil(t, decision)
require.Equal(t, &a, decision)
}
})
}
}

0 comments on commit 76bc018

Please sign in to comment.