Skip to content

Commit

Permalink
Refactor validation logic into a mutex-free pluggable struct
Browse files Browse the repository at this point in the history
Refactor the validation logic out of `Participant` and into its own
dedicated struct that is mutex-free and listens to the progress made by
the participant to infer the correct validation path.

The change above significantly reduces the need for mutex control over
current instance, which makes it easier to plug in extra conditional
behaviour, e.g. #583.

Fixes #561
  • Loading branch information
masih committed Sep 23, 2024
1 parent f5af3ff commit 9fad427
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 248 deletions.
16 changes: 14 additions & 2 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ type instance struct {
// independently of protocol phases/rounds.
decision *quorumState
// tracer traces logic logs for debugging and simulation purposes.
tracer Tracer
tracer Tracer
progress ProgressObserver
}

func newInstance(
Expand All @@ -221,7 +222,8 @@ func newInstance(
data *SupplementalData,
powerTable *PowerTable,
aggregateVerifier Aggregate,
beacon []byte) (*instance, error) {
beacon []byte,
progress ProgressObserver) (*instance, error) {
if input.IsZero() {
return nil, fmt.Errorf("input is empty")
}
Expand Down Expand Up @@ -251,6 +253,7 @@ func newInstance(
},
decision: newQuorumState(powerTable),
tracer: participant.tracer,
progress: progress,
}, nil
}

Expand Down Expand Up @@ -483,6 +486,7 @@ func (i *instance) beginQuality() error {
}
// Broadcast input value and wait to receive from others.
i.phase = QUALITY_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()
i.broadcast(i.round, QUALITY_PHASE, i.proposal, false, nil)
Expand Down Expand Up @@ -537,6 +541,7 @@ func (i *instance) beginConverge(justification *Justification) {
}

i.phase = CONVERGE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()

Expand Down Expand Up @@ -599,6 +604,7 @@ func (i *instance) tryConverge() error {
func (i *instance) beginPrepare(justification *Justification) {
// Broadcast preparation of value and wait for everyone to respond.
i.phase = PREPARE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()

Expand Down Expand Up @@ -639,6 +645,7 @@ func (i *instance) tryPrepare() error {

func (i *instance) beginCommit() {
i.phase = COMMIT_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()

Expand Down Expand Up @@ -715,6 +722,7 @@ func (i *instance) tryCommit(round uint64) error {

func (i *instance) beginDecide(round uint64) {
i.phase = DECIDE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.resetRebroadcastParams()
var justification *Justification
// Value cannot be empty here.
Expand All @@ -740,10 +748,12 @@ func (i *instance) beginDecide(round uint64) {
// The provided justification must justify the value being decided.
func (i *instance) skipToDecide(value ECChain, justification *Justification) {
i.phase = DECIDE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.proposal = value
i.value = i.proposal
i.resetRebroadcastParams()
i.broadcast(0, DECIDE_PHASE, i.value, false, justification)

metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrDecidePhase))
metrics.currentPhase.Record(context.TODO(), int64(DECIDE_PHASE))
metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToDecide))
Expand Down Expand Up @@ -844,9 +854,11 @@ func (i *instance) addCandidate(c ECChain) bool {
func (i *instance) terminate(decision *Justification) {
i.log("✅ terminated %s during round %d", &i.value, i.round)
i.phase = TERMINATED_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.value = decision.Vote.Value
i.terminationValue = decision
i.resetRebroadcastParams()

metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrTerminatedPhase))
metrics.roundHistogram.Record(context.TODO(), int64(i.round))
metrics.currentPhase.Record(context.TODO(), int64(TERMINATED_PHASE))
Expand Down
2 changes: 1 addition & 1 deletion gpbft/gpbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestGPBFT_WithEvenPowerDistribution(t *testing.T) {
t.Run("Queues future instance messages during current instance", func(t *testing.T) {
instance, driver := newInstanceAndDriver(t)
futureInstance := emulator.NewInstance(t,
42,
8,
gpbft.PowerEntries{
gpbft.PowerEntry{
ID: 0,
Expand Down
14 changes: 13 additions & 1 deletion gpbft/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"time"
)

var (
const (
defaultDelta = 3 * time.Second
defaultDeltaBackOffExponent = 2.0
defaultMaxCachedInstances = 10
defaultMaxCachedMessagesPerInstance = 25_000
defaultCommitteeLookback = 10
)

// Option represents a configurable parameter.
Expand All @@ -22,6 +23,7 @@ type options struct {
delta time.Duration
deltaBackOffExponent float64

committeeLookback uint64
maxLookaheadRounds uint64
rebroadcastAfter func(int) time.Duration

Expand All @@ -36,6 +38,7 @@ func newOptions(o ...Option) (*options, error) {
opts := &options{
delta: defaultDelta,
deltaBackOffExponent: defaultDeltaBackOffExponent,
committeeLookback: defaultCommitteeLookback,
rebroadcastAfter: defaultRebroadcastAfter,
maxCachedInstances: defaultMaxCachedInstances,
maxCachedMessagesPerInstance: defaultMaxCachedMessagesPerInstance,
Expand Down Expand Up @@ -118,6 +121,15 @@ func WithMaxCachedMessagesPerInstance(v int) Option {
}
}

// WithCommitteeLookback sets the number of instances in the past from which the
// committee for the latest instance is derived. Defaults to 10 if unset.
func WithCommitteeLookback(lookback uint64) Option {
return func(o *options) error {
o.committeeLookback = lookback
return nil

Check warning on line 129 in gpbft/options.go

View check run for this annotation

Codecov / codecov/patch

gpbft/options.go#L126-L129

Added lines #L126 - L129 were not covered by tests
}
}

var defaultRebroadcastAfter = exponentialBackoffer(1.3, 0.1, 3*time.Second, 30*time.Second)

// WithRebroadcastBackoff sets the duration after the gPBFT timeout has elapsed, at
Expand Down
Loading

0 comments on commit 9fad427

Please sign in to comment.