Skip to content

Commit

Permalink
Refactor validation logic into a mutex-free pluggable struct
Browse files Browse the repository at this point in the history
Refactor the validation logic out of `Participant` and into its own
dedicated struct that is mutex-free and listens to the progress made by
the participant to infer the correct validation path.

The change above significantly reduces the need for mutex control over
current instance, which makes it easier to plug in extra conditional
behaviour, e.g. #583.

Fixes #561
  • Loading branch information
masih committed Sep 23, 2024
1 parent f5af3ff commit b387cf7
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 258 deletions.
17 changes: 15 additions & 2 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ type instance struct {
// independently of protocol phases/rounds.
decision *quorumState
// tracer traces logic logs for debugging and simulation purposes.
tracer Tracer
tracer Tracer
progress ProgressObserver
}

func newInstance(
Expand All @@ -221,7 +222,8 @@ func newInstance(
data *SupplementalData,
powerTable *PowerTable,
aggregateVerifier Aggregate,
beacon []byte) (*instance, error) {
beacon []byte,
progress ProgressObserver) (*instance, error) {
if input.IsZero() {
return nil, fmt.Errorf("input is empty")
}
Expand Down Expand Up @@ -251,6 +253,7 @@ func newInstance(
},
decision: newQuorumState(powerTable),
tracer: participant.tracer,
progress: progress,
}, nil
}

Expand Down Expand Up @@ -486,6 +489,7 @@ func (i *instance) beginQuality() error {
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()
i.broadcast(i.round, QUALITY_PHASE, i.proposal, false, nil)
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrQualityPhase))
metrics.currentPhase.Record(context.TODO(), int64(QUALITY_PHASE))
return nil
Expand Down Expand Up @@ -546,6 +550,7 @@ func (i *instance) beginConverge(justification *Justification) {
i.getRound(i.round).converged.SetSelfValue(i.proposal, justification)

i.broadcast(i.round, CONVERGE_PHASE, i.proposal, true, justification)
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrConvergePhase))
metrics.currentPhase.Record(context.TODO(), int64(CONVERGE_PHASE))
}
Expand Down Expand Up @@ -603,6 +608,7 @@ func (i *instance) beginPrepare(justification *Justification) {
i.resetRebroadcastParams()

i.broadcast(i.round, PREPARE_PHASE, i.value, false, justification)
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrPreparePhase))
metrics.currentPhase.Record(context.TODO(), int64(PREPARE_PHASE))
}
Expand Down Expand Up @@ -656,6 +662,7 @@ func (i *instance) beginCommit() {
}

i.broadcast(i.round, COMMIT_PHASE, i.value, false, justification)
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrCommitPhase))
metrics.currentPhase.Record(context.TODO(), int64(COMMIT_PHASE))
}
Expand Down Expand Up @@ -716,6 +723,7 @@ func (i *instance) tryCommit(round uint64) error {
func (i *instance) beginDecide(round uint64) {
i.phase = DECIDE_PHASE
i.resetRebroadcastParams()

var justification *Justification
// Value cannot be empty here.
if quorum, ok := i.getRound(round).committed.FindStrongQuorumFor(i.value.Key()); ok {
Expand All @@ -731,6 +739,7 @@ func (i *instance) beginDecide(round uint64) {
// Since each node sends only one DECIDE message, they must share the same vote
// in order to be aggregated.
i.broadcast(0, DECIDE_PHASE, i.value, false, justification)
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrDecidePhase))
metrics.currentPhase.Record(context.TODO(), int64(DECIDE_PHASE))
}
Expand All @@ -744,6 +753,7 @@ func (i *instance) skipToDecide(value ECChain, justification *Justification) {
i.value = i.proposal
i.resetRebroadcastParams()
i.broadcast(0, DECIDE_PHASE, i.value, false, justification)
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrDecidePhase))
metrics.currentPhase.Record(context.TODO(), int64(DECIDE_PHASE))
metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToDecide))
Expand Down Expand Up @@ -779,6 +789,7 @@ func (i *instance) getRound(r uint64) *roundState {
func (i *instance) beginNextRound() {
i.log("moving to round %d with %s", i.round+1, i.proposal.String())
i.round += 1
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.currentRound.Record(context.TODO(), int64(i.round))

prevRound := i.getRound(i.round - 1)
Expand Down Expand Up @@ -806,6 +817,7 @@ func (i *instance) beginNextRound() {
func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justification) {
i.log("skipping from round %d to round %d with %s", i.round, round, i.proposal.String())
i.round = round
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.currentRound.Record(context.TODO(), int64(i.round))
metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToRound))

Expand Down Expand Up @@ -847,6 +859,7 @@ func (i *instance) terminate(decision *Justification) {
i.value = decision.Vote.Value
i.terminationValue = decision
i.resetRebroadcastParams()
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrTerminatedPhase))
metrics.roundHistogram.Record(context.TODO(), int64(i.round))
metrics.currentPhase.Record(context.TODO(), int64(TERMINATED_PHASE))
Expand Down
2 changes: 1 addition & 1 deletion gpbft/gpbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestGPBFT_WithEvenPowerDistribution(t *testing.T) {
t.Run("Queues future instance messages during current instance", func(t *testing.T) {
instance, driver := newInstanceAndDriver(t)
futureInstance := emulator.NewInstance(t,
42,
8,
gpbft.PowerEntries{
gpbft.PowerEntry{
ID: 0,
Expand Down
14 changes: 13 additions & 1 deletion gpbft/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"time"
)

var (
const (
defaultDelta = 3 * time.Second
defaultDeltaBackOffExponent = 2.0
defaultMaxCachedInstances = 10
defaultMaxCachedMessagesPerInstance = 25_000
defaultCommitteeLookback = 10
)

// Option represents a configurable parameter.
Expand All @@ -22,6 +23,7 @@ type options struct {
delta time.Duration
deltaBackOffExponent float64

committeeLookback uint64
maxLookaheadRounds uint64
rebroadcastAfter func(int) time.Duration

Expand All @@ -36,6 +38,7 @@ func newOptions(o ...Option) (*options, error) {
opts := &options{
delta: defaultDelta,
deltaBackOffExponent: defaultDeltaBackOffExponent,
committeeLookback: defaultCommitteeLookback,
rebroadcastAfter: defaultRebroadcastAfter,
maxCachedInstances: defaultMaxCachedInstances,
maxCachedMessagesPerInstance: defaultMaxCachedMessagesPerInstance,
Expand Down Expand Up @@ -118,6 +121,15 @@ func WithMaxCachedMessagesPerInstance(v int) Option {
}
}

// WithCommitteeLookback sets the number of instances in the past from which the
// committee for the latest instance is derived. Defaults to 10 if unset.
func WithCommitteeLookback(lookback uint64) Option {
return func(o *options) error {
o.committeeLookback = lookback
return nil

Check warning on line 129 in gpbft/options.go

View check run for this annotation

Codecov / codecov/patch

gpbft/options.go#L126-L129

Added lines #L126 - L129 were not covered by tests
}
}

var defaultRebroadcastAfter = exponentialBackoffer(1.3, 0.1, 3*time.Second, 30*time.Second)

// WithRebroadcastBackoff sets the duration after the gPBFT timeout has elapsed, at
Expand Down
Loading

0 comments on commit b387cf7

Please sign in to comment.