From 2a5776267161becb3ae6dac5b89e20b0a8d1a6f8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sat, 6 Jul 2024 17:28:00 +0200 Subject: [PATCH] Improve lifecycle management 1. Merge the client and the runner. The distinction was unclear and the client/runner/module tended to reach into each other. This change merges the client/runner and then separates the new "runner" from the module as much as possible. 2. Completely stop/discard the runner when rebootstrapping. The new logic carefully waits for all components to stop before moving on. 3. Simplify locking and make sure we take the locks where appropriate. 4. Merge bootstrap and re-configure logic. The dynamic manifest client no longer cares about _when_ a manifest should be applied; it simply gives it to the module (F3) and lets F3 use its normal bootstrap logic. Finally, I've improved the tests to: 1. Always stop on exit (checking for errors). 2. Never fail from goroutines. 3. Correctly wait for manifest changes (previously, it would wait for at least one node to change manifests). NOTES: 1. This removes the ability to reconfigure without rebootstrapping, but preserves the ability to _pause_ without rebootstrapping. 2. This causes bootstrap to start at the time the bootstrap epoch _should_ have happened instead of starting at the next non-null epoch. In practice, this should behave better as all nodes will start at the same time (and will look back 900 epochs anyway). --- cmd/f3/manifest.go | 64 ++-- cmd/f3/run.go | 14 +- ec/powerdelta.go | 29 ++ f3.go | 630 ++++++++++------------------------- go.mod | 2 +- host.go | 399 +++++++++++++++------- manifest/dynamic_manifest.go | 264 +++++++-------- manifest/manifest.go | 42 +-- manifest/manifest_sender.go | 67 ++-- manifest/manifest_test.go | 2 - manifest/static.go | 24 +- store.go | 32 ++ test/f3_test.go | 333 ++++++++---------- 13 files changed, 897 insertions(+), 1005 deletions(-) create mode 100644 ec/powerdelta.go create mode 100644 store.go diff --git a/cmd/f3/manifest.go b/cmd/f3/manifest.go index 735fe228..7143aece 100644 --- a/cmd/f3/manifest.go +++ b/cmd/f3/manifest.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" ) @@ -137,6 +138,8 @@ var manifestServeCmd = cli.Command{ return xerrors.Errorf("initializing libp2p host: %w", err) } + defer func() { _ = host.Close() }() + // Connect to all bootstrap addresses once. This should be sufficient to build // the pubsub mesh, if not then we need to periodically re-connect and/or pull in // the Lotus bootstrapping, which includes DHT connectivity.
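To make NOTE 2 above concrete: rather than polling until the first non-null epoch at or after BootstrapEpoch, the module now projects the wall-clock time at which the bootstrap epoch should land and sleeps until then. A minimal sketch of that rule, mirroring computeBootstrapDelay in f3.go further down (the standalone function and parameter names here are illustrative, not part of the patch):

package sketch

import "time"

// bootstrapDelay projects the wall-clock time at which the bootstrap epoch
// should occur from the EC head's epoch and timestamp. Every node derives the
// start time from the chain itself rather than from observed (possibly null)
// epochs, so all nodes begin bootstrapping at the same moment.
func bootstrapDelay(headEpoch, bootstrapEpoch int64, headTime time.Time, ecPeriod time.Duration) time.Duration {
	if headEpoch >= bootstrapEpoch {
		return 0 // already at or past the bootstrap epoch; start immediately
	}
	start := headTime.Add(time.Duration(bootstrapEpoch-headEpoch) * ecPeriod)
	return max(time.Until(start), 0) // clamp so we never return a negative delay
}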
@@ -171,15 +174,15 @@ var manifestServeCmd = cli.Command{ } manifestPath := c.String("manifest") - loadManifestAndVersion := func() (manifest.Manifest, manifest.Version, error) { + loadManifestAndVersion := func() (*manifest.Manifest, manifest.Version, error) { m, err := loadManifest(manifestPath) if err != nil { - return manifest.Manifest{}, "", xerrors.Errorf("loading manifest: %w", err) + return nil, "", xerrors.Errorf("loading manifest: %w", err) } version, err := m.Version() if err != nil { - return manifest.Manifest{}, "", xerrors.Errorf("versioning manifest: %w", err) + return nil, "", xerrors.Errorf("versioning manifest: %w", err) } return m, version, nil } @@ -200,46 +203,49 @@ var manifestServeCmd = cli.Command{ } _, _ = fmt.Fprintf(c.App.Writer, "Started manifest sender with version: %s\n", manifestVersion) - go func() { - sender.Start(c.Context) - }() - defer func() { - sender.Stop() - _ = host.Close() - }() - - checkTicker := time.NewTicker(c.Duration("checkInterval")) - for c.Context.Err() == nil { - select { - case <-c.Context.Done(): - return c.Context.Err() - case <-checkTicker.C: - if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil { - _, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err) - } else if manifestVersion != nextManifestVersion { - _, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion) - sender.UpdateManifest(nextManifest) - manifestVersion = nextManifestVersion + checkInterval := c.Duration("checkInterval") + + errgrp, ctx := errgroup.WithContext(c.Context) + errgrp.Go(func() error { return sender.Run(ctx) }) + errgrp.Go(func() error { + checkTicker := time.NewTicker(checkInterval) + defer checkTicker.Stop() + + for ctx.Err() == nil { + select { + case <-ctx.Done(): + return nil + case <-checkTicker.C: + if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil { + _, _ = fmt.Fprintf(c.App.ErrWriter, "Failed to reload manifest: %v\n", err) + } else if manifestVersion != nextManifestVersion { + _, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion) + sender.UpdateManifest(nextManifest) + manifestVersion = nextManifestVersion + } } } - } - return nil + + return nil + }) + + return errgrp.Wait() }, } -func getManifest(c *cli.Context) (manifest.Manifest, error) { +func getManifest(c *cli.Context) (*manifest.Manifest, error) { manifestPath := c.String("manifest") return loadManifest(manifestPath) } -func loadManifest(path string) (manifest.Manifest, error) { +func loadManifest(path string) (*manifest.Manifest, error) { f, err := os.Open(path) if err != nil { - return manifest.Manifest{}, xerrors.Errorf("opening %s to load manifest: %w", path, err) + return nil, xerrors.Errorf("opening %s to load manifest: %w", path, err) } defer f.Close() var m manifest.Manifest err = m.Unmarshal(f) - return m, err + return &m, err } diff --git a/cmd/f3/run.go b/cmd/f3/run.go index 2355212c..42804e28 100644 --- a/cmd/f3/run.go +++ b/cmd/f3/run.go @@ -121,20 +121,18 @@ var runCmd = cli.Command{ return xerrors.Errorf("creating module: %w", err) } - mprovider.SetManifestChangeCallback(f3.ManifestChangeCallback(module)) go runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend) - return module.Run(ctx) + if err := module.Start(ctx); err != nil { + return err + } + <-ctx.Done() + return module.Stop(context.Background()) }, } func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer
gpbft.Signer) { - for { - select { - case <-ctx.Done(): - return - default: - } + for ctx.Err() == nil { ch := make(chan *gpbft.MessageBuilder, 4) module.SubscribeForMessagesToSign(ch) diff --git a/ec/powerdelta.go b/ec/powerdelta.go new file mode 100644 index 00000000..10d3bfe7 --- /dev/null +++ b/ec/powerdelta.go @@ -0,0 +1,29 @@ +package ec + +import ( + "context" + "fmt" + + "github.com/filecoin-project/go-f3/certs" + "github.com/filecoin-project/go-f3/gpbft" +) + +func WithModifiedPower(backend Backend, delta []certs.PowerTableDelta) Backend { + return &withModifiedPower{ + Backend: backend, + delta: delta, + } +} + +type withModifiedPower struct { + Backend + delta []certs.PowerTableDelta +} + +func (b *withModifiedPower) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) { + pt, err := b.Backend.GetPowerTable(ctx, ts) + if err != nil { + return nil, fmt.Errorf("getting power table: %w", err) + } + return certs.ApplyPowerTableDiffs(pt, b.delta) +} diff --git a/f3.go b/f3.go index 9b9fb411..3ffda511 100644 --- a/f3.go +++ b/f3.go @@ -1,141 +1,74 @@ package f3 import ( - "bytes" "context" - "errors" "sync" "time" - "github.com/Kubuxu/go-broadcast" "github.com/filecoin-project/go-f3/certs" "github.com/filecoin-project/go-f3/certstore" "github.com/filecoin-project/go-f3/ec" "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/manifest" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" - peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/Kubuxu/go-broadcast" + "github.com/ipfs/go-datastore" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" ) type F3 struct { - // mLk protects the runner and certStore - mLk sync.Mutex - runner *gpbftRunner - // certStore is nil until Run is called on the F3 - csLk sync.Mutex - certStore *certstore.Store - - cancelCtx context.CancelFunc - host host.Host - ds datastore.Datastore - ec ec.Backend - log Logger - - // psLk protects pubsub. We need an independent one - // due to its use in Broadcast - psLk sync.Mutex - pubsub *pubsub.PubSub - msgSub *pubsub.Subscription - - client *client -} - -type client struct { - // certStore is nil until Run is called on the F3 - certStore *certstore.Store - manifest manifest.ManifestProvider - ec ec.Backend - + verifier gpbft.Verifier signingMarshaller gpbft.SigningMarshaler + manifestProvider manifest.ManifestProvider busBroadcast broadcast.Channel[*gpbft.MessageBuilder] - gpbft.Verifier - logger Logger - loggerWithSkip Logger - - // Populated after Run is called - messageQueue <-chan gpbft.ValidatedMessage - topic *pubsub.Topic - - // Notifies manifest updates - manifestUpdate <-chan uint64 - // Triggers the cancellation of the incoming message - // routine to avoid delivering any outstanding messages. 
- incomingCancel func() -} - -func (mc *client) BroadcastMessage(ctx context.Context, mb *gpbft.MessageBuilder) error { - mb.SetNetworkName(mc.manifest.Manifest().NetworkName) - mb.SetSigningMarshaler(mc.signingMarshaller) - mc.busBroadcast.Publish(mb) - return nil -} - -func (mc *client) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) { - // Apply power table deltas from the manifest - pt, err := mc.ec.GetPowerTable(ctx, ts) - if err != nil { - return nil, xerrors.Errorf("getting power table: %w", err) - } - return certs.ApplyPowerTableDiffs(pt, mc.manifest.Manifest().PowerUpdate) -} - -func (mc *client) IncomingMessages() <-chan gpbft.ValidatedMessage { - return mc.messageQueue -} - -var _ gpbft.Tracer = (*client)(nil) + host host.Host + ds datastore.Datastore + ec ec.Backend + log Logger + pubsub *pubsub.PubSub -// Log fulfills the gpbft.Tracer interface -func (mc *client) Log(fmt string, args ...any) { - mc.loggerWithSkip.Debugf(fmt, args...) -} + runningCtx context.Context + cancelCtx context.CancelFunc + errgrp *errgroup.Group -func (mc *client) Logger() Logger { - return mc.logger + mu sync.Mutex + cs *certstore.Store + manifest *manifest.Manifest + runner *gpbftRunner } // New creates and setups f3 with libp2p // The context is used for initialization not runtime. // signingMarshaller can be nil for default SigningMarshaler -func New(ctx context.Context, manifest manifest.ManifestProvider, ds datastore.Datastore, h host.Host, +func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore.Datastore, h host.Host, ps *pubsub.PubSub, verif gpbft.Verifier, ec ec.Backend, log Logger, signingMarshaller gpbft.SigningMarshaler) (*F3, error) { - ds = namespace.Wrap(ds, manifest.Manifest().DatastorePrefix()) - loggerWithSkip := log - if zapLogger, ok := log.(*logging.ZapEventLogger); ok { - loggerWithSkip = logging.WithSkip(zapLogger, 1) - } + runningCtx, cancel := context.WithCancel(context.Background()) + errgrp, runningCtx := errgroup.WithContext(runningCtx) + if signingMarshaller == nil { signingMarshaller = gpbft.DefaultSigningMarshaller } - m := F3{ - ds: ds, - host: h, - pubsub: ps, - ec: ec, - log: log, - - client: &client{ - ec: ec, - manifest: manifest, - Verifier: verif, - logger: log, - loggerWithSkip: loggerWithSkip, - signingMarshaller: signingMarshaller, - }, - } - - return &m, nil + return &F3{ + verifier: verif, + signingMarshaller: signingMarshaller, + manifestProvider: manifest, + host: h, + ds: ds, + ec: ec, + log: log, + pubsub: ps, + runningCtx: runningCtx, + cancelCtx: cancel, + errgrp: errgrp, + }, nil } // SubscribeForMessagesToSign is used to subscribe to the message broadcast channel. @@ -145,412 +78,197 @@ func New(ctx context.Context, manifest manifest.ManifestProvider, ds datastore.D // To stop subscribing, either the closer function can be used, or the channel can be abandoned. // Passing a channel multiple times to the Subscribe function will result in a panic. 
func (m *F3) SubscribeForMessagesToSign(ch chan<- *gpbft.MessageBuilder) (closer func()) { - _, closer = m.client.busBroadcast.Subscribe(ch) + _, closer = m.busBroadcast.Subscribe(ch) return closer } -func (m *F3) Manifest() manifest.Manifest { - return m.client.manifest.Manifest() +func (m *F3) Manifest() *manifest.Manifest { + m.mu.Lock() + defer m.mu.Unlock() + return m.manifest } func (m *F3) Broadcast(ctx context.Context, signatureBuilder *gpbft.SignatureBuilder, msgSig []byte, vrf []byte) { msg := signatureBuilder.Build(msgSig, vrf) - var bw bytes.Buffer - err := msg.MarshalCBOR(&bw) - if err != nil { - m.log.Errorf("marshalling GMessage: %+v", err) - return - } - - m.psLk.Lock() - if m.client.topic != nil { - err = m.client.topic.Publish(ctx, bw.Bytes()) - if err != nil { - m.log.Errorf("publishing on topic: %w", err) - } - } - m.psLk.Unlock() -} - -func (m *F3) setCertStore(cs *certstore.Store) { - m.csLk.Lock() - defer m.csLk.Unlock() - m.certStore = cs - m.client.certStore = cs -} + m.mu.Lock() + runner := m.runner + m.mu.Unlock() -func (m *F3) setupPubsub(runner *gpbftRunner) error { - pubsubTopicName := m.Manifest().PubSubTopic() - - // explicit type to typecheck the anonymous function defintion - // a bit ugly but I don't want gpbftRunner to know about pubsub - var validator pubsub.ValidatorEx = func(ctx context.Context, pID peer.ID, - msg *pubsub.Message) pubsub.ValidationResult { - - var gmsg gpbft.GMessage - err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data)) - if err != nil { - return pubsub.ValidationReject - } - validatedMessage, err := runner.ValidateMessage(&gmsg) - if errors.Is(err, gpbft.ErrValidationInvalid) { - m.log.Debugf("validation error during validation: %+v", err) - return pubsub.ValidationReject - } - if err != nil { - m.log.Warnf("unknown error during validation: %+v", err) - return pubsub.ValidationIgnore - } - msg.ValidatorData = validatedMessage - return pubsub.ValidationAccept - } - - err := m.pubsub.RegisterTopicValidator(pubsubTopicName, validator) - if err != nil { - return xerrors.Errorf("registering topic validator: %w", err) + if runner == nil { + m.log.Error("attempted to broadcast message while F3 wasn't running") + return } - topic, err := m.pubsub.Join(pubsubTopicName) + err := runner.BroadcastMessage(msg) if err != nil { - return xerrors.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err) + m.log.Errorf("failed to broadcast message: %+v", err) } - m.client.topic = topic - return nil } -func (m *F3) teardownPubsub(manifest manifest.Manifest) (err error) { - m.psLk.Lock() - defer m.psLk.Unlock() - if m.client.topic != nil { - m.msgSub.Cancel() - err = multierr.Combine( - m.pubsub.UnregisterTopicValidator(manifest.PubSubTopic()), - m.client.topic.Close(), - ) - m.client.topic = nil - } - return err -} - -func (m *F3) initGpbftRunner(ctx context.Context) error { - m.mLk.Lock() - defer m.mLk.Unlock() - - cs, err := certstore.OpenStore(ctx, m.ds) - if err == nil { - m.setCertStore(cs) - } else if errors.Is(err, certstore.ErrNotInitialized) { - err := m.boostrap(ctx) - if err != nil { - return xerrors.Errorf("failed to boostrap: %w", err) - } - } else { - return xerrors.Errorf("opening certstore: %w", err) - } - - m.runner, err = newRunner(m.client.manifest, m.client) - if err != nil { - return xerrors.Errorf("creating gpbft host: %w", err) - } - - m.psLk.Lock() - if err := m.setupPubsub(m.runner); err != nil { - return xerrors.Errorf("setting up pubsub: %w", err) - } - m.msgSub, err = m.client.topic.Subscribe() - if err != nil 
{ - m.psLk.Unlock() - return xerrors.Errorf("subscribing to topic: %w", err) - } - m.psLk.Unlock() - return nil -} +func (m *F3) GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error) { + m.mu.Lock() + cs := m.cs + m.mu.Unlock() -// Sets up the gpbft runner, this is triggered at initialization -func (m *F3) startGpbftRunner(ctx context.Context, errCh chan error) { - if err := m.initGpbftRunner(ctx); err != nil { - errCh <- xerrors.Errorf("initializing gpbft host: %w", err) - return + if cs == nil { + return nil, xerrors.Errorf("F3 is not running") } - - // the size of the buffer is set to prevent message spamming from pubsub, - // so it is a big enough buffer to be able to accommodate new messages - // at "high-rate", but small enough to avoid spamming or clogging the node. - messageQueue := make(chan gpbft.ValidatedMessage, 20) - m.client.messageQueue = messageQueue - - go func() { - m.mLk.Lock() - latest := m.certStore.Latest() - m.mLk.Unlock() - startInstance := m.Manifest().InitialInstance - if latest != nil { - startInstance = latest.GPBFTInstance + 1 - } - // Check context before starting the instance - select { - case <-ctx.Done(): - // If the context is cancelled before starting, return immediately - errCh <- ctx.Err() - return - default: - // Proceed with running the instance - err := m.runner.Run(startInstance, ctx) - if err != nil { - m.log.Errorf("error returned while running host: %+v", err) - } - errCh <- err - } - }() - - m.handleIncomingMessages(ctx, messageQueue) + return cs.Latest(), nil } -func (m *F3) boostrap(ctx context.Context) error { - head, err := m.ec.GetHead(ctx) - if err != nil { - return xerrors.Errorf("failed to get the head: %w", err) - } - - if head.Epoch() < m.Manifest().BootstrapEpoch { - // wait for bootstrap epoch - for { - head, err := m.ec.GetHead(ctx) - if err != nil { - return xerrors.Errorf("getting head: %w", err) - } - if head.Epoch() >= m.Manifest().BootstrapEpoch { - break - } - - m.log.Infof("wating for bootstrap epoch (%d): currently at epoch %d", m.Manifest().BootstrapEpoch, head.Epoch()) - aim := time.Until(head.Timestamp().Add(m.Manifest().ECPeriod)) - // correct for null epochs - for aim < 0 { - aim += m.Manifest().ECPeriod - } - - select { - case <-time.After(aim): - case <-ctx.Done(): - return ctx.Err() - } - } - } +func (m *F3) GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error) { + m.mu.Lock() + cs := m.cs + m.mu.Unlock() - ts, err := m.ec.GetTipsetByEpoch(ctx, m.Manifest().BootstrapEpoch-m.Manifest().ECFinality) - if err != nil { - return xerrors.Errorf("getting initial power tipset: %w", err) + if cs == nil { + return nil, xerrors.Errorf("F3 is not running") } + return cs.Get(ctx, instance) +} - initialPowerTable, err := m.ec.GetPowerTable(ctx, ts.Key()) - if err != nil { - return xerrors.Errorf("getting initial power table: %w", err) +// Returns the time at which the F3 instance specified by the passed manifest should be started, or +// 0 if the passed manifest is nil. 
+func (m *F3) computeBootstrapDelay(manifest *manifest.Manifest) (delay time.Duration, err error) { + if manifest == nil { + return 0, nil + } - cs, err := certstore.CreateStore(ctx, m.ds, m.Manifest().InitialInstance, initialPowerTable) + ts, err := m.ec.GetHead(m.runningCtx) if err != nil { - return xerrors.Errorf("creating certstore: %w", err) - } - m.setCertStore(cs) - return nil -} - -func (m *F3) GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error) { - m.mLk.Lock() - defer m.mLk.Unlock() - if m.certStore == nil { - return nil, xerrors.Errorf("F3 is not running") - } - return m.certStore.Latest(), nil -} -func (m *F3) GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error) { - m.mLk.Lock() - defer m.mLk.Unlock() - if m.certStore == nil { - return nil, xerrors.Errorf("F3 is not running") - } - return m.certStore.Get(ctx, instance) + err := xerrors.Errorf("failed to get the EC chain head: %w", err) + return 0, err + } + + // We bootstrap now if we're at the correct time, whether or + // not we've hit the correct epoch. The important thing is + // that: + // + // 1. All nodes will use BootstrapEpoch - Finality to pick the base. + // 2. All nodes will bootstrap at the same time. + currentEpoch := ts.Epoch() + if currentEpoch < manifest.BootstrapEpoch { + epochDelay := manifest.BootstrapEpoch - currentEpoch + start := ts.Timestamp().Add(time.Duration(epochDelay) * manifest.ECPeriod) + return max(time.Until(start), 0), nil + } + return 0, nil } // Run start the module. It will exit when context is cancelled. // Or if there is an error from the message handling routines. -func (m *F3) Run(ctx context.Context) (_err error) { - ctx, m.cancelCtx = context.WithCancel(ctx) - defer m.cancelCtx() - - runnerErrCh := make(chan error, 1) - manifestErrCh := make(chan error, 1) - - // bootstrap runner for the initial manifest - go m.startGpbftRunner(ctx, runnerErrCh) - - // run manifest provider. This runs a background goroutine that will - // handle dynamic manifest updates if this is a dynamic manifest provider. - // If it is a static manifest it does nothing. - go m.client.manifest.Run(ctx, manifestErrCh) - - // teardown pubsub on shutdown - defer func() { - teardownErr := m.teardownPubsub(m.Manifest()) - _err = multierr.Append(_err, teardownErr) - }() - - select { - case <-ctx.Done(): - return nil - case err := <-runnerErrCh: - return err - case err := <-manifestErrCh: +func (m *F3) Start(startCtx context.Context) (_err error) { + err := m.manifestProvider.Start(startCtx) + if err != nil { return err } -} - -func (m *F3) Stop() { - m.mLk.Lock() - defer m.mLk.Unlock() - m.pauseRunner() - - if m.cancelCtx != nil { - m.cancelCtx() + // Try to get an initial manifest immediately if possible so users can query it immediately.
+ var pendingManifest *manifest.Manifest + select { + case pendingManifest = <-m.manifestProvider.ManifestUpdates(): + m.mu.Lock() + m.manifest = pendingManifest + m.mu.Unlock() + default: } -} -func (m *F3) pauseRunner() { - if m.runner != nil { - m.runner.Stop() + initialDelay, err := m.computeBootstrapDelay(pendingManifest) + if err != nil { + return err } - m.runner = nil -} -func (m *F3) handleIncomingMessages(ctx context.Context, queue chan gpbft.ValidatedMessage) { - ctx, m.client.incomingCancel = context.WithCancel(ctx) -loop: - for { - var msg *pubsub.Message - msg, err := m.msgSub.Next(ctx) - if err != nil { - if ctx.Err() != nil { - break - } - m.log.Errorf("msgPubsub subscription.Next() returned an error: %+v", err) - break - } - gmsg, ok := msg.ValidatorData.(gpbft.ValidatedMessage) - if !ok { - m.log.Errorf("invalid msgValidatorData: %+v", msg.ValidatorData) - continue - } - select { - case queue <- gmsg: - case <-ctx.Done(): - break loop + // Try to start immediately if we have a manifest available and don't have to wait to + // bootstrap. That way, we'll be fully started when we return from Start. + if initialDelay == 0 { + if err := m.reconfigure(startCtx, pendingManifest); err != nil { + return xerrors.Errorf("failed to start GPBFT: %w", err) } + pendingManifest = nil } -} -// Callback to be triggered when there is a dynamic manifest change -// If the manifest triggers a rebootstrap it starts a new runner with the new configuration. -// If there is no rebootstrap it only starts a new pubsub topic with a new network name that -// depends on the manifest version so there is no overlap between different configuration instances -func ManifestChangeCallback(m *F3) manifest.OnManifestChange { - // We can only accommodate 1 manifest update at a time, thus - // the buffer size. - manifestUpdate := make(chan uint64, 1) - m.client.manifestUpdate = manifestUpdate - return func(ctx context.Context, prevManifest manifest.Manifest, errCh chan error) { - // Tear down pubsub. - if err := m.teardownPubsub(prevManifest); err != nil { - // for now we just log the error and continue. - // This is not critical, but alternative approaches welcome. 
- m.log.Errorf("error stopping gpbft runner: %+v", err) + m.errgrp.Go(func() error { + manifestChangeTimer := time.NewTimer(initialDelay) + if pendingManifest == nil && !manifestChangeTimer.Stop() { + <-manifestChangeTimer.C } - // empty message queue from outstanding messages - m.emptyMessageQueue() - if m.Manifest().Pause { - m.pauseCallback() - } else { + defer manifestChangeTimer.Stop() - if m.Manifest().ReBootstrap { - m.withRebootstrapCallback(ctx, errCh) - } else { - m.withoutRebootstrapCallback(ctx, manifestUpdate, errCh) + for m.runningCtx.Err() == nil { + select { + case update := <-m.manifestProvider.ManifestUpdates(): + if pendingManifest != nil && !manifestChangeTimer.Stop() { + <-manifestChangeTimer.C + } + + pendingManifest = update + if delay, err := m.computeBootstrapDelay(update); err != nil { + return err + } else if delay > 0 { + manifestChangeTimer.Reset(delay) + continue + } + case <-manifestChangeTimer.C: + case <-m.runningCtx.Done(): + return nil + } + if err := m.reconfigure(m.runningCtx, pendingManifest); err != nil { + return xerrors.Errorf("failed to reconfigure F3: %w", err) } + pendingManifest = nil } + return nil + }) - } + return nil } -func (m *F3) withRebootstrapCallback(ctx context.Context, errCh chan error) { - m.mLk.Lock() - m.log.Infof("triggering manifest (seq=%d) change with rebootstrap", m.Manifest().Sequence) - - // if runner still up - if m.runner != nil { - m.runner.Stop() - } - - // clear the certstore - if err := m.certStore.DeleteAll(ctx); err != nil { - errCh <- xerrors.Errorf("clearing certstore: %w", err) - m.mLk.Unlock() - return - } - m.mLk.Unlock() - m.startGpbftRunner(ctx, errCh) +func (m *F3) Stop(stopCtx context.Context) (_err error) { + m.cancelCtx() + return multierr.Combine( + m.manifestProvider.Stop(stopCtx), + m.errgrp.Wait(), + ) } -func (m *F3) withoutRebootstrapCallback(ctx context.Context, manifestUpdate chan uint64, errCh chan error) { - m.mLk.Lock() - m.log.Infof("triggering manifest (seq=%d) change without rebootstrap", m.Manifest().Sequence) +func (m *F3) reconfigure(ctx context.Context, manifest *manifest.Manifest) error { + m.mu.Lock() + defer m.mu.Unlock() if m.runner != nil { - m.log.Error("cannot trigger a callback without rebootstrap if the runner is stop") + if err := m.runner.Stop(ctx); err != nil { + return err + } + m.runner = nil + } + if manifest == nil { + return nil } - // immediately stop listening to network messages - m.client.incomingCancel() - m.psLk.Lock() - if err := m.setupPubsub(m.runner); err != nil { - errCh <- xerrors.Errorf("setting up pubsub: %w", err) - return + runnerEc := m.ec + if len(manifest.PowerUpdate) > 0 { + runnerEc = ec.WithModifiedPower(m.ec, manifest.PowerUpdate) } - var err error - m.msgSub, err = m.client.topic.Subscribe() + + cs, err := openCertstore(m.runningCtx, runnerEc, m.ds, manifest) if err != nil { - errCh <- xerrors.Errorf("subscribing to topic: %w", err) - return + return xerrors.Errorf("failed to open certstore: %w", err) } - m.psLk.Unlock() - - messageQueue := make(chan gpbft.ValidatedMessage, 20) - m.client.messageQueue = messageQueue - // notify update to host to pick up the new message queue - fc := m.certStore.Latest() - manifestUpdate <- fc.GPBFTInstance - m.mLk.Unlock() - m.handleIncomingMessages(ctx, messageQueue) -} -func (m *F3) pauseCallback() { - m.mLk.Lock() - defer m.mLk.Unlock() - m.log.Infof("triggering manifest (seq=%d) change with pause", m.Manifest().Sequence) - m.pauseRunner() -} - -func (m *F3) emptyMessageQueue() { - for { - select { - case 
<-m.client.messageQueue: - m.log.Debug("emptying message queue") - default: - return - } + m.cs = cs + m.manifest = manifest + m.runner, err = newRunner( + ctx, m.cs, runnerEc, m.log, m.pubsub, + m.signingMarshaller, m.verifier, + m.busBroadcast.Publish, m.manifest, + ) + if err != nil { + return err } + + return m.runner.Start(ctx) } type Logger interface { @@ -567,13 +285,21 @@ type Logger interface { // IsRunning returns true if gpbft is running // Used mainly for testing purposes func (m *F3) IsRunning() bool { - m.mLk.Lock() - defer m.mLk.Unlock() + m.mu.Lock() + defer m.mu.Unlock() return m.runner != nil } // GetPowerTable returns the power table for the given tipset // Used mainly for testing purposes func (m *F3) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) { - return m.client.GetPowerTable(ctx, ts) + m.mu.Lock() + manifest := m.manifest + m.mu.Unlock() + + if manifest == nil { + return nil, xerrors.Errorf("no known network manifest") + } + + return ec.WithModifiedPower(m.ec, manifest.PowerUpdate).GetPowerTable(ctx, ts) } diff --git a/go.mod b/go.mod index 25c4ff22..077f5e8f 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/whyrusleeping/cbor-gen v0.1.1 go.uber.org/multierr v1.11.0 golang.org/x/crypto v0.23.0 + golang.org/x/sync v0.7.0 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 ) @@ -120,7 +121,6 @@ require ( golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect golang.org/x/tools v0.21.0 // indirect diff --git a/host.go b/host.go index 1da2cdd7..73074e55 100644 --- a/host.go +++ b/host.go @@ -3,38 +3,78 @@ package f3 import ( "bytes" "context" + "errors" "slices" "time" "github.com/filecoin-project/go-f3/certs" + "github.com/filecoin-project/go-f3/certstore" "github.com/filecoin-project/go-f3/ec" "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/manifest" + logging "github.com/ipfs/go-log/v2" + pubsub "github.com/libp2p/go-libp2p-pubsub" + peer "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/multierr" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" ) +type BroadcastMessage func(*gpbft.MessageBuilder) + // gpbftRunner is responsible for running gpbft.Participant, taking in all concurrent events and // passing them to gpbft in a single thread. 
type gpbftRunner struct { - client *client + certStore *certstore.Store + manifest *manifest.Manifest + ec ec.Backend + pubsub *pubsub.PubSub + signingMarshaller gpbft.SigningMarshaler + verifier gpbft.Verifier + broadcastCb BroadcastMessage + log, logWithSkip Logger + participant *gpbft.Participant - manifest manifest.ManifestProvider + topic *pubsub.Topic alertTimer *time.Timer runningCtx context.Context + errgrp *errgroup.Group ctxCancel context.CancelFunc - log Logger } -// gpbftHost is a newtype of gpbftRunner exposing APIs required by the gpbft.Participant -type gpbftHost gpbftRunner +func newRunner( + _ context.Context, + cs *certstore.Store, + ec ec.Backend, + log Logger, + ps *pubsub.PubSub, + signer gpbft.SigningMarshaler, + verifier gpbft.Verifier, + broadcastCb BroadcastMessage, + m *manifest.Manifest, +) (*gpbftRunner, error) { + runningCtx, ctxCancel := context.WithCancel(context.Background()) + errgrp, runningCtx := errgroup.WithContext(runningCtx) -func newRunner(m manifest.ManifestProvider, client *client) (*gpbftRunner, error) { runner := &gpbftRunner{ - client: client, - manifest: m, - log: client.Logger(), + certStore: cs, + manifest: m, + ec: ec, + pubsub: ps, + signingMarshaller: signer, + verifier: verifier, + broadcastCb: broadcastCb, + log: log, + logWithSkip: log, + runningCtx: runningCtx, + errgrp: errgrp, + ctxCancel: ctxCancel, + } + + if zapLogger, ok := runner.log.(*logging.ZapEventLogger); ok { + runner.logWithSkip = logging.WithSkip(zapLogger, 1) } // create a stopped timer to facilitate alerts requested from gpbft @@ -44,7 +84,7 @@ func newRunner(m manifest.ManifestProvider, client *client) (*gpbftRunner, error } runner.log.Infof("Starting gpbft runner") - opts := append(m.GpbftOptions(), gpbft.WithTracer(client)) + opts := append(m.GpbftOptions(), gpbft.WithTracer((*gpbftTracer)(runner))) p, err := gpbft.NewParticipant((*gpbftHost)(runner), opts...) if err != nil { return nil, xerrors.Errorf("creating participant: %w", err) @@ -53,81 +93,91 @@ func newRunner(m manifest.ManifestProvider, client *client) (*gpbftRunner, error return runner, nil } -func (h *gpbftRunner) Run(instance uint64, ctx context.Context) (_err error) { - h.runningCtx, h.ctxCancel = context.WithCancel(ctx) - defer h.ctxCancel() +func (h *gpbftRunner) Start(ctx context.Context) (_err error) { + defer func() { + if _err != nil { + _err = multierr.Append(_err, h.Stop(ctx)) + } + }() + + startInstance := h.manifest.InitialInstance + if latest := h.certStore.Latest(); latest != nil { + startInstance = latest.GPBFTInstance + 1 + } + + messageQueue, err := h.startPubsub() + if err != nil { + return err + } - if err := h.participant.StartInstanceAt(instance, time.Now()); err != nil { + if err := h.participant.StartInstanceAt(startInstance, time.Now()); err != nil { return xerrors.Errorf("starting a participant: %w", err) } // Subscribe to new certificates. We don't bother canceling the subscription as that'll // happen automatically when the channel fills. 
finalityCertificates := make(chan *certs.FinalityCertificate, 4) - _, _ = h.client.certStore.SubscribeForNewCerts(finalityCertificates) - - defer func() { - if _err != nil { - h.log.Errorf("gpbfthost exiting: %+v", _err) - } - }() + _, _ = h.certStore.SubscribeForNewCerts(finalityCertificates) - messageQueue := h.client.IncomingMessages() - for { - - // prioritise finality certificates and alarm delivery - select { - case c, ok := <-finalityCertificates: - if !ok { - finalityCertificates = make(chan *certs.FinalityCertificate, 4) - c, _ = h.client.certStore.SubscribeForNewCerts(finalityCertificates) - } - if err := h.receiveCertificate(c); err != nil { - return err + h.errgrp.Go(func() (_err error) { + defer func() { + if _err != nil && h.runningCtx.Err() == nil { + h.log.Errorf("exited GPBFT runner early: %+v", _err) } - continue - case <-h.alertTimer.C: - if err := h.participant.ReceiveAlarm(); err != nil { - return err + }() + for h.runningCtx.Err() == nil { + // prioritise finality certificates and alarm delivery + select { + case c, ok := <-finalityCertificates: + if !ok { + finalityCertificates = make(chan *certs.FinalityCertificate, 4) + c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates) + } + if err := h.receiveCertificate(c); err != nil { + return err + } + continue + case <-h.alertTimer.C: + if err := h.participant.ReceiveAlarm(); err != nil { + return err + } + continue + default: } - continue - default: - } - // Handle messages, finality certificates, and alarms - select { - case c, ok := <-finalityCertificates: - if !ok { - finalityCertificates = make(chan *certs.FinalityCertificate, 4) - c, _ = h.client.certStore.SubscribeForNewCerts(finalityCertificates) - } - if err := h.receiveCertificate(c); err != nil { - return err - } - case <-h.alertTimer.C: - if err := h.participant.ReceiveAlarm(); err != nil { - return err - } - case msg, ok := <-messageQueue: - if !ok { - return xerrors.Errorf("incoming message queue closed") + // Handle messages, finality certificates, and alarms + select { + case c, ok := <-finalityCertificates: + if !ok { + finalityCertificates = make(chan *certs.FinalityCertificate, 4) + c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates) + } + if err := h.receiveCertificate(c); err != nil { + return err + } + case <-h.alertTimer.C: + if err := h.participant.ReceiveAlarm(); err != nil { + return err + } + case msg, ok := <-messageQueue: + if !ok { + return xerrors.Errorf("incoming message queue closed") + } + if err := h.participant.ReceiveMessage(msg); err != nil { + // We silently drop failed messages because GPBFT will + // return errors for, e.g., messages from old instances. + // Given the async nature of our pubsub message handling, we + // could easily receive these. 
+ h.log.Debugf("error when processing message: %+v", err) + } + case <-h.runningCtx.Done(): + return nil } - if err := h.participant.ReceiveMessage(msg); err != nil { - return err - } - case start := <-h.client.manifestUpdate: - // Check for manifest update in the inner loop to exit and update the - // messageQueue and start from the last instance - h.log.Debugf("Manifest update detected, refreshing message queue") - if err := h.participant.StartInstanceAt(start, time.Now()); err != nil { - return xerrors.Errorf("on maifest update: %+v", err) - } - messageQueue = h.client.IncomingMessages() - case <-ctx.Done(): - return nil - } - } + } + return nil + }) + return nil } func (h *gpbftRunner) receiveCertificate(c *certs.FinalityCertificate) error { @@ -141,10 +191,9 @@ func (h *gpbftRunner) receiveCertificate(c *certs.FinalityCertificate) error { } func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) time.Time { - manifest := h.manifest.Manifest() - ecDelay := time.Duration(manifest.ECDelayMultiplier * float64(manifest.ECPeriod)) + ecDelay := time.Duration(h.manifest.ECDelayMultiplier * float64(h.manifest.ECPeriod)) - ts, err := h.client.ec.GetTipset(h.runningCtx, cert.ECChain.Head().Key) + ts, err := h.ec.GetTipset(h.runningCtx, cert.ECChain.Head().Key) if err != nil { // this should not happen h.log.Errorf("could not get timestamp of just finalized tipset: %+v", err) @@ -155,16 +204,16 @@ func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) // we decided on something new, the tipset that got finalized can at minimum be 30-60s old. return ts.Timestamp().Add(ecDelay) } - if cert.GPBFTInstance == manifest.InitialInstance { + if cert.GPBFTInstance == h.manifest.InitialInstance { // if we are at initial instance, there is no history to look at return ts.Timestamp().Add(ecDelay) } - backoffTable := manifest.BaseDecisionBackoffTable + backoffTable := h.manifest.BaseDecisionBackoffTable attempts := 0 backoffMultipler := 1.0 // to account for the one ECDelay after which we got the base decistion - for instance := cert.GPBFTInstance - 1; instance > manifest.InitialInstance; instance-- { - cert, err := h.client.certStore.Get(h.runningCtx, instance) + for instance := cert.GPBFTInstance - 1; instance > h.manifest.InitialInstance; instance-- { + cert, err := h.certStore.Get(h.runningCtx, instance) if err != nil { h.log.Errorf("error while getting instance %d from certstore: %+v", instance, err) break @@ -186,10 +235,132 @@ func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) return ts.Timestamp().Add(backoff) } -func (h *gpbftRunner) ValidateMessage(msg *gpbft.GMessage) (gpbft.ValidatedMessage, error) { - return h.participant.ValidateMessage(msg) +// Sends a message to all other participants. +// The message's sender must be one that the network interface can sign on behalf of. 
+func (h *gpbftRunner) BroadcastMessage(msg *gpbft.GMessage) error { + if h.topic == nil { + return pubsub.ErrTopicClosed + } + var bw bytes.Buffer + err := msg.MarshalCBOR(&bw) + if err != nil { + return xerrors.Errorf("marshalling GMessage for broadcast: %w", err) + } + + err = h.topic.Publish(h.runningCtx, bw.Bytes()) + if err != nil { + return xerrors.Errorf("publishing message: %w", err) + } + return nil +} + +var _ pubsub.ValidatorEx = (*gpbftRunner)(nil).validatePubsubMessage + +func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, pID peer.ID, + msg *pubsub.Message) pubsub.ValidationResult { + var gmsg gpbft.GMessage + err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data)) + if err != nil { + return pubsub.ValidationReject + } + + validatedMessage, err := h.participant.ValidateMessage(&gmsg) + if errors.Is(err, gpbft.ErrValidationInvalid) { + h.log.Debugf("validation error during validation: %+v", err) + return pubsub.ValidationReject + } + if err != nil { + h.log.Warnf("unknown error during validation: %+v", err) + return pubsub.ValidationIgnore + } + msg.ValidatorData = validatedMessage + return pubsub.ValidationAccept +} + +func (h *gpbftRunner) setupPubsub() error { + pubsubTopicName := h.manifest.PubSubTopic() + err := h.pubsub.RegisterTopicValidator(pubsubTopicName, h.validatePubsubMessage) + if err != nil { + return xerrors.Errorf("registering topic validator: %w", err) + } + + topic, err := h.pubsub.Join(pubsubTopicName) + if err != nil { + return xerrors.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err) + } + h.topic = topic + return nil +} + +func (h *gpbftRunner) teardownPubsub() error { + var err error + if h.topic != nil { + err = multierr.Combine( + h.topic.Close(), + h.pubsub.UnregisterTopicValidator(h.topic.String()), + ) + + if errors.Is(err, context.Canceled) { + err = nil + } + } + return err } +func (h *gpbftRunner) startPubsub() (<-chan gpbft.ValidatedMessage, error) { + if err := h.setupPubsub(); err != nil { + return nil, err + } + + sub, err := h.topic.Subscribe() + if err != nil { + return nil, xerrors.Errorf("could not subscribe to pubsub topic: %s: %w", sub.Topic(), err) + } + + messageQueue := make(chan gpbft.ValidatedMessage, 20) + h.errgrp.Go(func() error { + defer func() { + sub.Cancel() + close(messageQueue) + }() + + for h.runningCtx.Err() == nil { + var msg *pubsub.Message + msg, err := sub.Next(h.runningCtx) + if err != nil { + if h.runningCtx.Err() != nil { + return nil + } + return xerrors.Errorf("pubsub message subscription returned an error: %w", err) + } + gmsg, ok := msg.ValidatorData.(gpbft.ValidatedMessage) + if !ok { + h.log.Errorf("invalid msgValidatorData: %+v", msg.ValidatorData) + continue + } + select { + case messageQueue <- gmsg: + case <-h.runningCtx.Done(): + return nil + } + } + return nil + }) + return messageQueue, nil +} + +type gpbftTracer gpbftRunner + +// Log fulfills the gpbft.Tracer interface +func (h *gpbftTracer) Log(fmt string, args ...any) { + h.logWithSkip.Debugf(fmt, args...) 
+} + +var _ gpbft.Tracer = (*gpbftTracer)(nil) + +// gpbftHost is a newtype of gpbftRunner exposing APIs required by the gpbft.Participant +type gpbftHost gpbftRunner + func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, error) { // TODO: optimize when head is way beyond base res := make([]ec.TipSet, 0, 2*gpbft.CHAIN_MAX_LEN) @@ -204,7 +375,7 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e panic("reorg-ed away from base, dunno what to do, reboostrap is the answer") } var err error - head, err = h.client.ec.GetParent(h.runningCtx, head) + head, err = h.ec.GetParent(h.runningCtx, head) if err != nil { return nil, xerrors.Errorf("walking back the chain: %w", err) } @@ -214,10 +385,12 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e return res[1:], nil } -func (h *gpbftRunner) Stop() { - if h.ctxCancel != nil { - h.ctxCancel() - } +func (h *gpbftRunner) Stop(_ctx context.Context) error { + h.ctxCancel() + return multierr.Combine( + h.errgrp.Wait(), + h.teardownPubsub(), + ) } // Returns inputs to the next GPBFT instance. @@ -229,33 +402,33 @@ func (h *gpbftRunner) Stop() { // ReceiveDecision (or known to be final via some other channel). func (h *gpbftHost) GetProposalForInstance(instance uint64) (*gpbft.SupplementalData, gpbft.ECChain, error) { var baseTsk gpbft.TipSetKey - if instance == h.manifest.Manifest().InitialInstance { - ts, err := h.client.ec.GetTipsetByEpoch(h.runningCtx, - h.manifest.Manifest().BootstrapEpoch-h.manifest.Manifest().ECFinality) + if instance == h.manifest.InitialInstance { + ts, err := h.ec.GetTipsetByEpoch(h.runningCtx, + h.manifest.BootstrapEpoch-h.manifest.ECFinality) if err != nil { return nil, nil, xerrors.Errorf("getting boostrap base: %w", err) } baseTsk = ts.Key() } else { - cert, err := h.client.certStore.Get(h.runningCtx, instance-1) + cert, err := h.certStore.Get(h.runningCtx, instance-1) if err != nil { return nil, nil, xerrors.Errorf("getting cert for previous instance(%d): %w", instance-1, err) } baseTsk = cert.ECChain.Head().Key } - baseTs, err := h.client.ec.GetTipset(h.runningCtx, baseTsk) + baseTs, err := h.ec.GetTipset(h.runningCtx, baseTsk) if err != nil { return nil, nil, xerrors.Errorf("getting base TS: %w", err) } - headTs, err := h.client.ec.GetHead(h.runningCtx) + headTs, err := h.ec.GetHead(h.runningCtx) if err != nil { return nil, nil, xerrors.Errorf("getting head TS: %w", err) } - if time.Since(headTs.Timestamp()) < h.manifest.Manifest().ECPeriod { + if time.Since(headTs.Timestamp()) < h.manifest.ECPeriod { // less than ECPeriod since production of the head // agreement is unlikely - headTs, err = h.client.ec.GetParent(h.runningCtx, headTs) + headTs, err = h.ec.GetParent(h.runningCtx, headTs) if err != nil { return nil, nil, xerrors.Errorf("getting the parent of head TS: %w", err) } @@ -270,7 +443,7 @@ func (h *gpbftHost) GetProposalForInstance(instance uint64) (*gpbft.Supplemental Epoch: baseTs.Epoch(), Key: baseTs.Key(), } - pte, err := h.client.GetPowerTable(h.runningCtx, baseTs.Key()) + pte, err := h.ec.GetPowerTable(h.runningCtx, baseTs.Key()) if err != nil { return nil, nil, xerrors.Errorf("getting power table for base: %w", err) } @@ -284,7 +457,7 @@ func (h *gpbftHost) GetProposalForInstance(instance uint64) (*gpbft.Supplemental suffix[i].Key = collectedChain[i].Key() suffix[i].Epoch = collectedChain[i].Epoch() - pte, err = h.client.GetPowerTable(h.runningCtx, suffix[i].Key) + pte, err = h.ec.GetPowerTable(h.runningCtx, suffix[i].Key) 
if err != nil { return nil, nil, xerrors.Errorf("getting power table for suffix %d: %w", i, err) } @@ -317,36 +490,36 @@ func (h *gpbftHost) GetCommitteeForInstance(instance uint64) (*gpbft.PowerTable, var powerEntries gpbft.PowerEntries var err error - if instance < h.manifest.Manifest().InitialInstance+h.manifest.Manifest().CommiteeLookback { + if instance < h.manifest.InitialInstance+h.manifest.CommiteeLookback { //boostrap phase - ts, err := h.client.ec.GetTipsetByEpoch(h.runningCtx, h.manifest.Manifest().BootstrapEpoch-h.manifest.Manifest().ECFinality) + ts, err := h.ec.GetTipsetByEpoch(h.runningCtx, h.manifest.BootstrapEpoch-h.manifest.ECFinality) if err != nil { return nil, nil, xerrors.Errorf("getting tipset for boostrap epoch with lookback: %w", err) } powerTsk = ts.Key() - powerEntries, err = h.client.GetPowerTable(h.runningCtx, powerTsk) + powerEntries, err = h.ec.GetPowerTable(h.runningCtx, powerTsk) if err != nil { return nil, nil, xerrors.Errorf("getting power table: %w", err) } } else { - cert, err := h.client.certStore.Get(h.runningCtx, instance-h.manifest.Manifest().CommiteeLookback) + cert, err := h.certStore.Get(h.runningCtx, instance-h.manifest.CommiteeLookback) if err != nil { return nil, nil, xerrors.Errorf("getting finality certificate: %w", err) } powerTsk = cert.ECChain.Head().Key - powerEntries, err = h.client.certStore.GetPowerTable(h.runningCtx, instance) + powerEntries, err = h.certStore.GetPowerTable(h.runningCtx, instance) if err != nil { h.log.Debugf("failed getting power table from certstore: %v, falling back to EC", err) - powerEntries, err = h.client.ec.GetPowerTable(h.runningCtx, powerTsk) + powerEntries, err = h.ec.GetPowerTable(h.runningCtx, powerTsk) if err != nil { return nil, nil, xerrors.Errorf("getting power table: %w", err) } } } - ts, err := h.client.ec.GetTipset(h.runningCtx, powerTsk) + ts, err := h.ec.GetTipset(h.runningCtx, powerTsk) if err != nil { return nil, nil, xerrors.Errorf("getting tipset: %w", err) } @@ -362,17 +535,15 @@ func (h *gpbftHost) GetCommitteeForInstance(instance uint64) (*gpbft.PowerTable, // Returns the network's name (for signature separation) func (h *gpbftHost) NetworkName() gpbft.NetworkName { - return h.manifest.Manifest().NetworkName + return h.manifest.NetworkName } // Sends a message to all other participants. // The message's sender must be one that the network interface can sign on behalf of. func (h *gpbftHost) RequestBroadcast(mb *gpbft.MessageBuilder) error { - err := h.client.BroadcastMessage(h.runningCtx, mb) - if err != nil { - h.log.Errorf("broadcasting GMessage: %+v", err) - return err - } + mb.SetNetworkName(h.manifest.NetworkName) + mb.SetSigningMarshaler(h.signingMarshaller) + (h.broadcastCb)(mb) return nil } @@ -431,7 +602,7 @@ func (h *gpbftHost) saveDecision(decision *gpbft.Justification) (*certs.Finality return nil, xerrors.Errorf("certificate is invalid: %w", err) } - err = h.client.certStore.Put(h.runningCtx, cert) + err = h.certStore.Put(h.runningCtx, cert) if err != nil { return nil, xerrors.Errorf("saving ceritifcate in a store: %w", err) } @@ -443,22 +614,22 @@ func (h *gpbftHost) saveDecision(decision *gpbft.Justification) (*certs.Finality // This should usually call `Payload.MarshalForSigning(NetworkName)` except when testing as // that method is slow (computes a merkle tree that's necessary for testing). 
func (h *gpbftHost) MarshalPayloadForSigning(nn gpbft.NetworkName, p *gpbft.Payload) []byte { - return h.client.signingMarshaller.MarshalPayloadForSigning(nn, p) + return h.signingMarshaller.MarshalPayloadForSigning(nn, p) } // Verifies a signature for the given public key. // Implementations must be safe for concurrent use. func (h *gpbftHost) Verify(pubKey gpbft.PubKey, msg []byte, sig []byte) error { - return h.client.Verify(pubKey, msg, sig) + return h.verifier.Verify(pubKey, msg, sig) } // Aggregates signatures from a participants. func (h *gpbftHost) Aggregate(pubKeys []gpbft.PubKey, sigs [][]byte) ([]byte, error) { - return h.client.Aggregate(pubKeys, sigs) + return h.verifier.Aggregate(pubKeys, sigs) } // VerifyAggregate verifies an aggregate signature. // Implementations must be safe for concurrent use. func (h *gpbftHost) VerifyAggregate(payload []byte, aggSig []byte, signers []gpbft.PubKey) error { - return h.client.VerifyAggregate(payload, aggSig, signers) + return h.verifier.VerifyAggregate(payload, aggSig, signers) } diff --git a/manifest/dynamic_manifest.go b/manifest/dynamic_manifest.go index 4442973b..778fbfae 100644 --- a/manifest/dynamic_manifest.go +++ b/manifest/dynamic_manifest.go @@ -3,27 +3,27 @@ package manifest import ( "bytes" "context" + "encoding/json" + "errors" "fmt" - "sync" - "time" + "io" "github.com/filecoin-project/go-f3/ec" "github.com/filecoin-project/go-f3/gpbft" + logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" ) -var _ ManifestProvider = (*DynamicManifestProvider)(nil) - var log = logging.Logger("f3-dynamic-manifest") -const ( - ManifestPubSubTopicName = "/f3/manifests/0.0.1" - ManifestCheckTick = 5 * time.Second -) +var _ ManifestProvider = (*DynamicManifestProvider)(nil) + +const ManifestPubSubTopicName = "/f3/manifests/0.0.1" // DynamicManifestProvider is a manifest provider that allows // the manifest to be changed at runtime. @@ -31,169 +31,156 @@ type DynamicManifestProvider struct { pubsub *pubsub.PubSub ec ec.Backend manifestServerID peer.ID - manifestTopic *pubsub.Topic - // the lk guards all dynamic manifest-specific fields - lk sync.RWMutex - manifest Manifest - onManifestChange OnManifestChange - nextManifest *Manifest -} + runningCtx context.Context + errgrp *errgroup.Group + cancel context.CancelFunc -func NewDynamicManifestProvider(manifest Manifest, pubsub *pubsub.PubSub, ec ec.Backend, manifestServerID peer.ID) ManifestProvider { - return &DynamicManifestProvider{ - manifest: manifest, - pubsub: pubsub, - ec: ec, - manifestServerID: manifestServerID, - } + initialManifest *Manifest + manifestChanges chan *Manifest } -func (m *DynamicManifestProvider) Manifest() Manifest { - m.lk.RLock() - defer m.lk.RUnlock() - return m.manifest +type ManifestUpdateMessage struct { + MessageSequence uint64 + ManifestVersion uint64 + Manifest *Manifest } -func (m *DynamicManifestProvider) GpbftOptions() []gpbft.Option { - return m.manifest.GpbftOptions() -} +func (mu ManifestUpdateMessage) toManifest() *Manifest { + if mu.Manifest == nil { + return nil + } -func (m *DynamicManifestProvider) SetManifestChangeCallback(mc OnManifestChange) { - m.lk.Lock() - defer m.lk.Unlock() - m.onManifestChange = mc + // When a manifest configuration changes, a new network name is set that depends on the + // manifest version of the previous version to avoid overlapping previous configurations.
+ cpy := *mu.Manifest + newName := fmt.Sprintf("%s/%d", string(cpy.NetworkName), mu.ManifestVersion) + cpy.NetworkName = gpbft.NetworkName(newName) + return &cpy } -// When a manifest configuration changes, a new network name -// is set that depends on the manifest version of the previous version to avoid -// overlapping previous configurations. -func (m *DynamicManifestProvider) networkNameOnChange() gpbft.NetworkName { - return gpbft.NetworkName(string(m.manifest.NetworkName) + "/" + fmt.Sprintf("%d", m.manifest.Sequence)) +func (m ManifestUpdateMessage) Marshal() ([]byte, error) { + b, err := json.Marshal(m) + if err != nil { + return nil, xerrors.Errorf("marshaling JSON: %w", err) + } + return b, nil } -func (m *DynamicManifestProvider) Run(ctx context.Context, errCh chan error) { - if m.onManifestChange == nil { - errCh <- xerrors.New("onManifestChange is nil. Callback for manifest change required") +func (m *ManifestUpdateMessage) Unmarshal(r io.Reader) error { + err := json.NewDecoder(r).Decode(&m) + if err != nil { + return xerrors.Errorf("decoding JSON: %w", err) } - go m.handleIncomingManifests(ctx, errCh) - m.handleApplyManifest(ctx, errCh) + return nil } -func (m *DynamicManifestProvider) handleApplyManifest(ctx context.Context, errCh chan error) { - // add a timer for EC period - ticker := time.NewTicker(ManifestCheckTick) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - m.lk.Lock() - if m.nextManifest != nil { - ts, err := m.ec.GetHead(ctx) - if err != nil { - log.Errorf("error fetching chain head: %+v", err) - m.lk.Unlock() - continue - } +func NewDynamicManifestProvider(initialManifest *Manifest, pubsub *pubsub.PubSub, ec ec.Backend, manifestServerID peer.ID) ManifestProvider { + ctx, cancel := context.WithCancel(context.Background()) + errgrp, ctx := errgroup.WithContext(ctx) - // if the upgrade epoch is reached or already passed. - if ts.Epoch() >= m.nextManifest.BootstrapEpoch { - log.Debugf("reached bootstrap epoch, triggering manifest change: %d", ts.Epoch()) - // update the current manifest - prevManifest := m.manifest - m.manifest = *m.nextManifest - nn := m.networkNameOnChange() - m.manifest.NetworkName = nn - m.nextManifest = nil - // trigger manifest change callback. - go m.onManifestChange(ctx, prevManifest, errCh) - m.lk.Unlock() - continue - } - } - m.lk.Unlock() - case <-ctx.Done(): - return - } + return &DynamicManifestProvider{ + pubsub: pubsub, + ec: ec, + manifestServerID: manifestServerID, + runningCtx: ctx, + errgrp: errgrp, + cancel: cancel, + initialManifest: initialManifest, + manifestChanges: make(chan *Manifest, 1), } } -// listen to manifests being broadcast through the network. 
-func (m *DynamicManifestProvider) handleIncomingManifests(ctx context.Context, errCh chan error) { - if err := m.setupManifestPubsub(); err != nil { - errCh <- xerrors.Errorf("setting up pubsub: %w", err) - return +func (m *DynamicManifestProvider) ManifestUpdates() <-chan *Manifest { + return m.manifestChanges +} + +func (m *DynamicManifestProvider) Stop(ctx context.Context) (_err error) { + m.cancel() + return m.errgrp.Wait() +} + +func (m *DynamicManifestProvider) Start(startCtx context.Context) (_err error) { + if err := m.registerTopicValidator(); err != nil { + return err } - manifestSub, err := m.manifestTopic.Subscribe() + manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName) if err != nil { - errCh <- xerrors.Errorf("subscribing to topic: %w", err) - return + return xerrors.Errorf("could not join manifest pubsub topic: %w", err) } -loop: - for ctx.Err() == nil { - select { - case <-ctx.Done(): - break loop + manifestSub, err := manifestTopic.Subscribe() + if err != nil { + return xerrors.Errorf("subscribing to manifest pubsub topic: %w", err) + } - default: - var msg *pubsub.Message - msg, err = manifestSub.Next(ctx) + // XXX: load the initial manifest from disk! + // And save it! + + m.manifestChanges <- ManifestUpdateMessage{ + MessageSequence: 0, + ManifestVersion: 0, + Manifest: m.initialManifest, + }.toManifest() + + m.errgrp.Go(func() error { + defer func() { + manifestSub.Cancel() + err := multierr.Combine( + manifestTopic.Close(), + m.unregisterTopicValidator(), + ) + // Pubsub likes to return context canceled errors if/when we unregister after + // closing pubsub. Ignore it. + if err != nil && !errors.Is(err, context.Canceled) { + _err = multierr.Append(_err, err) + } + if _err != nil { + log.Errorf("exited manifest subscription early: %+v", _err) + } + }() + + var msgSeqNumber uint64 + for m.runningCtx.Err() == nil { + msg, err := manifestSub.Next(m.runningCtx) if err != nil { - log.Errorf("manifestPubsub subscription.Next() returned an error: %+v", err) - break + if m.runningCtx.Err() == nil { + return xerrors.Errorf("error from manifest subscription: %w", err) + } + return nil } - manifest, ok := msg.ValidatorData.(*Manifest) + update, ok := msg.ValidatorData.(*ManifestUpdateMessage) if !ok { log.Errorf("invalid manifestValidatorData: %+v", msg.ValidatorData) continue } - m.acceptNextManifest(manifest) + if update.MessageSequence <= msgSeqNumber { + log.Warnf("discarded manifest update %d", update.MessageSequence) + continue + } + log.Infof("received manifest update %d", update.MessageSequence) + msgSeqNumber = update.MessageSequence + select { + case m.manifestChanges <- update.toManifest(): + case <-m.runningCtx.Done(): + return nil + } } - } + return nil + }) - manifestSub.Cancel() - if err := m.teardownManifestPubsub(); err != nil { - errCh <- xerrors.Errorf("shutting down manifest pubsub: %w", err) - } -} - -func (m *DynamicManifestProvider) teardownPubsub(topic *pubsub.Topic, topicName string) error { - return multierr.Combine( - m.pubsub.UnregisterTopicValidator(topicName), - topic.Close(), - ) + return nil } -func (m *DynamicManifestProvider) teardownManifestPubsub() error { - return m.teardownPubsub(m.manifestTopic, ManifestPubSubTopicName) -} - -// Checks if we should accept the manifest that we received through pubsub -// and sets nextManifest if it is the case -func (m *DynamicManifestProvider) acceptNextManifest(manifest *Manifest) { - m.lk.Lock() - defer m.lk.Unlock() - - if manifest.Sequence <= m.manifest.Sequence { - return - } - // TODO: Any
additional logic here to determine what manifests to accept or not? - - m.nextManifest = manifest -} - -func (m *DynamicManifestProvider) setupManifestPubsub() (err error) { - topicName := ManifestPubSubTopicName +func (m *DynamicManifestProvider) registerTopicValidator() error { // using the same validator approach used for the message pubsub // to be homogeneous. var validator pubsub.ValidatorEx = func(ctx context.Context, pID peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - var manifest Manifest - err := manifest.Unmarshal(bytes.NewReader(msg.Data)) + var update ManifestUpdateMessage + err := update.Unmarshal(bytes.NewReader(msg.Data)) if err != nil { return pubsub.ValidationReject } @@ -208,18 +195,17 @@ func (m *DynamicManifestProvider) setupManifestPubsub() (err error) { // Expect an BootstrapEpoch over the BootstrapEpoch of the current manifests? // These should probably not be ValidationRejects to avoid banning in gossipsub // the centralized server in case of misconfigurations or bugs. - msg.ValidatorData = &manifest + msg.ValidatorData = &update return pubsub.ValidationAccept } - err = m.pubsub.RegisterTopicValidator(topicName, validator) + err := m.pubsub.RegisterTopicValidator(ManifestPubSubTopicName, validator) if err != nil { return xerrors.Errorf("registering topic validator: %w", err) } + return nil +} - m.manifestTopic, err = m.pubsub.Join(topicName) - if err != nil { - return xerrors.Errorf("could not join on pubsub topic: %s: %w", topicName, err) - } - return +func (m *DynamicManifestProvider) unregisterTopicValidator() error { + return m.pubsub.UnregisterTopicValidator(ManifestPubSubTopicName) } diff --git a/manifest/manifest.go b/manifest/manifest.go index b774b2ae..a8b03508 100644 --- a/manifest/manifest.go +++ b/manifest/manifest.go @@ -39,18 +39,15 @@ var ( } ) -type OnManifestChange func(ctx context.Context, prevManifest Manifest, errCh chan error) +type OnManifestChange func(ctx context.Context, prevManifest Manifest) error type ManifestProvider interface { - // Run starts any background tasks required for the operation - // of the manifest provider. - Run(context.Context, chan error) - // Returns the list of gpbft options to be used for gpbft configuration - GpbftOptions() []gpbft.Option - // Set callback to trigger to apply new manifests from F3. - SetManifestChangeCallback(OnManifestChange) - // Manifest accessor - Manifest() Manifest + // Start any background tasks required for the operation of the manifest provider. + Start(context.Context) error + // Stop stops a running manifest provider. + Stop(context.Context) error + // The channel on which manifest updates are returned. + ManifestUpdates() <-chan *Manifest } type Version string @@ -74,21 +71,12 @@ type EcConfig struct { CommiteeLookback uint64 } -// Manifest identifies the specific configuration for -// the F3 instance currently running. +// Manifest identifies the specific configuration for the F3 instance currently running. type Manifest struct { - // Sequence number of the manifest. - // This is used to identify if a new config needs to be applied - Sequence uint64 // Initial instance to used for the f3 instance InitialInstance uint64 // BootstrapEpoch from which the manifest should be applied BootstrapEpoch int64 - // Flag to determine if the peer should rebootstrap in this configuration - // change at BootstrapEpoch - ReBootstrap bool - // Specifies if the manifest should pause F3 until a new configuration arrives. - Pause bool // Network name to apply for this manifest. 
NetworkName gpbft.NetworkName // Updates to perform over the power table retrieved by the host @@ -100,10 +88,10 @@ type Manifest struct { *EcConfig } -func LocalDevnetManifest() Manifest { +func LocalDevnetManifest() *Manifest { rng := make([]byte, 4) _, _ = rand.Read(rng) - m := Manifest{ + m := &Manifest{ NetworkName: gpbft.NetworkName(fmt.Sprintf("localnet-%X", rng)), BootstrapEpoch: 1000, EcConfig: DefaultEcConfig, @@ -112,7 +100,7 @@ func LocalDevnetManifest() Manifest { } // Version that uniquely identifies the manifest. -func (m Manifest) Version() (Version, error) { +func (m *Manifest) Version() (Version, error) { b, err := json.Marshal(m) if err != nil { return "", xerrors.Errorf("computing manifest version: %w", err) @@ -123,7 +111,7 @@ func (m Manifest) Version() (Version, error) { // Marshal the manifest into JSON // We use JSON because we need to serialize a float and time.Duration // and the cbor serializer we use do not support these types yet. -func (m Manifest) Marshal() ([]byte, error) { +func (m *Manifest) Marshal() ([]byte, error) { b, err := json.Marshal(m) if err != nil { return nil, xerrors.Errorf("marshaling JSON: %w", err) @@ -139,11 +127,11 @@ func (m *Manifest) Unmarshal(r io.Reader) error { return nil } -func (m Manifest) DatastorePrefix() datastore.Key { +func (m *Manifest) DatastorePrefix() datastore.Key { return datastore.NewKey("/f3/" + string(m.NetworkName)) } -func (m Manifest) PubSubTopic() string { +func (m *Manifest) PubSubTopic() string { return PubSubTopicFromNetworkName(m.NetworkName) } @@ -151,7 +139,7 @@ func PubSubTopicFromNetworkName(nn gpbft.NetworkName) string { return "/f3/granite/0.0.1/" + string(nn) } -func (m Manifest) GpbftOptions() []gpbft.Option { +func (m *Manifest) GpbftOptions() []gpbft.Option { var opts []gpbft.Option if m.GpbftConfig == nil { diff --git a/manifest/manifest_sender.go b/manifest/manifest_sender.go index 75cc21e9..d5fbb3a4 100644 --- a/manifest/manifest_sender.go +++ b/manifest/manifest_sender.go @@ -20,13 +20,14 @@ type ManifestSender struct { interval time.Duration // lock to that guards the update of the manifest. 
-	lk       sync.RWMutex
-	manifest Manifest
-	// Used to stop publishing manifests
-	cancel   context.CancelFunc
+	lk       sync.Mutex
+	manifest *Manifest
+	version  uint64
+	msgSeq   uint64
+	paused   bool
 }
 
-func NewManifestSender(h host.Host, pubsub *pubsub.PubSub, firstManifest Manifest, pubishInterval time.Duration) (*ManifestSender, error) {
+func NewManifestSender(h host.Host, pubsub *pubsub.PubSub, firstManifest *Manifest, pubishInterval time.Duration) (*ManifestSender, error) {
 	topicName := ManifestPubSubTopicName
 	m := &ManifestSender{
 		manifest: firstManifest,
@@ -52,45 +53,67 @@ func (m *ManifestSender) PeerInfo() peer.AddrInfo {
 	return m.h.Peerstore().PeerInfo(m.h.ID())
 }
 
-func (m *ManifestSender) Start(ctx context.Context) {
-	if m.cancel != nil {
-		log.Warn("manifest sender already running")
-		return
-	}
-	ctx, m.cancel = context.WithCancel(ctx)
-
+// Run publishes the current manifest on a fixed interval until the passed
+// context is cancelled.
+func (m *ManifestSender) Run(ctx context.Context) error {
 	ticker := time.NewTicker(m.interval)
-	for {
+	defer ticker.Stop()
+	for ctx.Err() == nil {
 		select {
 		case <-ticker.C:
 			err := m.publishManifest(ctx)
 			if err != nil {
-				log.Error("error publishing manifest: %w", err)
+				if ctx.Err() != nil {
+					return nil
+				}
+				return xerrors.Errorf("error publishing manifest: %w", err)
 			}
 		case <-ctx.Done():
-			return
+			return nil
 		}
 	}
+	return nil
 }
 
 func (m *ManifestSender) publishManifest(ctx context.Context) error {
-	m.lk.RLock()
-	defer m.lk.RUnlock()
+	m.lk.Lock()
+	defer m.lk.Unlock()
 
-	b, err := m.manifest.Marshal()
+	update := ManifestUpdateMessage{
+		MessageSequence: m.msgSeq,
+		ManifestVersion: m.version,
+	}
+	if !m.paused {
+		update.Manifest = m.manifest
+	}
+
+	b, err := update.Marshal()
 	if err != nil {
 		return err
 	}
 	return m.manifestTopic.Publish(ctx, b)
 }
 
-func (m *ManifestSender) UpdateManifest(manifest Manifest) {
+func (m *ManifestSender) UpdateManifest(manifest *Manifest) {
 	m.lk.Lock()
 	m.manifest = manifest
+	m.msgSeq++
+	m.version++
+	m.paused = false
+	m.lk.Unlock()
+}
+
+// Pause publishes updates that carry no manifest, signalling that clients
+// should pause F3 until a new manifest is published.
+func (m *ManifestSender) Pause() {
+	m.lk.Lock()
+	if !m.paused {
+		m.paused = true
+		m.msgSeq++
+	}
 	m.lk.Unlock()
 }
 
-func (m *ManifestSender) Stop() {
-	m.cancel()
-	m.cancel = nil
+// Resume resumes publishing the current manifest.
+func (m *ManifestSender) Resume() {
+	m.lk.Lock()
+	if m.paused {
+		m.paused = false
+		m.msgSeq++
+	}
+	m.lk.Unlock()
 }
diff --git a/manifest/manifest_test.go b/manifest/manifest_test.go
index 0c2909e7..d58de092 100644
--- a/manifest/manifest_test.go
+++ b/manifest/manifest_test.go
@@ -15,9 +15,7 @@ import (
 )
 
 var base manifest.Manifest = manifest.Manifest{
-	Sequence:       0,
 	BootstrapEpoch: 10,
-	ReBootstrap:    true,
 	NetworkName:    gpbft.NetworkName("test"),
 	PowerUpdate: []certs.PowerTableDelta{
 		{
diff --git a/manifest/static.go b/manifest/static.go
index f3f50f56..36e2f54b 100644
--- a/manifest/static.go
+++ b/manifest/static.go
@@ -2,8 +2,6 @@ package manifest
 
 import (
 	"context"
-
-	"github.com/filecoin-project/go-f3/gpbft"
 )
 
 var _ ManifestProvider = (*StaticManifestProvider)(nil)
@@ -11,21 +9,15 @@ var _ ManifestProvider = (*StaticManifestProvider)(nil)
 // Static manifest provider that doesn't allow any changes
 // in runtime to the initial manifest set in the provider
 type StaticManifestProvider struct {
-	manifest Manifest
-}
-
-func NewStaticManifestProvider(m Manifest) ManifestProvider {
-	return &StaticManifestProvider{manifest: m}
+	ch <-chan *Manifest
 }
 
-func (m *StaticManifestProvider) GpbftOptions() []gpbft.Option {
-	return m.manifest.GpbftOptions()
+func NewStaticManifestProvider(m *Manifest) ManifestProvider {
+	ch := make(chan *Manifest, 1)
+	ch <- m
+	return &StaticManifestProvider{ch: ch}
 }
 
-func (m *StaticManifestProvider) Manifest() Manifest {
-	return m.manifest
-}
-
-func (m *StaticManifestProvider) Run(context.Context, chan error) {}
-
-func (m *StaticManifestProvider) SetManifestChangeCallback(OnManifestChange) {}
+func (m *StaticManifestProvider) Start(context.Context) error       { return nil }
+func (m *StaticManifestProvider) Stop(context.Context) error        { return nil }
+func (m *StaticManifestProvider) ManifestUpdates() <-chan *Manifest { return m.ch }
diff --git a/store.go b/store.go
new file mode 100644
index 00000000..a2f94e67
--- /dev/null
+++ b/store.go
@@ -0,0 +1,32 @@
+package f3
+
+import (
+	"context"
+
+	"github.com/filecoin-project/go-f3/certstore"
+	"github.com/filecoin-project/go-f3/ec"
+	"github.com/filecoin-project/go-f3/manifest"
+
+	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/namespace"
+	"golang.org/x/xerrors"
+)
+
+// openCertstore opens the certificate store for the specific manifest (namespaced by the network
+// name).
+func openCertstore(ctx context.Context, ec ec.Backend, ds datastore.Datastore, m *manifest.Manifest) (*certstore.Store, error) {
+
+	ds = namespace.Wrap(ds, m.DatastorePrefix())
+
+	ts, err := ec.GetTipsetByEpoch(ctx, m.BootstrapEpoch-m.ECFinality)
+	if err != nil {
+		return nil, xerrors.Errorf("getting initial power tipset: %w", err)
+	}
+
+	initialPowerTable, err := ec.GetPowerTable(ctx, ts.Key())
+	if err != nil {
+		return nil, xerrors.Errorf("getting initial power table: %w", err)
+	}
+
+	return certstore.OpenOrCreateStore(ctx, ds, m.InitialInstance, initialPowerTable)
+}
diff --git a/test/f3_test.go b/test/f3_test.go
index 830b721b..f7fb55cd 100644
--- a/test/f3_test.go
+++ b/test/f3_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 )
 
@@ -31,134 +32,89 @@ const (
 var log = logging.Logger("f3-testing")
 
 func TestSimpleF3(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, false)
 
-	env.Connect(ctx)
-	env.run(ctx)
-	env.waitForInstanceNumber(ctx, 5, 10*time.Second, false)
-	env.stop()
+	env.connectAll()
+	env.start()
+	env.waitForInstanceNumber(5, 10*time.Second, false)
 }
 
 func TestDynamicManifest_WithoutChanges(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, true)
 
-	env.Connect(ctx)
-	env.run(ctx)
+	env.connectAll()
+	env.start()
 	prev := env.nodes[0].f3.Manifest()
 
-	env.waitForInstanceNumber(ctx, 5, 10*time.Second, false)
+	env.waitForInstanceNumber(5, 10*time.Second, false)
 	// no changes in manifest
 	require.Equal(t, prev, env.nodes[0].f3.Manifest())
 	env.requireEqualManifests(false)
-	env.stop()
 }
 
 func TestDynamicManifest_WithRebootstrap(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, true)
 
-	env.Connect(ctx)
-	env.run(ctx)
+	env.connectAll()
+	env.start()
 
 	prev := env.nodes[0].f3.Manifest()
 
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
-	prevInstance := env.nodes[0].CurrentGpbftInstance(t, ctx)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
+	prevInstance := env.nodes[0].currentGpbftInstance()
 
 	env.manifest.BootstrapEpoch = 953
-	env.manifest.Sequence = 1
-	env.manifest.ReBootstrap = true
+	env.addPowerDeltaForParticipants(&env.manifest, []gpbft.ActorID{2, 3}, big.NewInt(1), false)
 	env.updateManifest()
 
-	env.waitForManifestChange(prev, 15*time.Second, env.nodes)
+	env.waitForManifestChange(prev, 15*time.Second)
 
 	// check that it rebootstrapped and the number of instances is below prevInstance
-	require.True(t, env.nodes[0].CurrentGpbftInstance(t, ctx) < prevInstance)
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
+	require.True(t, env.nodes[0].currentGpbftInstance() < prevInstance)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
 	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
 	env.requireEqualManifests(false)
-	env.stop()
-}
-
-func TestDynamicManifest_WithoutRebootstrap(t *testing.T) {
-	ctx := context.Background()
-	env := newTestEnvironment(t, 2, true)
-
-	env.Connect(ctx)
-	env.run(ctx)
-
-	prev := env.nodes[0].f3.Manifest()
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
-	prevInstance := env.nodes[0].CurrentGpbftInstance(t, ctx)
-
-	env.manifest.BootstrapEpoch = 953
-	env.manifest.Sequence = 1
-	env.manifest.ReBootstrap = false
-	env.addPowerDeltaForParticipants(ctx, &env.manifest, []gpbft.ActorID{2, 3}, big.NewInt(1), false)
-	env.updateManifest()
-
-	env.waitForManifestChange(prev, 15*time.Second, []*testNode{env.nodes[0], env.nodes[1]})
-	// check that the runner continued without rebootstrap
-	require.True(t, env.nodes[0].CurrentGpbftInstance(t, ctx) >= prevInstance)
-	env.waitForInstanceNumber(ctx, prevInstance+10, 15*time.Second, false)
-	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
-	env.requireEqualManifests(false)
 
 	// check that the power table is updated
-	ts, err := env.ec.GetTipsetByEpoch(ctx, int64(env.nodes[0].CurrentGpbftInstance(t, ctx)))
+	ts, err := env.ec.GetTipsetByEpoch(env.testCtx, int64(env.nodes[0].currentGpbftInstance()))
 	require.NoError(t, err)
-	pt, err := env.nodes[0].f3.GetPowerTable(ctx, ts.Key())
+	pt, err := env.nodes[0].f3.GetPowerTable(env.testCtx, ts.Key())
 	require.NoError(t, err)
 	require.Equal(t, len(pt), 4)
-	env.stop()
 }
 
 func TestDynamicManifest_WithPauseAndRebootstrap(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, true)
 
-	env.Connect(ctx)
-	env.run(ctx)
+	env.connectAll()
+	env.start()
 
 	prev := env.nodes[0].f3.Manifest()
 
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
-	prevInstance := env.nodes[0].CurrentGpbftInstance(t, ctx)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
+	prevInstance := env.nodes[0].currentGpbftInstance()
 
-	env.manifest.BootstrapEpoch = 953
-	env.manifest.Sequence = 1
-	env.manifest.Pause = true
-	env.manifest.ReBootstrap = true
-	env.updateManifest()
+	env.manifestSender.Pause()
 
-	env.waitForManifestChange(prev, 15*time.Second, env.nodes)
+	env.waitForManifestChange(prev, 15*time.Second)
 
 	// check that it paused
-	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
-	env.requireEqualManifests(false)
 	env.waitForNodesStoppped(10 * time.Second)
 
-	// New manifest with sequence 2 to start again F3
+	// Publish a new manifest to start F3 again.
 	prev = env.nodes[0].f3.Manifest()
 	env.manifest.BootstrapEpoch = 956
-	env.manifest.Pause = false
-	env.manifest.Sequence = 2
-	env.manifest.ReBootstrap = true
 	env.updateManifest()
 
-	env.waitForManifestChange(prev, 15*time.Second, env.nodes)
+	env.waitForManifestChange(prev, 15*time.Second)
 
 	// check that it rebootstrapped and the number of instances is below prevInstance
-	require.True(t, env.nodes[0].CurrentGpbftInstance(t, ctx) < prevInstance)
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
+	require.True(t, env.nodes[0].currentGpbftInstance() < prevInstance)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
 	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
 	env.requireEqualManifests(false)
-	env.stop()
 }
 
 var base manifest.Manifest = manifest.Manifest{
-	Sequence:        0,
 	BootstrapEpoch:  950,
 	InitialInstance: 0,
 	NetworkName:     gpbft.NetworkName("f3-test"),
@@ -180,14 +136,14 @@ var base manifest.Manifest = manifest.Manifest{
 }
 
 type testNode struct {
-	h     host.Host
-	f3    *f3.F3
-	errCh <-chan error
+	e  *testEnv
+	h  host.Host
+	f3 *f3.F3
 }
 
-func (n *testNode) CurrentGpbftInstance(t *testing.T, ctx context.Context) uint64 {
-	c, err := n.f3.GetLatestCert(ctx)
-	require.NoError(t, err)
+func (n *testNode) currentGpbftInstance() uint64 {
+	c, err := n.f3.GetLatestCert(n.e.testCtx)
+	require.NoError(n.e.t, err)
 	if c == nil {
 		return 0
 	}
@@ -196,6 +152,8 @@ func (n *testNode) CurrentGpbftInstance(t *testing.T, ctx context.Context) uint6
 
 type testEnv struct {
 	t              *testing.T
+	errgrp         *errgroup.Group
+	testCtx        context.Context
 	signingBackend *signing.FakeBackend
 	nodes          []*testNode
 	ec             *ec.FakeEC
@@ -207,25 +165,29 @@ type testEnv struct {
 
 // signals the update to the latest manifest in the environment.
 func (e *testEnv) updateManifest() {
-	e.manifestSender.UpdateManifest(e.manifest)
+	m := e.manifest // copy because we mutate it locally.
+	e.manifestSender.UpdateManifest(&m)
 }
 
-func (e *testEnv) newHeadEveryPeriod(ctx context.Context, period time.Duration) {
-	// set a timer that sets a new head every period
-	go func() {
-		for {
+func (e *testEnv) newHeadEveryPeriod(period time.Duration) {
+	e.errgrp.Go(func() error {
+		// set a timer that sets a new head every period
+		ticker := time.NewTicker(period)
+		defer ticker.Stop()
+		for e.testCtx.Err() == nil {
 			select {
-			case <-ctx.Done():
-				return
-			case <-time.After(period):
+			case <-e.testCtx.Done():
+				return nil
+			case <-ticker.C:
 				e.ec.SetCurrentHead(e.ec.GetCurrentHead() + 1)
 				// fmt.Println("Setting new head", e.ec.GetCurrentHead())
 			}
 		}
-	}()
+		return nil
+	})
 }
 
-func (e *testEnv) addPowerDeltaForParticipants(ctx context.Context, m *manifest.Manifest, participants []gpbft.ActorID, power *big.Int, runNodes bool) {
+func (e *testEnv) addPowerDeltaForParticipants(m *manifest.Manifest, participants []gpbft.ActorID, power *big.Int, runNodes bool) {
 	for _, n := range participants {
 		nodeLen := len(e.nodes)
 		newNode := false
@@ -253,54 +215,65 @@ func (e *testEnv) addPowerDeltaForParticipants(ctx context.Context, m *manifest.
 			require.NoError(e.t, err)
 		}
 			// run
-			e.runNode(ctx, e.nodes[nodeLen-1])
+			e.startNode(nodeLen - 1)
 		}
 	}
 }
 
 // waits for all nodes to reach a specific instance number.
// If the `strict` flag is enabled the check also applies to the non-running nodes
-func (e *testEnv) waitForInstanceNumber(ctx context.Context, instanceNumber uint64, timeout time.Duration, strict bool) {
+func (e *testEnv) waitForInstanceNumber(instanceNumber uint64, timeout time.Duration, strict bool) {
 	require.Eventually(e.t, func() bool {
-		reached := 0
-		for i := 0; i < len(e.nodes); i++ {
+		for _, n := range e.nodes {
 			// nodes that are not running are not required to reach the instance
 			// (it will actually panic if we try to fetch it because there is no
 			// runner initialized)
-			if !e.nodes[i].f3.IsRunning() && !strict {
-				reached++
-			} else if e.nodes[i].CurrentGpbftInstance(e.t, ctx) >= instanceNumber && e.nodes[i].f3.IsRunning() {
-				reached++
+			if !n.f3.IsRunning() {
+				if strict {
+					return false
+				}
+				continue
 			}
-			if reached >= len(e.nodes) {
-				return true
+			if n.currentGpbftInstance() < instanceNumber {
+				return false
 			}
 		}
-		return false
+		return true
 	}, timeout, e.manifest.ECPeriod)
 }
 
-func (e *testEnv) waitForManifestChange(prev manifest.Manifest, timeout time.Duration, nodes []*testNode) {
+func (e *testEnv) waitForManifestChange(prev *manifest.Manifest, timeout time.Duration) {
 	require.Eventually(e.t, func() bool {
-		reached := 0
-		for i := 0; i < len(e.nodes); i++ {
-			for i := 0; i < len(nodes); i++ {
-				v1, _ := nodes[i].f3.Manifest().Version()
-				v2, _ := prev.Version()
-				if v1 != v2 {
-					reached++
-				}
-				if reached == len(nodes) {
-					return true
-				}
+		oldVersion, err := prev.Version()
+		require.NoError(e.t, err)
+		for _, n := range e.nodes {
+			if !n.f3.IsRunning() {
+				continue
+			}
+
+			m := n.f3.Manifest()
+			if m == nil {
+				return false
+			}
+
+			v, err := m.Version()
+			require.NoError(e.t, err)
+			if v == oldVersion {
+				return false
 			}
 		}
-		return false
+		return true
 	}, timeout, ManifestSenderTimeout)
 }
 
 func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
-	env := testEnv{t: t, net: mocknet.New()}
+	ctx, cancel := context.WithCancel(context.Background())
+	grp, ctx := errgroup.WithContext(ctx)
+	env := testEnv{t: t, errgrp: grp, testCtx: ctx, net: mocknet.New()}
+	env.t.Cleanup(func() {
+		cancel()
+		require.NoError(env.t, env.errgrp.Wait())
+	})
 
 	// populate manifest
 	m := base
@@ -322,7 +295,7 @@ func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
 
 	var manifestServer peer.ID
 	if dynamicManifest {
-		env.newManifestSender(context.Background())
+		env.newManifestSender()
 		manifestServer = env.manifestSender.SenderID()
 	}
 
@@ -334,7 +307,7 @@ func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
 }
 
 func (e *testEnv) initNode(i int, manifestServer peer.ID) {
-	n, err := e.newF3Instance(context.Background(), i, manifestServer)
+	n, err := e.newF3Instance(i, manifestServer)
 	require.NoError(e.t, err)
 	e.nodes = append(e.nodes, n)
 }
@@ -378,19 +351,10 @@ func (e *testEnv) waitForNodesStoppped(timeout time.Duration) {
 	e.waitFor(f, timeout)
 }
 
-func (e *testEnv) runNode(ctx context.Context, n *testNode) {
-	errCh := make(chan error)
-	n.errCh = errCh
-	go func() {
-		err := n.f3.Run(ctx)
-		errCh <- err
-	}()
-}
-
-func (e *testEnv) run(ctx context.Context) {
+func (e *testEnv) start() {
 	// Start the nodes
-	for _, n := range e.nodes {
-		e.runNode(ctx, n)
+	for i := range e.nodes {
+		e.startNode(i)
 	}
 
 	// wait for nodes to initialize
@@ -398,40 +362,22 @@ func (e *testEnv) run(ctx context.Context) {
 
 	// If it exists, start the manifest sender
 	if e.manifestSender != nil {
-		go func() {
-			e.manifestSender.Start(ctx)
-		}()
+		e.errgrp.Go(func() error { return e.manifestSender.Run(e.testCtx) })
 	}
 
 	// start creating new heads every ECPeriod
-	e.newHeadEveryPeriod(ctx, e.manifest.ECPeriod)
-	e.monitorNodesError(ctx)
+	e.newHeadEveryPeriod(e.manifest.ECPeriod)
 }
 
-func (e *testEnv) stop() {
-	for _, n := range e.nodes {
-		n.f3.Stop()
-		// close errCh?
-	}
-}
-
-func (e *testEnv) monitorNodesError(ctx context.Context) {
-	for _, n := range e.nodes {
-		go func(n *testNode) {
-			select {
-			case <-ctx.Done():
-				return
-			case err := <-n.errCh:
-				if ctx.Err() != nil {
-					return
-				}
-				require.NoError(e.t, err)
-			}
-		}(n)
-	}
+func (e *testEnv) startNode(i int) {
+	n := e.nodes[i]
+	require.NoError(e.t, n.f3.Start(e.testCtx))
+	e.t.Cleanup(func() {
+		require.NoError(e.t, n.f3.Stop(context.Background()))
+	})
 }
 
-func (e *testEnv) Connect(ctx context.Context) {
+func (e *testEnv) connectAll() {
 	for i, n := range e.nodes {
 		for j := i + 1; j < len(e.nodes); j++ {
 			_, err := e.net.LinkPeers(n.h.ID(), e.nodes[j].h.ID())
@@ -454,24 +400,25 @@ func (e *testEnv) Connect(ctx context.Context) {
 	}
 }
 
-func (e *testEnv) newManifestSender(ctx context.Context) {
+func (e *testEnv) newManifestSender() {
 	h, err := e.net.GenPeer()
 	require.NoError(e.t, err)
 
-	ps, err := pubsub.NewGossipSub(ctx, h)
+	ps, err := pubsub.NewGossipSub(e.testCtx, h)
 	require.NoError(e.t, err)
 
-	e.manifestSender, err = manifest.NewManifestSender(h, ps, e.manifest, ManifestSenderTimeout)
+	m := e.manifest // copy because we mutate this
+	e.manifestSender, err = manifest.NewManifestSender(h, ps, &m, ManifestSenderTimeout)
 	require.NoError(e.t, err)
 }
 
-func (e *testEnv) newF3Instance(ctx context.Context, id int, manifestServer peer.ID) (*testNode, error) {
+func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, error) {
 	h, err := e.net.GenPeer()
 	if err != nil {
 		return nil, xerrors.Errorf("creating libp2p host: %w", err)
 	}
 
-	ps, err := pubsub.NewGossipSub(ctx, h)
+	ps, err := pubsub.NewGossipSub(e.testCtx, h)
 	if err != nil {
 		return nil, xerrors.Errorf("creating gossipsub: %w", err)
 	}
@@ -491,62 +438,58 @@ func (e *testEnv) newF3Instance(ctx context.Context, id int, manifestServer peer
 		return nil, xerrors.Errorf("creating a datastore: %w", err)
 	}
 
+	m := e.manifest // copy because we mutate this
 	var mprovider manifest.ManifestProvider
 	if manifestServer != peer.ID("") {
-		mprovider = manifest.NewDynamicManifestProvider(e.manifest, ps, e.ec, manifestServer)
+		mprovider = manifest.NewDynamicManifestProvider(&m, ps, e.ec, manifestServer)
 	} else {
-		mprovider = manifest.NewStaticManifestProvider(e.manifest)
+		mprovider = manifest.NewStaticManifestProvider(&m)
 	}
 
 	e.signingBackend.Allow(int(id))
 
-	module, err := f3.New(ctx, mprovider, ds, h, ps,
+	module, err := f3.New(e.testCtx, mprovider, ds, h, ps,
 		e.signingBackend, e.ec, log, nil)
 	if err != nil {
 		return nil, xerrors.Errorf("creating module: %w", err)
 	}
-	mprovider.SetManifestChangeCallback(f3.ManifestChangeCallback(module))
-	go runMessageSubscription(ctx, module, gpbft.ActorID(id), e.signingBackend)
 
-	return &testNode{h: h, f3: module}, nil
+	e.errgrp.Go(func() error {
+		return runMessageSubscription(e.testCtx, module, gpbft.ActorID(id), e.signingBackend)
+	})
+
+	return &testNode{e: e, h: h, f3: module}, nil
 }
 
 // TODO: This code is copy-pasta from cmd/f3/run.go, consider taking it out into a shared testing lib.
// We could do the same to the F3 test instantiation
-func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) {
-	for {
+func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) error {
+	ch := make(chan *gpbft.MessageBuilder, 4)
+	module.SubscribeForMessagesToSign(ch)
+	for ctx.Err() == nil {
 		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		ch := make(chan *gpbft.MessageBuilder, 4)
-		module.SubscribeForMessagesToSign(ch)
-	inner:
-		for {
-			select {
-			case mb, ok := <-ch:
-				if !ok {
-					// the broadcast bus kicked us out
-					log.Infof("lost message bus subscription, retrying")
-					break inner
-				}
-				signatureBuilder, err := mb.PrepareSigningInputs(actorID)
-				if err != nil {
-					log.Errorf("preparing signing inputs: %+v", err)
-				}
-				// signatureBuilder can be sent over RPC
-				payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer)
-				if err != nil {
-					log.Errorf("signing message: %+v", err)
-				}
-				// signatureBuilder and signatures can be returned back over RPC
-				module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
-			case <-ctx.Done():
-				return
+		case mb, ok := <-ch:
+			if !ok {
+				// the broadcast bus kicked us out
+				log.Infof("lost message bus subscription, retrying")
+				ch = make(chan *gpbft.MessageBuilder, 4)
+				module.SubscribeForMessagesToSign(ch)
+				continue
+			}
+			signatureBuilder, err := mb.PrepareSigningInputs(actorID)
+			if err != nil {
+				return xerrors.Errorf("preparing signing inputs: %w", err)
 			}
+			// signatureBuilder can be sent over RPC
+			payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer)
+			if err != nil {
+				return xerrors.Errorf("signing message: %w", err)
+			}
+			// signatureBuilder and signatures can be returned back over RPC
+			module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
+		case <-ctx.Done():
+			return nil
 		}
 	}
+	return nil
 }