Skip to content

Commit

Permalink
Drop messages that are no longer useful for GPBFT progression
Browse files Browse the repository at this point in the history
As a GPBFT instance progresses some messages become irrelevant, in that
they do not effectively aid the progress of the instance for
participants. Instead, GPBFT offers other built-in mechanisms to aid
progress of lagging participants such as selective
rebroadcast, propagation of DECIDE messages from the previous instance
and finality certificate exchange.

The changes here introduce a dedicated error type returned as part of
message validation to signal that although a message is valid it is no
longer relevant. This error type is then checked by pubsub to avoid
further propagation of those messages.

This reduces the redundant pubsub traffic for the network participants.

Fixes #583
  • Loading branch information
masih committed Sep 20, 2024
1 parent 5724153 commit 4cb81c0
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 1 deletion.
3 changes: 3 additions & 0 deletions gpbft/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ var (
//
// See SupplementalData.
ErrValidationWrongSupplement = newValidationError("unexpected supplemental data")
// ErrValidationNotRelevant signals that a message is valid but not relevant at the current instance,
// and is not worth propagating to others.
ErrValidationNotRelevant = newValidationError("message is valid but not relevant")

// ErrReceivedWrongInstance signals that a message is received with mismatching instance ID.
ErrReceivedWrongInstance = errors.New("received message for wrong instance")
Expand Down
1 change: 1 addition & 0 deletions gpbft/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) {
{name: "ErrValidationInvalid", subject: ErrValidationInvalid},
{name: "ErrValidationWrongBase", subject: ErrValidationWrongBase},
{name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement},
{name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
48 changes: 48 additions & 0 deletions gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
} else if alreadyValidated, err := p.validationCache.Contains(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil {
log.Errorw("failed to check already validated messages", "err", err)
} else if alreadyValidated {
if !p.isMessageRelevant(msg) {
return nil, ErrValidationNotRelevant
}
metrics.validationCache.Add(context.TODO(), 1, metric.WithAttributes(attrCacheHit, attrCacheKindMessage))
return &validatedMessage{msg: msg}, nil
} else {
Expand Down Expand Up @@ -243,6 +246,11 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
return nil, fmt.Errorf("message %v has unexpected justification: %w", msg, ErrValidationInvalid)
}

if !p.isMessageRelevant(msg) {
// Message is valid but no longer relevant in the context of progressing GPBFT.
return nil, ErrValidationNotRelevant
}

if cacheMessage {
if _, err := p.validationCache.Add(msg.Vote.Instance, messageCacheNamespace, buf.Bytes()); err != nil {
log.Warnw("failed to cache to already validated message", "err", err)
Expand Down Expand Up @@ -366,6 +374,46 @@ func (p *Participant) validateJustification(msg *GMessage, comt *committee) erro
return nil
}

// isMessageRelevant determines whether a given message is useful in terms of
// aiding the progress of an instance to the best of our knowledge.
func (p *Participant) isMessageRelevant(msg *GMessage) bool {
p.instanceMutex.Lock()
defer p.instanceMutex.Unlock()
var currentRound uint64
var currentPhase Phase
if p.gpbft != nil {
currentRound = p.gpbft.round
currentPhase = p.gpbft.phase
}

// Relevant messages are:
// 1. DECIDE messages from previous instance,
// 2. some messages from current instance (pending further checks), or
// 3. any message from future instance.
switch msg.Vote.Instance {
case p.currentInstance:
// Message is from the current round; proceed to check other conditions.
case min(0, p.currentInstance-1):
return msg.Vote.Phase == DECIDE_PHASE
default:
return msg.Vote.Instance > p.currentInstance
}

// Whe we are at DECIDE phase the only relevant messages are DECIDE messages.
// Otherwise, relevant messages are either QUALITY, DECIDE, messages from
// previous round, current round or future rounds.
switch {
case currentPhase == DECIDE_PHASE:
return msg.Vote.Phase == DECIDE_PHASE
case msg.Vote.Phase == QUALITY_PHASE,
msg.Vote.Phase == DECIDE_PHASE,
msg.Vote.Round >= min(0, currentRound-1):
return true
default:
return false
}
}

// Receives a validated Granite message from some other participant.
func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) {
if !p.apiMutex.TryLock() {
Expand Down
4 changes: 4 additions & 0 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
case errors.Is(err, gpbft.ErrValidationTooOld):
// we got the message too late
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// The message is valid but will not effectively aid progress of GPBFT. Ignore it
// to stop its further propagation across the network.
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNoCommittee):
log.Debugf("commitee error during validation: %+v", err)
return pubsub.ValidationIgnore
Expand Down
6 changes: 5 additions & 1 deletion sim/sim.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sim

import (
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -131,7 +132,10 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
}
var err error
moreTicks, err = s.network.Tick(s.adversary)
if err != nil {
switch {
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// Ignore error signalling valid messages that are no longer useful for the progress of GPBFT.
case err != nil:
return fmt.Errorf("error performing simulation phase: %w", err)
}
}
Expand Down

0 comments on commit 4cb81c0

Please sign in to comment.