diff --git a/cmd/f3/manifest.go b/cmd/f3/manifest.go
index 735fe228..7143aece 100644
--- a/cmd/f3/manifest.go
+++ b/cmd/f3/manifest.go
@@ -14,6 +14,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/urfave/cli/v2"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 )

@@ -137,6 +138,8 @@ var manifestServeCmd = cli.Command{
 			return xerrors.Errorf("initializing libp2p host: %w", err)
 		}

+		defer func() { _ = host.Close() }()
+
 		// Connect to all bootstrap addresses once. This should be sufficient to build
 		// the pubsub mesh, if not then we need to periodically re-connect and/or pull in
 		// the Lotus bootstrapping, which includes DHT connectivity.
@@ -171,15 +174,15 @@ var manifestServeCmd = cli.Command{
 		}

 		manifestPath := c.String("manifest")
-		loadManifestAndVersion := func() (manifest.Manifest, manifest.Version, error) {
+		loadManifestAndVersion := func() (*manifest.Manifest, manifest.Version, error) {
 			m, err := loadManifest(manifestPath)
 			if err != nil {
-				return manifest.Manifest{}, "", xerrors.Errorf("loading manifest: %w", err)
+				return nil, "", xerrors.Errorf("loading manifest: %w", err)
 			}
 			version, err := m.Version()
 			if err != nil {
-				return manifest.Manifest{}, "", xerrors.Errorf("versioning manifest: %w", err)
+				return nil, "", xerrors.Errorf("versioning manifest: %w", err)
 			}
 			return m, version, nil
 		}
@@ -200,46 +203,49 @@ var manifestServeCmd = cli.Command{
 		}

 		_, _ = fmt.Fprintf(c.App.Writer, "Started manifest sender with version: %s\n", manifestVersion)
-		go func() {
-			sender.Start(c.Context)
-		}()
-		defer func() {
-			sender.Stop()
-			_ = host.Close()
-		}()
-
-		checkTicker := time.NewTicker(c.Duration("checkInterval"))
-		for c.Context.Err() == nil {
-			select {
-			case <-c.Context.Done():
-				return c.Context.Err()
-			case <-checkTicker.C:
-				if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
-					_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
-				} else if manifestVersion != nextManifestVersion {
-					_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
-					sender.UpdateManifest(nextManifest)
-					manifestVersion = nextManifestVersion
+		checkInterval := c.Duration("checkInterval")
+
+		errgrp, ctx := errgroup.WithContext(c.Context)
+		errgrp.Go(func() error { return sender.Run(ctx) })
+		errgrp.Go(func() error {
+			checkTicker := time.NewTicker(checkInterval)
+			defer checkTicker.Stop()
+
+			for ctx.Err() == nil {
+				select {
+				case <-ctx.Done():
+					return nil
+				case <-checkTicker.C:
+					if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
+						_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed to reload manifest: %v\n", err)
+					} else if manifestVersion != nextManifestVersion {
+						_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
+						sender.UpdateManifest(nextManifest)
+						manifestVersion = nextManifestVersion
+					}
 				}
 			}
-		}
-		return nil
+
+			return nil
+		})
+
+		return errgrp.Wait()
 	},
 }

-func getManifest(c *cli.Context) (manifest.Manifest, error) {
+func getManifest(c *cli.Context) (*manifest.Manifest, error) {
 	manifestPath := c.String("manifest")
 	return loadManifest(manifestPath)
 }

-func loadManifest(path string) (manifest.Manifest, error) {
+func loadManifest(path string) (*manifest.Manifest, error) {
 	f, err := os.Open(path)
 	if err != nil {
-		return manifest.Manifest{}, xerrors.Errorf("opening %s to load manifest: %w", path, err)
+		return nil, xerrors.Errorf("opening %s to load manifest: %w", path, err)
 	}
 	defer f.Close()
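Note on the hunk above: the hand-rolled goroutine plus defer pair is replaced by golang.org/x/sync/errgroup, so the sender loop and the reload loop share one lifecycle and the command exits on the first failure from either task. A minimal sketch of that pattern, with a hypothetical run/reload pair and an assumed one-second check interval:

```go
package example

import (
	"context"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

func serve(ctx context.Context, run func(context.Context) error, reload func() error) error {
	errgrp, ctx := errgroup.WithContext(ctx)
	// Task 1: the long-running sender; a non-nil error cancels the shared context.
	errgrp.Go(func() error { return run(ctx) })
	// Task 2: periodic reload that stops cleanly when the context ends.
	errgrp.Go(func() error {
		ticker := time.NewTicker(time.Second) // assumed check interval
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-ticker.C:
				if err := reload(); err != nil {
					log.Printf("failed to reload: %v", err)
				}
			}
		}
	})
	// Wait returns the first non-nil error from either task.
	return errgrp.Wait()
}
```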
 	var m manifest.Manifest
 	err = m.Unmarshal(f)
-	return m, err
+	return &m, err
 }
diff --git a/cmd/f3/run.go b/cmd/f3/run.go
index 2355212c..42804e28 100644
--- a/cmd/f3/run.go
+++ b/cmd/f3/run.go
@@ -121,20 +121,18 @@ var runCmd = cli.Command{
 			return xerrors.Errorf("creating module: %w", err)
 		}

-		mprovider.SetManifestChangeCallback(f3.ManifestChangeCallback(module))
 		go runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend)

-		return module.Run(ctx)
+		if err := module.Start(ctx); err != nil {
+			return err
+		}
+		<-ctx.Done()
+		return module.Stop(context.Background())
 	},
 }

 func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
+	for ctx.Err() == nil {
 		ch := make(chan *gpbft.MessageBuilder, 4)
 		module.SubscribeForMessagesToSign(ch)
diff --git a/ec/powerdelta.go b/ec/powerdelta.go
new file mode 100644
index 00000000..10d3bfe7
--- /dev/null
+++ b/ec/powerdelta.go
@@ -0,0 +1,29 @@
+package ec
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/filecoin-project/go-f3/certs"
+	"github.com/filecoin-project/go-f3/gpbft"
+)
+
+func WithModifiedPower(backend Backend, delta []certs.PowerTableDelta) Backend {
+	return &withModifiedPower{
+		Backend: backend,
+		delta:   delta,
+	}
+}
+
+type withModifiedPower struct {
+	Backend
+	delta []certs.PowerTableDelta
+}
+
+func (b *withModifiedPower) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) {
+	pt, err := b.Backend.GetPowerTable(ctx, ts)
+	if err != nil {
+		return nil, fmt.Errorf("getting power table: %w", err)
+	}
+	return certs.ApplyPowerTableDiffs(pt, b.delta)
+}
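Note on the new ec/powerdelta.go: WithModifiedPower is a decorator — the wrapper embeds the Backend interface, so every method is forwarded to the wrapped backend except GetPowerTable, which applies the manifest's deltas on top. A hedged usage sketch (the base backend and delta values are assumptions for illustration):

```go
package example

import (
	"context"

	"github.com/filecoin-project/go-f3/certs"
	"github.com/filecoin-project/go-f3/ec"
	"github.com/filecoin-project/go-f3/gpbft"
)

// powerTableWithDeltas reads a power table through the decorated backend so
// the result reflects the manifest's PowerUpdate deltas.
func powerTableWithDeltas(ctx context.Context, base ec.Backend, deltas []certs.PowerTableDelta, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) {
	wrapped := ec.WithModifiedPower(base, deltas)
	return wrapped.GetPowerTable(ctx, ts)
}
```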
diff --git a/f3.go b/f3.go
index 9b9fb411..3ffda511 100644
--- a/f3.go
+++ b/f3.go
@@ -1,141 +1,74 @@
 package f3

 import (
-	"bytes"
 	"context"
-	"errors"
 	"sync"
 	"time"

-	"github.com/Kubuxu/go-broadcast"
 	"github.com/filecoin-project/go-f3/certs"
 	"github.com/filecoin-project/go-f3/certstore"
 	"github.com/filecoin-project/go-f3/ec"
 	"github.com/filecoin-project/go-f3/gpbft"
 	"github.com/filecoin-project/go-f3/manifest"
-	"github.com/ipfs/go-datastore"
-	"github.com/ipfs/go-datastore/namespace"
-	logging "github.com/ipfs/go-log/v2"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p/core/host"
-	peer "github.com/libp2p/go-libp2p/core/peer"
+	"github.com/Kubuxu/go-broadcast"
+	"github.com/ipfs/go-datastore"
 	"go.uber.org/multierr"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 )

 type F3 struct {
-	// mLk protects the runner and certStore
-	mLk    sync.Mutex
-	runner *gpbftRunner
-	// certStore is nil until Run is called on the F3
-	csLk      sync.Mutex
-	certStore *certstore.Store
-
-	cancelCtx context.CancelFunc
-	host      host.Host
-	ds        datastore.Datastore
-	ec        ec.Backend
-	log       Logger
-
-	// psLk protects pubsub. We need an independent one
-	// due to its use in Broadcast
-	psLk   sync.Mutex
-	pubsub *pubsub.PubSub
-	msgSub *pubsub.Subscription
-
-	client *client
-}
-
-type client struct {
-	// certStore is nil until Run is called on the F3
-	certStore *certstore.Store
-	manifest  manifest.ManifestProvider
-	ec        ec.Backend
-
+	verifier          gpbft.Verifier
 	signingMarshaller gpbft.SigningMarshaler
+	manifestProvider  manifest.ManifestProvider
 	busBroadcast      broadcast.Channel[*gpbft.MessageBuilder]

-	gpbft.Verifier
-	logger         Logger
-	loggerWithSkip Logger
-
-	// Populated after Run is called
-	messageQueue <-chan gpbft.ValidatedMessage
-	topic        *pubsub.Topic
-
-	// Notifies manifest updates
-	manifestUpdate <-chan uint64
-	// Triggers the cancellation of the incoming message
-	// routine to avoid delivering any outstanding messages.
-	incomingCancel func()
-}
-
-func (mc *client) BroadcastMessage(ctx context.Context, mb *gpbft.MessageBuilder) error {
-	mb.SetNetworkName(mc.manifest.Manifest().NetworkName)
-	mb.SetSigningMarshaler(mc.signingMarshaller)
-	mc.busBroadcast.Publish(mb)
-	return nil
-}
-
-func (mc *client) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) {
-	// Apply power table deltas from the manifest
-	pt, err := mc.ec.GetPowerTable(ctx, ts)
-	if err != nil {
-		return nil, xerrors.Errorf("getting power table: %w", err)
-	}
-	return certs.ApplyPowerTableDiffs(pt, mc.manifest.Manifest().PowerUpdate)
-}
-
-func (mc *client) IncomingMessages() <-chan gpbft.ValidatedMessage {
-	return mc.messageQueue
-}
-
-var _ gpbft.Tracer = (*client)(nil)
+	host   host.Host
+	ds     datastore.Datastore
+	ec     ec.Backend
+	log    Logger
+	pubsub *pubsub.PubSub

-// Log fulfills the gpbft.Tracer interface
-func (mc *client) Log(fmt string, args ...any) {
-	mc.loggerWithSkip.Debugf(fmt, args...)
-}
+	runningCtx context.Context
+	cancelCtx  context.CancelFunc
+	errgrp     *errgroup.Group

-func (mc *client) Logger() Logger {
-	return mc.logger
+	mu       sync.Mutex
+	cs       *certstore.Store
+	manifest *manifest.Manifest
+	runner   *gpbftRunner
 }

 // New creates and setups f3 with libp2p
 // The context is used for initialization not runtime.
 // signingMarshaller can be nil for default SigningMarshaler
-func New(ctx context.Context, manifest manifest.ManifestProvider, ds datastore.Datastore, h host.Host,
+func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore.Datastore, h host.Host,
 	ps *pubsub.PubSub, verif gpbft.Verifier, ec ec.Backend, log Logger,
 	signingMarshaller gpbft.SigningMarshaler) (*F3, error) {
-	ds = namespace.Wrap(ds, manifest.Manifest().DatastorePrefix())
-	loggerWithSkip := log
-	if zapLogger, ok := log.(*logging.ZapEventLogger); ok {
-		loggerWithSkip = logging.WithSkip(zapLogger, 1)
-	}
+	runningCtx, cancel := context.WithCancel(context.Background())
+	errgrp, runningCtx := errgroup.WithContext(runningCtx)
+
 	if signingMarshaller == nil {
 		signingMarshaller = gpbft.DefaultSigningMarshaller
 	}

-	m := F3{
-		ds:     ds,
-		host:   h,
-		pubsub: ps,
-		ec:     ec,
-		log:    log,
-
-		client: &client{
-			ec:                ec,
-			manifest:          manifest,
-			Verifier:          verif,
-			logger:            log,
-			loggerWithSkip:    loggerWithSkip,
-			signingMarshaller: signingMarshaller,
-		},
-	}
-
-	return &m, nil
+	return &F3{
+		verifier:          verif,
+		signingMarshaller: signingMarshaller,
+		manifestProvider:  manifest,
+		host:              h,
+		ds:                ds,
+		ec:                ec,
+		log:               log,
+		pubsub:            ps,
+		runningCtx:        runningCtx,
+		cancelCtx:         cancel,
+		errgrp:            errgrp,
+	}, nil
 }

 // SubscribeForMessagesToSign is used to subscribe to the message broadcast channel.
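Note on the constructor change: New no longer wraps the datastore or inspects the manifest; it only records dependencies and creates the long-lived running context, so it cannot fail on manifest access. A hedged wiring sketch of the intended call shape (all concrete values are caller-supplied; passing nil selects gpbft.DefaultSigningMarshaller):

```go
package example

import (
	"context"

	f3 "github.com/filecoin-project/go-f3"
	"github.com/filecoin-project/go-f3/ec"
	"github.com/filecoin-project/go-f3/gpbft"
	"github.com/filecoin-project/go-f3/manifest"
	"github.com/ipfs/go-datastore"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/host"
)

// buildF3 constructs an F3 module; nothing runs until Start is called.
func buildF3(ctx context.Context, mp manifest.ManifestProvider, ds datastore.Datastore,
	h host.Host, ps *pubsub.PubSub, verif gpbft.Verifier, backend ec.Backend, log f3.Logger) (*f3.F3, error) {
	return f3.New(ctx, mp, ds, h, ps, verif, backend, log, nil)
}
```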
@@ -145,412 +78,197 @@ func New(ctx context.Context, manifest manifest.ManifestProvider, ds datastore.D
 // To stop subscribing, either the closer function can be used, or the channel can be abandoned.
 // Passing a channel multiple times to the Subscribe function will result in a panic.
 func (m *F3) SubscribeForMessagesToSign(ch chan<- *gpbft.MessageBuilder) (closer func()) {
-	_, closer = m.client.busBroadcast.Subscribe(ch)
+	_, closer = m.busBroadcast.Subscribe(ch)
 	return closer
 }

-func (m *F3) Manifest() manifest.Manifest {
-	return m.client.manifest.Manifest()
+func (m *F3) Manifest() *manifest.Manifest {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.manifest
 }

 func (m *F3) Broadcast(ctx context.Context, signatureBuilder *gpbft.SignatureBuilder, msgSig []byte, vrf []byte) {
 	msg := signatureBuilder.Build(msgSig, vrf)

-	var bw bytes.Buffer
-	err := msg.MarshalCBOR(&bw)
-	if err != nil {
-		m.log.Errorf("marshalling GMessage: %+v", err)
-		return
-	}
-
-	m.psLk.Lock()
-	if m.client.topic != nil {
-		err = m.client.topic.Publish(ctx, bw.Bytes())
-		if err != nil {
-			m.log.Errorf("publishing on topic: %w", err)
-		}
-	}
-	m.psLk.Unlock()
-}
-
-func (m *F3) setCertStore(cs *certstore.Store) {
-	m.csLk.Lock()
-	defer m.csLk.Unlock()
-	m.certStore = cs
-	m.client.certStore = cs
-}
+	m.mu.Lock()
+	runner := m.runner
+	m.mu.Unlock()

-func (m *F3) setupPubsub(runner *gpbftRunner) error {
-	pubsubTopicName := m.Manifest().PubSubTopic()
-
-	// explicit type to typecheck the anonymous function defintion
-	// a bit ugly but I don't want gpbftRunner to know about pubsub
-	var validator pubsub.ValidatorEx = func(ctx context.Context, pID peer.ID,
-		msg *pubsub.Message) pubsub.ValidationResult {
-
-		var gmsg gpbft.GMessage
-		err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data))
-		if err != nil {
-			return pubsub.ValidationReject
-		}
-		validatedMessage, err := runner.ValidateMessage(&gmsg)
-		if errors.Is(err, gpbft.ErrValidationInvalid) {
-			m.log.Debugf("validation error during validation: %+v", err)
-			return pubsub.ValidationReject
-		}
-		if err != nil {
-			m.log.Warnf("unknown error during validation: %+v", err)
-			return pubsub.ValidationIgnore
-		}
-		msg.ValidatorData = validatedMessage
-		return pubsub.ValidationAccept
-	}
-
-	err := m.pubsub.RegisterTopicValidator(pubsubTopicName, validator)
-	if err != nil {
-		return xerrors.Errorf("registering topic validator: %w", err)
+	if runner == nil {
+		m.log.Error("attempted to broadcast message while F3 wasn't running")
+		return
 	}

-	topic, err := m.pubsub.Join(pubsubTopicName)
+	err := runner.BroadcastMessage(msg)
 	if err != nil {
-		return xerrors.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err)
+		m.log.Errorf("failed to broadcast message: %+v", err)
 	}
-	m.client.topic = topic
-	return nil
 }

-func (m *F3) teardownPubsub(manifest manifest.Manifest) (err error) {
-	m.psLk.Lock()
-	defer m.psLk.Unlock()
-	if m.client.topic != nil {
-		m.msgSub.Cancel()
-		err = multierr.Combine(
-			m.pubsub.UnregisterTopicValidator(manifest.PubSubTopic()),
-			m.client.topic.Close(),
-		)
-		m.client.topic = nil
-	}
-	return err
-}
-
-func (m *F3) initGpbftRunner(ctx context.Context) error {
-	m.mLk.Lock()
-	defer m.mLk.Unlock()
-
-	cs, err := certstore.OpenStore(ctx, m.ds)
-	if err == nil {
-		m.setCertStore(cs)
-	} else if errors.Is(err, certstore.ErrNotInitialized) {
-		err := m.boostrap(ctx)
-		if err != nil {
-			return xerrors.Errorf("failed to boostrap: %w", err)
-		}
-	} else {
-		return xerrors.Errorf("opening certstore: %w", err)
-	}
-
-	m.runner, err = newRunner(m.client.manifest, m.client)
-	if err != nil {
-		return xerrors.Errorf("creating gpbft host: %w", err)
-	}
-
-	m.psLk.Lock()
-	if err := m.setupPubsub(m.runner); err != nil {
-		return xerrors.Errorf("setting up pubsub: %w", err)
-	}
-	m.msgSub, err = m.client.topic.Subscribe()
-	if err != nil {
-		m.psLk.Unlock()
-		return xerrors.Errorf("subscribing to topic: %w", err)
-	}
-	m.psLk.Unlock()
-	return nil
-}
+func (m *F3) GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error) {
+	m.mu.Lock()
+	cs := m.cs
+	m.mu.Unlock()

-// Sets up the gpbft runner, this is triggered at initialization
-func (m *F3) startGpbftRunner(ctx context.Context, errCh chan error) {
-	if err := m.initGpbftRunner(ctx); err != nil {
-		errCh <- xerrors.Errorf("initializing gpbft host: %w", err)
-		return
+	if cs == nil {
+		return nil, xerrors.Errorf("F3 is not running")
 	}
-
-	// the size of the buffer is set to prevent message spamming from pubsub,
-	// so it is a big enough buffer to be able to accommodate new messages
-	// at "high-rate", but small enough to avoid spamming or clogging the node.
-	messageQueue := make(chan gpbft.ValidatedMessage, 20)
-	m.client.messageQueue = messageQueue
-
-	go func() {
-		m.mLk.Lock()
-		latest := m.certStore.Latest()
-		m.mLk.Unlock()
-		startInstance := m.Manifest().InitialInstance
-		if latest != nil {
-			startInstance = latest.GPBFTInstance + 1
-		}
-		// Check context before starting the instance
-		select {
-		case <-ctx.Done():
-			// If the context is cancelled before starting, return immediately
-			errCh <- ctx.Err()
-			return
-		default:
-			// Proceed with running the instance
-			err := m.runner.Run(startInstance, ctx)
-			if err != nil {
-				m.log.Errorf("error returned while running host: %+v", err)
-			}
-			errCh <- err
-		}
-	}()
-
-	m.handleIncomingMessages(ctx, messageQueue)
+	return cs.Latest(), nil
 }

-func (m *F3) boostrap(ctx context.Context) error {
-	head, err := m.ec.GetHead(ctx)
-	if err != nil {
-		return xerrors.Errorf("failed to get the head: %w", err)
-	}
-
-	if head.Epoch() < m.Manifest().BootstrapEpoch {
-		// wait for bootstrap epoch
-		for {
-			head, err := m.ec.GetHead(ctx)
-			if err != nil {
-				return xerrors.Errorf("getting head: %w", err)
-			}
-			if head.Epoch() >= m.Manifest().BootstrapEpoch {
-				break
-			}
-
-			m.log.Infof("wating for bootstrap epoch (%d): currently at epoch %d", m.Manifest().BootstrapEpoch, head.Epoch())
-			aim := time.Until(head.Timestamp().Add(m.Manifest().ECPeriod))
-			// correct for null epochs
-			for aim < 0 {
-				aim += m.Manifest().ECPeriod
-			}
-
-			select {
-			case <-time.After(aim):
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-	}
+func (m *F3) GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error) {
+	m.mu.Lock()
+	cs := m.cs
+	m.mu.Unlock()

-	ts, err := m.ec.GetTipsetByEpoch(ctx, m.Manifest().BootstrapEpoch-m.Manifest().ECFinality)
-	if err != nil {
-		return xerrors.Errorf("getting initial power tipset: %w", err)
+	if cs == nil {
+		return nil, xerrors.Errorf("F3 is not running")
 	}
+	return cs.Get(ctx, instance)
+}

-	initialPowerTable, err := m.ec.GetPowerTable(ctx, ts.Key())
-	if err != nil {
-		return xerrors.Errorf("getting initial power table: %w", err)
+// Returns the time at which the F3 instance specified by the passed manifest should be started, or
+// 0 if the passed manifest is nil.
+func (m *F3) computeBootstrapDelay(manifest *manifest.Manifest) (delay time.Duration, err error) {
+	if manifest == nil {
+		return 0, nil
 	}
+	ts, err := m.ec.GetHead(m.runningCtx)
 	if err != nil {
-		return xerrors.Errorf("creating certstore: %w", err)
-	}
-	m.setCertStore(cs)
-	return nil
-}
-
-func (m *F3) GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error) {
-	m.mLk.Lock()
-	defer m.mLk.Unlock()
-	if m.certStore == nil {
-		return nil, xerrors.Errorf("F3 is not running")
-	}
-	return m.certStore.Latest()
-}
-func (m *F3) GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error) {
-	m.mLk.Lock()
-	defer m.mLk.Unlock()
-	if m.certStore == nil {
-		return nil, xerrors.Errorf("F3 is not running")
-	}
-	return m.certStore.Get(ctx, instance)
+		err := xerrors.Errorf("failed to get the EC chain head: %w", err)
+		return 0, err
+	}
+
+	// We bootstrap now if we're at the correct time, whether or
+	// not we've hit the correct epoch. The important thing is
+	// that:
+	//
+	// 1. All nodes will use BootstrapEpoch - Finality to pick the base.
+	// 2. All nodes will bootstrap at the same time.
+	currentEpoch := ts.Epoch()
+	if currentEpoch < manifest.BootstrapEpoch {
+		epochDelay := manifest.BootstrapEpoch - currentEpoch
+		start := ts.Timestamp().Add(time.Duration(epochDelay) * manifest.ECPeriod)
+		return max(time.Until(start), 0), nil
+	}
+	return 0, nil
 }
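Note on computeBootstrapDelay: the delay is (BootstrapEpoch − currentEpoch) epochs times ECPeriod, measured from the head tipset's timestamp and clamped at zero. A worked example with assumed values — head at epoch 90, BootstrapEpoch 100, a 30s ECPeriod — yields a start time five minutes after the head timestamp:

```go
package example

import "time"

// bootstrapDelay mirrors the arithmetic above. With currentEpoch=90,
// bootstrapEpoch=100 and ecPeriod=30s, start = headTime + 10*30s = headTime + 5m.
func bootstrapDelay(now, headTime time.Time, currentEpoch, bootstrapEpoch int64, ecPeriod time.Duration) time.Duration {
	if currentEpoch >= bootstrapEpoch {
		return 0
	}
	epochDelay := bootstrapEpoch - currentEpoch
	start := headTime.Add(time.Duration(epochDelay) * ecPeriod)
	return max(start.Sub(now), 0) // never negative; built-in max needs Go 1.21+
}
```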
 // Run start the module. It will exit when context is cancelled.
 // Or if there is an error from the message handling routines.
-func (m *F3) Run(ctx context.Context) (_err error) {
-	ctx, m.cancelCtx = context.WithCancel(ctx)
-	defer m.cancelCtx()
-
-	runnerErrCh := make(chan error, 1)
-	manifestErrCh := make(chan error, 1)
-
-	// bootstrap runner for the initial manifest
-	go m.startGpbftRunner(ctx, runnerErrCh)
-
-	// run manifest provider. This runs a background goroutine that will
-	// handle dynamic manifest updates if this is a dynamic manifest provider.
-	// If it is a static manifest it does nothing.
-	go m.client.manifest.Run(ctx, manifestErrCh)
-
-	// teardown pubsub on shutdown
-	defer func() {
-		teardownErr := m.teardownPubsub(m.Manifest())
-		_err = multierr.Append(_err, teardownErr)
-	}()
-
-	select {
-	case <-ctx.Done():
-		return nil
-	case err := <-runnerErrCh:
-		return err
-	case err := <-manifestErrCh:
+func (m *F3) Start(startCtx context.Context) (_err error) {
+	err := m.manifestProvider.Start(startCtx)
+	if err != nil {
 		return err
 	}
-}
-
-func (m *F3) Stop() {
-	m.mLk.Lock()
-	defer m.mLk.Unlock()
-
-	m.pauseRunner()
-
-	if m.cancelCtx != nil {
-		m.cancelCtx()
+	// Try to get an initial manifest immediately if possible so users can query it immediately.
+	var pendingManifest *manifest.Manifest
+	select {
+	case pendingManifest = <-m.manifestProvider.ManifestUpdates():
+		m.mu.Lock()
+		m.manifest = pendingManifest
+		m.mu.Unlock()
+	default:
 	}
-}

-func (m *F3) pauseRunner() {
-	if m.runner != nil {
-		m.runner.Stop()
+	initialDelay, err := m.computeBootstrapDelay(pendingManifest)
+	if err != nil {
+		return err
 	}
-	m.runner = nil
-}

-func (m *F3) handleIncomingMessages(ctx context.Context, queue chan gpbft.ValidatedMessage) {
-	ctx, m.client.incomingCancel = context.WithCancel(ctx)
-loop:
-	for {
-		var msg *pubsub.Message
-		msg, err := m.msgSub.Next(ctx)
-		if err != nil {
-			if ctx.Err() != nil {
-				break
-			}
-			m.log.Errorf("msgPubsub subscription.Next() returned an error: %+v", err)
-			break
-		}
-		gmsg, ok := msg.ValidatorData.(gpbft.ValidatedMessage)
-		if !ok {
-			m.log.Errorf("invalid msgValidatorData: %+v", msg.ValidatorData)
-			continue
-		}
-		select {
-		case queue <- gmsg:
-		case <-ctx.Done():
-			break loop
+	// Try to start immediately if we have a manifest available and don't have to wait to
+	// bootstrap. That way, we'll be fully started when we return from Start.
+	if initialDelay == 0 {
+		if err := m.reconfigure(startCtx, pendingManifest); err != nil {
+			return xerrors.Errorf("failed to start GPBFT: %w", err)
 		}
+		pendingManifest = nil
 	}
-}

-// Callback to be triggered when there is a dynamic manifest change
-// If the manifest triggers a rebootstrap it starts a new runner with the new configuration.
-// If there is no rebootstrap it only starts a new pubsub topic with a new network name that
-// depends on the manifest version so there is no overlap between different configuration instances
-func ManifestChangeCallback(m *F3) manifest.OnManifestChange {
-	// We can only accommodate 1 manifest update at a time, thus
-	// the buffer size.
-	manifestUpdate := make(chan uint64, 1)
-	m.client.manifestUpdate = manifestUpdate
-	return func(ctx context.Context, prevManifest manifest.Manifest, errCh chan error) {
-		// Tear down pubsub.
-		if err := m.teardownPubsub(prevManifest); err != nil {
-			// for now we just log the error and continue.
-			// This is not critical, but alternative approaches welcome.
- m.log.Errorf("error stopping gpbft runner: %+v", err) + m.errgrp.Go(func() error { + manifestChangeTimer := time.NewTimer(initialDelay) + if pendingManifest == nil && !manifestChangeTimer.Stop() { + <-manifestChangeTimer.C } - // empty message queue from outstanding messages - m.emptyMessageQueue() - if m.Manifest().Pause { - m.pauseCallback() - } else { + defer manifestChangeTimer.Stop() - if m.Manifest().ReBootstrap { - m.withRebootstrapCallback(ctx, errCh) - } else { - m.withoutRebootstrapCallback(ctx, manifestUpdate, errCh) + for m.runningCtx.Err() == nil { + select { + case update := <-m.manifestProvider.ManifestUpdates(): + if pendingManifest != nil && !manifestChangeTimer.Stop() { + <-manifestChangeTimer.C + } + + pendingManifest = update + if delay, err := m.computeBootstrapDelay(update); err != nil { + return err + } else if delay > 0 { + manifestChangeTimer.Reset(delay) + continue + } + case <-manifestChangeTimer.C: + case <-m.runningCtx.Done(): + return nil + } + if err := m.reconfigure(m.runningCtx, pendingManifest); err != nil { + return xerrors.Errorf("failed to reconfigure F3: %w", err) } + pendingManifest = nil } + return nil + }) - } + return nil } -func (m *F3) withRebootstrapCallback(ctx context.Context, errCh chan error) { - m.mLk.Lock() - m.log.Infof("triggering manifest (seq=%d) change with rebootstrap", m.Manifest().Sequence) - - // if runner still up - if m.runner != nil { - m.runner.Stop() - } - - // clear the certstore - if err := m.certStore.DeleteAll(ctx); err != nil { - errCh <- xerrors.Errorf("clearing certstore: %w", err) - m.mLk.Unlock() - return - } - m.mLk.Unlock() - m.startGpbftRunner(ctx, errCh) +func (m *F3) Stop(stopCtx context.Context) (_err error) { + m.cancelCtx() + return multierr.Combine( + m.manifestProvider.Stop(stopCtx), + m.errgrp.Wait(), + ) } -func (m *F3) withoutRebootstrapCallback(ctx context.Context, manifestUpdate chan uint64, errCh chan error) { - m.mLk.Lock() - m.log.Infof("triggering manifest (seq=%d) change without rebootstrap", m.Manifest().Sequence) +func (m *F3) reconfigure(ctx context.Context, manifest *manifest.Manifest) error { + m.mu.Lock() + defer m.mu.Unlock() if m.runner != nil { - m.log.Error("cannot trigger a callback without rebootstrap if the runner is stop") + if err := m.runner.Stop(ctx); err != nil { + return err + } + m.runner = nil + } + if manifest == nil { + return nil } - // immediately stop listening to network messages - m.client.incomingCancel() - m.psLk.Lock() - if err := m.setupPubsub(m.runner); err != nil { - errCh <- xerrors.Errorf("setting up pubsub: %w", err) - return + runnerEc := m.ec + if len(manifest.PowerUpdate) > 0 { + runnerEc = ec.WithModifiedPower(m.ec, manifest.PowerUpdate) } - var err error - m.msgSub, err = m.client.topic.Subscribe() + + cs, err := openCertstore(m.runningCtx, runnerEc, m.ds, manifest) if err != nil { - errCh <- xerrors.Errorf("subscribing to topic: %w", err) - return + return xerrors.Errorf("failed to open certstore: %w", err) } - m.psLk.Unlock() - - messageQueue := make(chan gpbft.ValidatedMessage, 20) - m.client.messageQueue = messageQueue - // notify update to host to pick up the new message queue - fc := m.certStore.Latest() - manifestUpdate <- fc.GPBFTInstance - m.mLk.Unlock() - m.handleIncomingMessages(ctx, messageQueue) -} -func (m *F3) pauseCallback() { - m.mLk.Lock() - defer m.mLk.Unlock() - m.log.Infof("triggering manifest (seq=%d) change with pause", m.Manifest().Sequence) - m.pauseRunner() -} - -func (m *F3) emptyMessageQueue() { - for { - select { - case 
-func (m *F3) withRebootstrapCallback(ctx context.Context, errCh chan error) {
-	m.mLk.Lock()
-	m.log.Infof("triggering manifest (seq=%d) change with rebootstrap", m.Manifest().Sequence)
-
-	// if runner still up
-	if m.runner != nil {
-		m.runner.Stop()
-	}
-
-	// clear the certstore
-	if err := m.certStore.DeleteAll(ctx); err != nil {
-		errCh <- xerrors.Errorf("clearing certstore: %w", err)
-		m.mLk.Unlock()
-		return
-	}
-	m.mLk.Unlock()
-	m.startGpbftRunner(ctx, errCh)
+func (m *F3) Stop(stopCtx context.Context) (_err error) {
+	m.cancelCtx()
+	return multierr.Combine(
+		m.manifestProvider.Stop(stopCtx),
+		m.errgrp.Wait(),
+	)
 }

-func (m *F3) withoutRebootstrapCallback(ctx context.Context, manifestUpdate chan uint64, errCh chan error) {
-	m.mLk.Lock()
-	m.log.Infof("triggering manifest (seq=%d) change without rebootstrap", m.Manifest().Sequence)
+func (m *F3) reconfigure(ctx context.Context, manifest *manifest.Manifest) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()

 	if m.runner != nil {
-		m.log.Error("cannot trigger a callback without rebootstrap if the runner is stop")
+		if err := m.runner.Stop(ctx); err != nil {
+			return err
+		}
+		m.runner = nil
+	}
+	if manifest == nil {
+		return nil
 	}

-	// immediately stop listening to network messages
-	m.client.incomingCancel()
-	m.psLk.Lock()
-	if err := m.setupPubsub(m.runner); err != nil {
-		errCh <- xerrors.Errorf("setting up pubsub: %w", err)
-		return
+	runnerEc := m.ec
+	if len(manifest.PowerUpdate) > 0 {
+		runnerEc = ec.WithModifiedPower(m.ec, manifest.PowerUpdate)
 	}
-	var err error
-	m.msgSub, err = m.client.topic.Subscribe()
+
+	cs, err := openCertstore(m.runningCtx, runnerEc, m.ds, manifest)
 	if err != nil {
-		errCh <- xerrors.Errorf("subscribing to topic: %w", err)
-		return
+		return xerrors.Errorf("failed to open certstore: %w", err)
 	}
-	m.psLk.Unlock()
-
-	messageQueue := make(chan gpbft.ValidatedMessage, 20)
-	m.client.messageQueue = messageQueue
-	// notify update to host to pick up the new message queue
-	fc := m.certStore.Latest()
-	manifestUpdate <- fc.GPBFTInstance
-	m.mLk.Unlock()
-	m.handleIncomingMessages(ctx, messageQueue)
-}

-func (m *F3) pauseCallback() {
-	m.mLk.Lock()
-	defer m.mLk.Unlock()
-	m.log.Infof("triggering manifest (seq=%d) change with pause", m.Manifest().Sequence)
-	m.pauseRunner()
-}
-
-func (m *F3) emptyMessageQueue() {
-	for {
-		select {
-		case <-m.client.messageQueue:
-			m.log.Debug("emptying message queue")
-		default:
-			return
-		}
+	m.cs = cs
+	m.manifest = manifest
+	m.runner, err = newRunner(
+		ctx, m.cs, runnerEc, m.log, m.pubsub,
+		m.signingMarshaller, m.verifier,
+		m.busBroadcast.Publish, m.manifest,
+	)
+	if err != nil {
+		return err
 	}
+
+	return m.runner.Start(ctx)
 }

 type Logger interface {
@@ -567,13 +285,21 @@ type Logger interface {
 // IsRunning returns true if gpbft is running
 // Used mainly for testing purposes
 func (m *F3) IsRunning() bool {
-	m.mLk.Lock()
-	defer m.mLk.Unlock()
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	return m.runner != nil
 }

 // GetPowerTable returns the power table for the given tipset
 // Used mainly for testing purposes
 func (m *F3) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) {
-	return m.client.GetPowerTable(ctx, ts)
+	m.mu.Lock()
+	manifest := m.manifest
+	m.mu.Unlock()
+
+	if manifest == nil {
+		return nil, xerrors.Errorf("no known network manifest")
+	}
+
+	return ec.WithModifiedPower(m.ec, manifest.PowerUpdate).GetPowerTable(ctx, ts)
 }
diff --git a/go.mod b/go.mod
index 25c4ff22..077f5e8f 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/whyrusleeping/cbor-gen v0.1.1
 	go.uber.org/multierr v1.11.0
 	golang.org/x/crypto v0.23.0
+	golang.org/x/sync v0.7.0
 	golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028
 )

@@ -120,7 +121,6 @@ require (
 	golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
 	golang.org/x/mod v0.17.0 // indirect
 	golang.org/x/net v0.25.0 // indirect
-	golang.org/x/sync v0.7.0 // indirect
 	golang.org/x/sys v0.20.0 // indirect
 	golang.org/x/text v0.15.0 // indirect
 	golang.org/x/tools v0.21.0 // indirect
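Note on reconfigure: every manifest change now funnels through one sequence — stop the old runner, rebuild the per-manifest dependencies, start a new runner — instead of the three rebootstrap/no-rebootstrap/pause callbacks. A simplified sketch of that shape, with an assumed `runner` interface and factory standing in for the concrete gpbftRunner wiring:

```go
package example

import "context"

type runner interface {
	Start(context.Context) error
	Stop(context.Context) error
}

type node struct {
	current runner
	build   func(context.Context) (runner, error) // assumed per-manifest factory
}

// reconfigure tears down the old runner before any new state is built, so a
// failed rebuild leaves the node cleanly stopped rather than half-configured.
func (n *node) reconfigure(ctx context.Context) error {
	if n.current != nil {
		if err := n.current.Stop(ctx); err != nil {
			return err
		}
		n.current = nil
	}
	r, err := n.build(ctx)
	if err != nil {
		return err
	}
	n.current = r
	return r.Start(ctx)
}
```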
diff --git a/host.go b/host.go
index 1da2cdd7..73074e55 100644
--- a/host.go
+++ b/host.go
@@ -3,38 +3,78 @@ package f3
 import (
 	"bytes"
 	"context"
+	"errors"
 	"slices"
 	"time"

 	"github.com/filecoin-project/go-f3/certs"
+	"github.com/filecoin-project/go-f3/certstore"
 	"github.com/filecoin-project/go-f3/ec"
 	"github.com/filecoin-project/go-f3/gpbft"
 	"github.com/filecoin-project/go-f3/manifest"
+	logging "github.com/ipfs/go-log/v2"
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	peer "github.com/libp2p/go-libp2p/core/peer"
+	"go.uber.org/multierr"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 )

+type BroadcastMessage func(*gpbft.MessageBuilder)
+
 // gpbftRunner is responsible for running gpbft.Participant, taking in all concurrent events and
 // passing them to gpbft in a single thread.
 type gpbftRunner struct {
-	client *client
+	certStore         *certstore.Store
+	manifest          *manifest.Manifest
+	ec                ec.Backend
+	pubsub            *pubsub.PubSub
+	signingMarshaller gpbft.SigningMarshaler
+	verifier          gpbft.Verifier
+	broadcastCb       BroadcastMessage
+	log, logWithSkip  Logger

 	participant *gpbft.Participant
-	manifest    manifest.ManifestProvider
+	topic       *pubsub.Topic

 	alertTimer *time.Timer

 	runningCtx context.Context
+	errgrp     *errgroup.Group
 	ctxCancel  context.CancelFunc
-	log        Logger
 }

-// gpbftHost is a newtype of gpbftRunner exposing APIs required by the gpbft.Participant
-type gpbftHost gpbftRunner
+func newRunner(
+	_ context.Context,
+	cs *certstore.Store,
+	ec ec.Backend,
+	log Logger,
+	ps *pubsub.PubSub,
+	signer gpbft.SigningMarshaler,
+	verifier gpbft.Verifier,
+	broadcastCb BroadcastMessage,
+	m *manifest.Manifest,
+) (*gpbftRunner, error) {
+	runningCtx, ctxCancel := context.WithCancel(context.Background())
+	errgrp, runningCtx := errgroup.WithContext(runningCtx)

-func newRunner(m manifest.ManifestProvider, client *client) (*gpbftRunner, error) {
 	runner := &gpbftRunner{
-		client:   client,
-		manifest: m,
-		log:      client.Logger(),
+		certStore:         cs,
+		manifest:          m,
+		ec:                ec,
+		pubsub:            ps,
+		signingMarshaller: signer,
+		verifier:          verifier,
+		broadcastCb:       broadcastCb,
+		log:               log,
+		logWithSkip:       log,
+		runningCtx:        runningCtx,
+		errgrp:            errgrp,
+		ctxCancel:         ctxCancel,
+	}
+
+	if zapLogger, ok := runner.log.(*logging.ZapEventLogger); ok {
+		runner.logWithSkip = logging.WithSkip(zapLogger, 1)
 	}

 	// create a stopped timer to facilitate alerts requested from gpbft
@@ -44,7 +84,7 @@ func newRunner(m manifest.ManifestProvider, client *client) (*gpbftRunner, error
 	}

 	runner.log.Infof("Starting gpbft runner")
-	opts := append(m.GpbftOptions(), gpbft.WithTracer(client))
+	opts := append(m.GpbftOptions(), gpbft.WithTracer((*gpbftTracer)(runner)))
 	p, err := gpbft.NewParticipant((*gpbftHost)(runner), opts...)
 	if err != nil {
 		return nil, xerrors.Errorf("creating participant: %w", err)
@@ -53,81 +93,91 @@ func newRunner(m manifest.ManifestProvider, client *client) (*gpbftRunner, error
 	return runner, nil
 }
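Note on `(*gpbftTracer)(runner)` and `(*gpbftHost)(runner)`: both are newtypes of gpbftRunner, so one struct can present several distinct interfaces through zero-cost pointer conversions instead of wrapper allocations. A small sketch of the trick with assumed stand-in types:

```go
package example

import "fmt"

type runner struct{ name string }

// tracer is a defined type with the same underlying struct as runner, so a
// *runner can be converted to a *tracer for free; only the method set differs.
type tracer runner

func (t *tracer) Log(format string, args ...any) {
	fmt.Printf("[%s] "+format+"\n", append([]any{t.name}, args...)...)
}

func demo() {
	r := &runner{name: "gpbft"}
	(*tracer)(r).Log("instance %d started", 7) // convert, then call
}
```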
-func (h *gpbftRunner) Run(instance uint64, ctx context.Context) (_err error) {
-	h.runningCtx, h.ctxCancel = context.WithCancel(ctx)
-	defer h.ctxCancel()
+func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
+	defer func() {
+		if _err != nil {
+			_err = multierr.Append(_err, h.Stop(ctx))
+		}
+	}()
+
+	startInstance := h.manifest.InitialInstance
+	if latest := h.certStore.Latest(); latest != nil {
+		startInstance = latest.GPBFTInstance + 1
+	}
+
+	messageQueue, err := h.startPubsub()
+	if err != nil {
+		return err
+	}

-	if err := h.participant.StartInstanceAt(instance, time.Now()); err != nil {
+	if err := h.participant.StartInstanceAt(startInstance, time.Now()); err != nil {
 		return xerrors.Errorf("starting a participant: %w", err)
 	}

 	// Subscribe to new certificates. We don't bother canceling the subscription as that'll
 	// happen automatically when the channel fills.
 	finalityCertificates := make(chan *certs.FinalityCertificate, 4)
-	_, _ = h.client.certStore.SubscribeForNewCerts(finalityCertificates)
-
-	defer func() {
-		if _err != nil {
-			h.log.Errorf("gpbfthost exiting: %+v", _err)
-		}
-	}()
+	_, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)

-	messageQueue := h.client.IncomingMessages()
-	for {
-
-		// prioritise finality certificates and alarm delivery
-		select {
-		case c, ok := <-finalityCertificates:
-			if !ok {
-				finalityCertificates = make(chan *certs.FinalityCertificate, 4)
-				c, _ = h.client.certStore.SubscribeForNewCerts(finalityCertificates)
-			}
-			if err := h.receiveCertificate(c); err != nil {
-				return err
+	h.errgrp.Go(func() (_err error) {
+		defer func() {
+			if _err != nil && h.runningCtx.Err() == nil {
+				h.log.Errorf("exited GPBFT runner early: %+v", _err)
 			}
-			continue
-		case <-h.alertTimer.C:
-			if err := h.participant.ReceiveAlarm(); err != nil {
-				return err
+		}()
+		for h.runningCtx.Err() == nil {
+			// prioritise finality certificates and alarm delivery
+			select {
+			case c, ok := <-finalityCertificates:
+				if !ok {
+					finalityCertificates = make(chan *certs.FinalityCertificate, 4)
+					c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
+				}
+				if err := h.receiveCertificate(c); err != nil {
+					return err
+				}
+				continue
+			case <-h.alertTimer.C:
+				if err := h.participant.ReceiveAlarm(); err != nil {
+					return err
+				}
+				continue
+			default:
 			}
-			continue
-		default:
-		}

-		// Handle messages, finality certificates, and alarms
-		select {
-		case c, ok := <-finalityCertificates:
-			if !ok {
-				finalityCertificates = make(chan *certs.FinalityCertificate, 4)
-				c, _ = h.client.certStore.SubscribeForNewCerts(finalityCertificates)
-			}
-			if err := h.receiveCertificate(c); err != nil {
-				return err
-			}
-		case <-h.alertTimer.C:
-			if err := h.participant.ReceiveAlarm(); err != nil {
-				return err
-			}
-		case msg, ok := <-messageQueue:
-			if !ok {
-				return xerrors.Errorf("incoming message queue closed")
+			// Handle messages, finality certificates, and alarms
+			select {
+			case c, ok := <-finalityCertificates:
+				if !ok {
+					finalityCertificates = make(chan *certs.FinalityCertificate, 4)
+					c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
+				}
+				if err := h.receiveCertificate(c); err != nil {
+					return err
+				}
+			case <-h.alertTimer.C:
+				if err := h.participant.ReceiveAlarm(); err != nil {
+					return err
+				}
+			case msg, ok := <-messageQueue:
+				if !ok {
+					return xerrors.Errorf("incoming message queue closed")
+				}
+				if err := h.participant.ReceiveMessage(msg); err != nil {
+					// We silently drop failed messages because GPBFT will
+					// return errors for, e.g., messages from old instances.
+					// Given the async nature of our pubsub message handling, we
+					// could easily receive these.
+					h.log.Debugf("error when processing message: %+v", err)
+				}
+			case <-h.runningCtx.Done():
+				return nil
 			}
-			if err := h.participant.ReceiveMessage(msg); err != nil {
-				return err
-			}
-		case start := <-h.client.manifestUpdate:
-			// Check for manifest update in the inner loop to exit and update the
-			// messageQueue and start from the last instance
-			h.log.Debugf("Manifest update detected, refreshing message queue")
-			if err := h.participant.StartInstanceAt(start, time.Now()); err != nil {
-				return xerrors.Errorf("on maifest update: %+v", err)
-			}
-			messageQueue = h.client.IncomingMessages()
-		case <-ctx.Done():
-			return nil
-		}
-	}
+		}
+		return nil
+	})
+	return nil
 }
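Note on the loop above: it is a two-stage select. The first select polls only certificates and alarms with a `default`, so any ready high-priority event is consumed first; the second select then blocks on everything, including gossip messages. This biases scheduling toward certificates and alarms without starving the message queue. A distilled sketch:

```go
package example

// pump prioritises certs and alarms over msgs: a non-blocking poll first,
// then a blocking select over all sources.
func pump(certs, alarms, msgs <-chan int, done <-chan struct{}) {
	for {
		// Stage 1: drain any ready high-priority event.
		select {
		case <-certs:
			continue
		case <-alarms:
			continue
		default:
		}
		// Stage 2: block until any event (or shutdown) arrives.
		select {
		case <-certs:
		case <-alarms:
		case <-msgs:
		case <-done:
			return
		}
	}
}
```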
+ h.log.Debugf("error when processing message: %+v", err) + } + case <-h.runningCtx.Done(): + return nil } - if err := h.participant.ReceiveMessage(msg); err != nil { - return err - } - case start := <-h.client.manifestUpdate: - // Check for manifest update in the inner loop to exit and update the - // messageQueue and start from the last instance - h.log.Debugf("Manifest update detected, refreshing message queue") - if err := h.participant.StartInstanceAt(start, time.Now()); err != nil { - return xerrors.Errorf("on maifest update: %+v", err) - } - messageQueue = h.client.IncomingMessages() - case <-ctx.Done(): - return nil - } - } + } + return nil + }) + return nil } func (h *gpbftRunner) receiveCertificate(c *certs.FinalityCertificate) error { @@ -141,10 +191,9 @@ func (h *gpbftRunner) receiveCertificate(c *certs.FinalityCertificate) error { } func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) time.Time { - manifest := h.manifest.Manifest() - ecDelay := time.Duration(manifest.ECDelayMultiplier * float64(manifest.ECPeriod)) + ecDelay := time.Duration(h.manifest.ECDelayMultiplier * float64(h.manifest.ECPeriod)) - ts, err := h.client.ec.GetTipset(h.runningCtx, cert.ECChain.Head().Key) + ts, err := h.ec.GetTipset(h.runningCtx, cert.ECChain.Head().Key) if err != nil { // this should not happen h.log.Errorf("could not get timestamp of just finalized tipset: %+v", err) @@ -155,16 +204,16 @@ func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) // we decided on something new, the tipset that got finalized can at minimum be 30-60s old. return ts.Timestamp().Add(ecDelay) } - if cert.GPBFTInstance == manifest.InitialInstance { + if cert.GPBFTInstance == h.manifest.InitialInstance { // if we are at initial instance, there is no history to look at return ts.Timestamp().Add(ecDelay) } - backoffTable := manifest.BaseDecisionBackoffTable + backoffTable := h.manifest.BaseDecisionBackoffTable attempts := 0 backoffMultipler := 1.0 // to account for the one ECDelay after which we got the base decistion - for instance := cert.GPBFTInstance - 1; instance > manifest.InitialInstance; instance-- { - cert, err := h.client.certStore.Get(h.runningCtx, instance) + for instance := cert.GPBFTInstance - 1; instance > h.manifest.InitialInstance; instance-- { + cert, err := h.certStore.Get(h.runningCtx, instance) if err != nil { h.log.Errorf("error while getting instance %d from certstore: %+v", instance, err) break @@ -186,10 +235,132 @@ func (h *gpbftRunner) computeNextInstanceStart(cert *certs.FinalityCertificate) return ts.Timestamp().Add(backoff) } -func (h *gpbftRunner) ValidateMessage(msg *gpbft.GMessage) (gpbft.ValidatedMessage, error) { - return h.participant.ValidateMessage(msg) +// Sends a message to all other participants. +// The message's sender must be one that the network interface can sign on behalf of. 
-func (h *gpbftRunner) ValidateMessage(msg *gpbft.GMessage) (gpbft.ValidatedMessage, error) {
-	return h.participant.ValidateMessage(msg)
+// Sends a message to all other participants.
+// The message's sender must be one that the network interface can sign on behalf of.
+func (h *gpbftRunner) BroadcastMessage(msg *gpbft.GMessage) error {
+	if h.topic == nil {
+		return pubsub.ErrTopicClosed
+	}
+	var bw bytes.Buffer
+	err := msg.MarshalCBOR(&bw)
+	if err != nil {
+		return xerrors.Errorf("marshalling GMessage for broadcast: %w", err)
+	}
+
+	err = h.topic.Publish(h.runningCtx, bw.Bytes())
+	if err != nil {
+		return xerrors.Errorf("publishing message: %w", err)
+	}
+	return nil
+}
+
+var _ pubsub.ValidatorEx = (*gpbftRunner)(nil).validatePubsubMessage
+
+func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, pID peer.ID,
+	msg *pubsub.Message) pubsub.ValidationResult {
+	var gmsg gpbft.GMessage
+	err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data))
+	if err != nil {
+		return pubsub.ValidationReject
+	}
+
+	validatedMessage, err := h.participant.ValidateMessage(&gmsg)
+	if errors.Is(err, gpbft.ErrValidationInvalid) {
+		h.log.Debugf("validation error during validation: %+v", err)
+		return pubsub.ValidationReject
+	}
+	if err != nil {
+		h.log.Warnf("unknown error during validation: %+v", err)
+		return pubsub.ValidationIgnore
+	}
+	msg.ValidatorData = validatedMessage
+	return pubsub.ValidationAccept
+}
+
+func (h *gpbftRunner) setupPubsub() error {
+	pubsubTopicName := h.manifest.PubSubTopic()
+	err := h.pubsub.RegisterTopicValidator(pubsubTopicName, h.validatePubsubMessage)
+	if err != nil {
+		return xerrors.Errorf("registering topic validator: %w", err)
+	}
+
+	topic, err := h.pubsub.Join(pubsubTopicName)
+	if err != nil {
+		return xerrors.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err)
+	}
+	h.topic = topic
+	return nil
+}
+
+func (h *gpbftRunner) teardownPubsub() error {
+	var err error
+	if h.topic != nil {
+		err = multierr.Combine(
+			h.topic.Close(),
+			h.pubsub.UnregisterTopicValidator(h.topic.String()),
+		)
+
+		if errors.Is(err, context.Canceled) {
+			err = nil
+		}
+	}
+	return err
 }

+func (h *gpbftRunner) startPubsub() (<-chan gpbft.ValidatedMessage, error) {
+	if err := h.setupPubsub(); err != nil {
+		return nil, err
+	}
+
+	sub, err := h.topic.Subscribe()
+	if err != nil {
+		return nil, xerrors.Errorf("could not subscribe to pubsub topic: %s: %w", sub.Topic(), err)
+	}
+
+	messageQueue := make(chan gpbft.ValidatedMessage, 20)
+	h.errgrp.Go(func() error {
+		defer func() {
+			sub.Cancel()
+			close(messageQueue)
+		}()
+
+		for h.runningCtx.Err() == nil {
+			var msg *pubsub.Message
+			msg, err := sub.Next(h.runningCtx)
+			if err != nil {
+				if h.runningCtx.Err() != nil {
+					return nil
+				}
+				return xerrors.Errorf("pubsub message subscription returned an error: %w", err)
+			}
+			gmsg, ok := msg.ValidatorData.(gpbft.ValidatedMessage)
+			if !ok {
+				h.log.Errorf("invalid msgValidatorData: %+v", msg.ValidatorData)
+				continue
+			}
+			select {
+			case messageQueue <- gmsg:
+			case <-h.runningCtx.Done():
+				return nil
+			}
+		}
+		return nil
+	})
+	return messageQueue, nil
+}
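Note on the pubsub handling above: the topic validator decodes and validates each payload once, stashes the typed result in msg.ValidatorData, and the subscription reader only type-asserts it back — validation work is never repeated downstream. A sketch of that validate-once pattern using the libp2p pubsub API, with an assumed `parsed` type standing in for gpbft.ValidatedMessage:

```go
package example

import (
	"context"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/peer"
)

type parsed struct{ payload []byte } // stand-in for the validated message type

// validator rejects malformed data (peer is penalised) and attaches the
// parsed result for the reader; transient local failures should Ignore.
func validator(ctx context.Context, from peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
	if len(msg.Data) == 0 {
		return pubsub.ValidationReject
	}
	msg.ValidatorData = &parsed{payload: msg.Data}
	return pubsub.ValidationAccept
}

// read retrieves the already-validated result without re-parsing.
func read(ctx context.Context, sub *pubsub.Subscription) (*parsed, error) {
	msg, err := sub.Next(ctx)
	if err != nil {
		return nil, err
	}
	return msg.ValidatorData.(*parsed), nil
}
```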
+type gpbftTracer gpbftRunner
+
+// Log fulfills the gpbft.Tracer interface
+func (h *gpbftTracer) Log(fmt string, args ...any) {
+	h.logWithSkip.Debugf(fmt, args...)
+}
+
+var _ gpbft.Tracer = (*gpbftTracer)(nil)
+
+// gpbftHost is a newtype of gpbftRunner exposing APIs required by the gpbft.Participant
+type gpbftHost gpbftRunner
+
 func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, error) {
 	// TODO: optimize when head is way beyond base
 	res := make([]ec.TipSet, 0, 2*gpbft.CHAIN_MAX_LEN)
@@ -204,7 +375,7 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e
 			panic("reorg-ed away from base, dunno what to do, reboostrap is the answer")
 		}
 		var err error
-		head, err = h.client.ec.GetParent(h.runningCtx, head)
+		head, err = h.ec.GetParent(h.runningCtx, head)
 		if err != nil {
 			return nil, xerrors.Errorf("walking back the chain: %w", err)
 		}
@@ -214,10 +385,12 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e
 	return res[1:], nil
 }

-func (h *gpbftRunner) Stop() {
-	if h.ctxCancel != nil {
-		h.ctxCancel()
-	}
+func (h *gpbftRunner) Stop(_ctx context.Context) error {
+	h.ctxCancel()
+	return multierr.Combine(
+		h.errgrp.Wait(),
+		h.teardownPubsub(),
+	)
 }

 // Returns inputs to the next GPBFT instance.
@@ -229,33 +402,33 @@ func (h *gpbftRunner) Stop() {
 // ReceiveDecision (or known to be final via some other channel).
 func (h *gpbftHost) GetProposalForInstance(instance uint64) (*gpbft.SupplementalData, gpbft.ECChain, error) {
 	var baseTsk gpbft.TipSetKey
-	if instance == h.manifest.Manifest().InitialInstance {
-		ts, err := h.client.ec.GetTipsetByEpoch(h.runningCtx,
-			h.manifest.Manifest().BootstrapEpoch-h.manifest.Manifest().ECFinality)
+	if instance == h.manifest.InitialInstance {
+		ts, err := h.ec.GetTipsetByEpoch(h.runningCtx,
+			h.manifest.BootstrapEpoch-h.manifest.ECFinality)
 		if err != nil {
 			return nil, nil, xerrors.Errorf("getting boostrap base: %w", err)
 		}
 		baseTsk = ts.Key()
 	} else {
-		cert, err := h.client.certStore.Get(h.runningCtx, instance-1)
+		cert, err := h.certStore.Get(h.runningCtx, instance-1)
 		if err != nil {
 			return nil, nil, xerrors.Errorf("getting cert for previous instance(%d): %w", instance-1, err)
 		}
 		baseTsk = cert.ECChain.Head().Key
 	}

-	baseTs, err := h.client.ec.GetTipset(h.runningCtx, baseTsk)
+	baseTs, err := h.ec.GetTipset(h.runningCtx, baseTsk)
 	if err != nil {
 		return nil, nil, xerrors.Errorf("getting base TS: %w", err)
 	}
-	headTs, err := h.client.ec.GetHead(h.runningCtx)
+	headTs, err := h.ec.GetHead(h.runningCtx)
 	if err != nil {
 		return nil, nil, xerrors.Errorf("getting head TS: %w", err)
 	}

-	if time.Since(headTs.Timestamp()) < h.manifest.Manifest().ECPeriod {
+	if time.Since(headTs.Timestamp()) < h.manifest.ECPeriod {
 		// less than ECPeriod since production of the head
 		// agreement is unlikely
-		headTs, err = h.client.ec.GetParent(h.runningCtx, headTs)
+		headTs, err = h.ec.GetParent(h.runningCtx, headTs)
 		if err != nil {
 			return nil, nil, xerrors.Errorf("getting the parent of head TS: %w", err)
 		}
@@ -270,7 +443,7 @@ func (h *gpbftHost) GetProposalForInstance(instance uint64) (*gpbft.Supplemental
 		Epoch: baseTs.Epoch(),
 		Key:   baseTs.Key(),
 	}
-	pte, err := h.client.GetPowerTable(h.runningCtx, baseTs.Key())
+	pte, err := h.ec.GetPowerTable(h.runningCtx, baseTs.Key())
 	if err != nil {
 		return nil, nil, xerrors.Errorf("getting power table for base: %w", err)
 	}
@@ -284,7 +457,7 @@ func (h *gpbftHost) GetProposalForInstance(instance uint64) (*gpbft.Supplemental
 		suffix[i].Key = collectedChain[i].Key()
 		suffix[i].Epoch = collectedChain[i].Epoch()

-		pte, err = h.client.GetPowerTable(h.runningCtx, suffix[i].Key)
+		pte, err = h.ec.GetPowerTable(h.runningCtx, suffix[i].Key)
 		if err != nil {
 			return nil, nil, xerrors.Errorf("getting power table for suffix %d: %w", i, err)
 		}
@@ -317,36 +490,36 @@ func (h *gpbftHost) GetCommitteeForInstance(instance uint64) (*gpbft.PowerTable,
 	var powerEntries gpbft.PowerEntries
 	var err error

-	if instance < h.manifest.Manifest().InitialInstance+h.manifest.Manifest().CommiteeLookback {
+	if instance < h.manifest.InitialInstance+h.manifest.CommiteeLookback {
 		//boostrap phase
-		ts, err := h.client.ec.GetTipsetByEpoch(h.runningCtx, h.manifest.Manifest().BootstrapEpoch-h.manifest.Manifest().ECFinality)
+		ts, err := h.ec.GetTipsetByEpoch(h.runningCtx, h.manifest.BootstrapEpoch-h.manifest.ECFinality)
 		if err != nil {
 			return nil, nil, xerrors.Errorf("getting tipset for boostrap epoch with lookback: %w", err)
 		}
 		powerTsk = ts.Key()
-		powerEntries, err = h.client.GetPowerTable(h.runningCtx, powerTsk)
+		powerEntries, err = h.ec.GetPowerTable(h.runningCtx, powerTsk)
 		if err != nil {
 			return nil, nil, xerrors.Errorf("getting power table: %w", err)
 		}
 	} else {
-		cert, err := h.client.certStore.Get(h.runningCtx, instance-h.manifest.Manifest().CommiteeLookback)
+		cert, err := h.certStore.Get(h.runningCtx, instance-h.manifest.CommiteeLookback)
 		if err != nil {
 			return nil, nil, xerrors.Errorf("getting finality certificate: %w", err)
 		}
 		powerTsk = cert.ECChain.Head().Key

-		powerEntries, err = h.client.certStore.GetPowerTable(h.runningCtx, instance)
+		powerEntries, err = h.certStore.GetPowerTable(h.runningCtx, instance)
 		if err != nil {
 			h.log.Debugf("failed getting power table from certstore: %v, falling back to EC", err)

-			powerEntries, err = h.client.ec.GetPowerTable(h.runningCtx, powerTsk)
+			powerEntries, err = h.ec.GetPowerTable(h.runningCtx, powerTsk)
 			if err != nil {
 				return nil, nil, xerrors.Errorf("getting power table: %w", err)
 			}
 		}
 	}

-	ts, err := h.client.ec.GetTipset(h.runningCtx, powerTsk)
+	ts, err := h.ec.GetTipset(h.runningCtx, powerTsk)
 	if err != nil {
 		return nil, nil, xerrors.Errorf("getting tipset: %w", err)
 	}
@@ -362,17 +535,15 @@ func (h *gpbftHost) GetCommitteeForInstance(instance uint64) (*gpbft.PowerTable,

 // Returns the network's name (for signature separation)
 func (h *gpbftHost) NetworkName() gpbft.NetworkName {
-	return h.manifest.Manifest().NetworkName
+	return h.manifest.NetworkName
 }

 // Sends a message to all other participants.
 // The message's sender must be one that the network interface can sign on behalf of.
 func (h *gpbftHost) RequestBroadcast(mb *gpbft.MessageBuilder) error {
-	err := h.client.BroadcastMessage(h.runningCtx, mb)
-	if err != nil {
-		h.log.Errorf("broadcasting GMessage: %+v", err)
-		return err
-	}
+	mb.SetNetworkName(h.manifest.NetworkName)
+	mb.SetSigningMarshaler(h.signingMarshaller)
+	(h.broadcastCb)(mb)
 	return nil
 }

@@ -431,7 +602,7 @@ func (h *gpbftHost) saveDecision(decision *gpbft.Justification) (*certs.Finality
 		return nil, xerrors.Errorf("certificate is invalid: %w", err)
 	}

-	err = h.client.certStore.Put(h.runningCtx, cert)
+	err = h.certStore.Put(h.runningCtx, cert)
 	if err != nil {
 		return nil, xerrors.Errorf("saving ceritifcate in a store: %w", err)
 	}
@@ -443,22 +614,22 @@ func (h *gpbftHost) saveDecision(decision *gpbft.Justification) (*certs.Finality
 // This should usually call `Payload.MarshalForSigning(NetworkName)` except when testing as
 // that method is slow (computes a merkle tree that's necessary for testing).
 func (h *gpbftHost) MarshalPayloadForSigning(nn gpbft.NetworkName, p *gpbft.Payload) []byte {
-	return h.client.signingMarshaller.MarshalPayloadForSigning(nn, p)
+	return h.signingMarshaller.MarshalPayloadForSigning(nn, p)
 }

 // Verifies a signature for the given public key.
 // Implementations must be safe for concurrent use.
 func (h *gpbftHost) Verify(pubKey gpbft.PubKey, msg []byte, sig []byte) error {
-	return h.client.Verify(pubKey, msg, sig)
+	return h.verifier.Verify(pubKey, msg, sig)
 }

 // Aggregates signatures from a participants.
 func (h *gpbftHost) Aggregate(pubKeys []gpbft.PubKey, sigs [][]byte) ([]byte, error) {
-	return h.client.Aggregate(pubKeys, sigs)
+	return h.verifier.Aggregate(pubKeys, sigs)
 }

 // VerifyAggregate verifies an aggregate signature.
 // Implementations must be safe for concurrent use.
 func (h *gpbftHost) VerifyAggregate(payload []byte, aggSig []byte, signers []gpbft.PubKey) error {
-	return h.client.VerifyAggregate(payload, aggSig, signers)
+	return h.verifier.VerifyAggregate(payload, aggSig, signers)
 }
diff --git a/manifest/dynamic_manifest.go b/manifest/dynamic_manifest.go
index 4442973b..778fbfae 100644
--- a/manifest/dynamic_manifest.go
+++ b/manifest/dynamic_manifest.go
@@ -3,27 +3,27 @@ package manifest
 import (
 	"bytes"
 	"context"
+	"encoding/json"
+	"errors"
 	"fmt"
-	"sync"
-	"time"
+	"io"

 	"github.com/filecoin-project/go-f3/ec"
 	"github.com/filecoin-project/go-f3/gpbft"
+
 	logging "github.com/ipfs/go-log/v2"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"go.uber.org/multierr"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 )

-var _ ManifestProvider = (*DynamicManifestProvider)(nil)
-
 var log = logging.Logger("f3-dynamic-manifest")

-const (
-	ManifestPubSubTopicName = "/f3/manifests/0.0.1"
-	ManifestCheckTick       = 5 * time.Second
-)
+var _ ManifestProvider = (*DynamicManifestProvider)(nil)
+
+const ManifestPubSubTopicName = "/f3/manifests/0.0.1"

 // DynamicManifestProvider is a manifest provider that allows
 // the manifest to be changed at runtime.
@@ -31,169 +31,156 @@ type DynamicManifestProvider struct {
 	pubsub           *pubsub.PubSub
 	ec               ec.Backend
 	manifestServerID peer.ID
-	manifestTopic    *pubsub.Topic

-	// the lk guards all dynamic manifest-specific fields
-	lk               sync.RWMutex
-	manifest         Manifest
-	onManifestChange OnManifestChange
-	nextManifest     *Manifest
-}
+	runningCtx context.Context
+	errgrp     *errgroup.Group
+	cancel     context.CancelFunc

-func NewDynamicManifestProvider(manifest Manifest, pubsub *pubsub.PubSub, ec ec.Backend, manifestServerID peer.ID) ManifestProvider {
-	return &DynamicManifestProvider{
-		manifest:         manifest,
-		pubsub:           pubsub,
-		ec:               ec,
-		manifestServerID: manifestServerID,
-	}
+	initialManifest *Manifest
+	manifestChanges chan *Manifest
 }

-func (m *DynamicManifestProvider) Manifest() Manifest {
-	m.lk.RLock()
-	defer m.lk.RUnlock()
-	return m.manifest
+type ManifestUpdateMessage struct {
+	MessageSequence uint64
+	ManifestVersion uint64
+	Manifest        *Manifest
 }

-func (m *DynamicManifestProvider) GpbftOptions() []gpbft.Option {
-	return m.manifest.GpbftOptions()
-}
+func (mu ManifestUpdateMessage) toManifest() *Manifest {
+	if mu.Manifest == nil {
+		return nil
+	}

-func (m *DynamicManifestProvider) SetManifestChangeCallback(mc OnManifestChange) {
-	m.lk.Lock()
-	defer m.lk.Unlock()
-	m.onManifestChange = mc
+	// When a manifest configuration changes, a new network name is set that depends on the
+	// manifest version of the previous version to avoid overlapping previous configurations.
+	cpy := *mu.Manifest
+	newName := fmt.Sprintf("%s/%d", string(cpy.NetworkName), mu.ManifestVersion)
+	cpy.NetworkName = gpbft.NetworkName(newName)
+	return &cpy
 }
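Note on toManifest: the version suffix keeps every reconfiguration on a distinct network name (and therefore a distinct pubsub topic), so peers on different manifest versions never talk past each other. A worked example of the rule, with assumed values — a base name "filecoin" at manifest version 3 yields "filecoin/3":

```go
package example

import "fmt"

// versionedNetworkName applies the same "%s/%d" rule used in toManifest.
func versionedNetworkName(base string, manifestVersion uint64) string {
	return fmt.Sprintf("%s/%d", base, manifestVersion) // "filecoin", 3 -> "filecoin/3"
}
```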
-// When a manifest configuration changes, a new network name
-// is set that depends on the manifest version of the previous version to avoid
-// overlapping previous configurations.
-func (m *DynamicManifestProvider) networkNameOnChange() gpbft.NetworkName {
-	return gpbft.NetworkName(string(m.manifest.NetworkName) + "/" + fmt.Sprintf("%d", m.manifest.Sequence))
+func (m ManifestUpdateMessage) Marshal() ([]byte, error) {
+	b, err := json.Marshal(m)
+	if err != nil {
+		return nil, xerrors.Errorf("marshaling JSON: %w", err)
+	}
+	return b, nil
 }

-func (m *DynamicManifestProvider) Run(ctx context.Context, errCh chan error) {
-	if m.onManifestChange == nil {
-		errCh <- xerrors.New("onManifestChange is nil. Callback for manifest change required")
+func (m *ManifestUpdateMessage) Unmarshal(r io.Reader) error {
+	err := json.NewDecoder(r).Decode(&m)
+	if err != nil {
+		return xerrors.Errorf("decoding JSON: %w", err)
 	}
-	go m.handleIncomingManifests(ctx, errCh)
-	m.handleApplyManifest(ctx, errCh)
+	return nil
 }

-func (m *DynamicManifestProvider) handleApplyManifest(ctx context.Context, errCh chan error) {
-	// add a timer for EC period
-	ticker := time.NewTicker(ManifestCheckTick)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ticker.C:
-			m.lk.Lock()
-			if m.nextManifest != nil {
-				ts, err := m.ec.GetHead(ctx)
-				if err != nil {
-					log.Errorf("error fetching chain head: %+v", err)
-					m.lk.Unlock()
-					continue
-				}
+func NewDynamicManifestProvider(initialManifest *Manifest, pubsub *pubsub.PubSub, ec ec.Backend, manifestServerID peer.ID) ManifestProvider {
+	ctx, cancel := context.WithCancel(context.Background())
+	errgrp, ctx := errgroup.WithContext(ctx)

-				// if the upgrade epoch is reached or already passed.
-				if ts.Epoch() >= m.nextManifest.BootstrapEpoch {
-					log.Debugf("reached bootstrap epoch, triggering manifest change: %d", ts.Epoch())
-					// update the current manifest
-					prevManifest := m.manifest
-					m.manifest = *m.nextManifest
-					nn := m.networkNameOnChange()
-					m.manifest.NetworkName = nn
-					m.nextManifest = nil
-					// trigger manifest change callback.
-					go m.onManifestChange(ctx, prevManifest, errCh)
-					m.lk.Unlock()
-					continue
-				}
-			}
-			m.lk.Unlock()
-		case <-ctx.Done():
-			return
-		}
+	return &DynamicManifestProvider{
+		pubsub:           pubsub,
+		ec:               ec,
+		manifestServerID: manifestServerID,
+		runningCtx:       ctx,
+		errgrp:           errgrp,
+		cancel:           cancel,
+		initialManifest:  initialManifest,
+		manifestChanges:  make(chan *Manifest, 1),
 	}
 }

-// listen to manifests being broadcast through the network.
-func (m *DynamicManifestProvider) handleIncomingManifests(ctx context.Context, errCh chan error) {
-	if err := m.setupManifestPubsub(); err != nil {
-		errCh <- xerrors.Errorf("setting up pubsub: %w", err)
-		return
+func (m *DynamicManifestProvider) ManifestUpdates() <-chan *Manifest {
+	return m.manifestChanges
+}
+
+func (m *DynamicManifestProvider) Stop(ctx context.Context) (_err error) {
+	m.cancel()
+	return m.errgrp.Wait()
+}
+
+func (m *DynamicManifestProvider) Start(startCtx context.Context) (_err error) {
+	if err := m.registerTopicValidator(); err != nil {
+		return err
 	}

-	manifestSub, err := m.manifestTopic.Subscribe()
+	manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName)
 	if err != nil {
-		errCh <- xerrors.Errorf("subscribing to topic: %w", err)
-		return
+		return xerrors.Errorf("could not join manifest pubsub topic: %w", err)
 	}

-loop:
-	for ctx.Err() == nil {
-		select {
-		case <-ctx.Done():
-			break loop
+	manifestSub, err := manifestTopic.Subscribe()
+	if err != nil {
+		return xerrors.Errorf("subscribing to manifest pubsub topic: %w", err)
+	}

-		default:
-			var msg *pubsub.Message
-			msg, err = manifestSub.Next(ctx)
+	// XXX: load the initial manifest from disk!
+	// And save it!
+
+	m.manifestChanges <- ManifestUpdateMessage{
+		MessageSequence: 0,
+		ManifestVersion: 0,
+		Manifest:        m.initialManifest,
+	}.toManifest()
+
+	m.errgrp.Go(func() error {
+		defer func() {
+			manifestSub.Cancel()
+			err := multierr.Combine(
+				manifestTopic.Close(),
+				m.unregisterTopicValidator(),
+			)
+			// Pubsub likes to return context canceled errors if/when we unregister after
+			// closing pubsub. Ignore it.
+			if err != nil && !errors.Is(err, context.Canceled) {
+				_err = multierr.Append(_err, err)
+			}
+			if _err != nil {
+				log.Errorf("exited manifest subscription early: %+v", _err)
+			}
+		}()
+
+		var msgSeqNumber uint64
+		for m.runningCtx.Err() == nil {
+			msg, err := manifestSub.Next(m.runningCtx)
 			if err != nil {
-				log.Errorf("manifestPubsub subscription.Next() returned an error: %+v", err)
-				break
+				if m.runningCtx.Err() == nil {
+					return xerrors.Errorf("error from manifest subscription: %w", err)
+				}
+				return nil
 			}
-			manifest, ok := msg.ValidatorData.(*Manifest)
+			update, ok := msg.ValidatorData.(*ManifestUpdateMessage)
 			if !ok {
 				log.Errorf("invalid manifestValidatorData: %+v", msg.ValidatorData)
 				continue
 			}
-			m.acceptNextManifest(manifest)
+			if update.MessageSequence <= msgSeqNumber {
+				log.Warnf("discarded manifest update %d", update.MessageSequence)
+				continue
+			}
+			log.Infof("received manifest update %d", update.MessageSequence)
+			msgSeqNumber = update.MessageSequence
+			select {
+			case m.manifestChanges <- update.toManifest():
+			case <-m.runningCtx.Done():
+				return nil
+			}
 		}
-	}
+		return nil
+	})

-	manifestSub.Cancel()
-	if err := m.teardownManifestPubsub(); err != nil {
-		errCh <- xerrors.Errorf("shutting down manifest pubsub: %w", err)
-	}
+	return nil
 }
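Note on the subscription loop above: updates carry a monotonically increasing MessageSequence, and anything not strictly greater than the last sequence seen is dropped, so replayed or reordered gossip cannot roll the manifest back. A distilled sketch of that filter (matching the loop's semantics, where the very first accepted update must have a sequence above zero):

```go
package example

// newSeqFilter returns a predicate that accepts an update only if its
// sequence is strictly greater than every sequence accepted so far.
func newSeqFilter() func(seq uint64) bool {
	var last uint64
	return func(seq uint64) bool {
		if seq <= last {
			return false // stale or duplicate update
		}
		last = seq
		return true
	}
}
```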
-func (m *DynamicManifestProvider) teardownPubsub(topic *pubsub.Topic, topicName string) error {
-	return multierr.Combine(
-		m.pubsub.UnregisterTopicValidator(topicName),
-		topic.Close(),
-	)
-}
-
-func (m *DynamicManifestProvider) teardownManifestPubsub() error {
-	return m.teardownPubsub(m.manifestTopic, ManifestPubSubTopicName)
-}
-
-// Checks if we should accept the manifest that we received through pubsub
-// and sets nextManifest if it is the case
-func (m *DynamicManifestProvider) acceptNextManifest(manifest *Manifest) {
-	m.lk.Lock()
-	defer m.lk.Unlock()
-
-	if manifest.Sequence <= m.manifest.Sequence {
-		return
-	}
-	// TODO: Any additional logic here to determine what manifests to accept or not?
-
-	m.nextManifest = manifest
-}
-
-func (m *DynamicManifestProvider) setupManifestPubsub() (err error) {
-	topicName := ManifestPubSubTopicName
+func (m *DynamicManifestProvider) registerTopicValidator() error {
 	// using the same validator approach used for the message pubsub
 	// to be homogeneous.
 	var validator pubsub.ValidatorEx = func(ctx context.Context, pID peer.ID,
 		msg *pubsub.Message) pubsub.ValidationResult {
-		var manifest Manifest
-		err := manifest.Unmarshal(bytes.NewReader(msg.Data))
+		var update ManifestUpdateMessage
+		err := update.Unmarshal(bytes.NewReader(msg.Data))
 		if err != nil {
 			return pubsub.ValidationReject
 		}
@@ -208,18 +195,17 @@ func (m *DynamicManifestProvider) setupManifestPubsub() (err error) {
 		// Expect an BootstrapEpoch over the BootstrapEpoch of the current manifests?
 		// These should probably not be ValidationRejects to avoid banning in gossipsub
 		// the centralized server in case of misconfigurations or bugs.
-		msg.ValidatorData = &manifest
+		msg.ValidatorData = &update
 		return pubsub.ValidationAccept
 	}

-	err = m.pubsub.RegisterTopicValidator(topicName, validator)
+	err := m.pubsub.RegisterTopicValidator(ManifestPubSubTopicName, validator)
 	if err != nil {
 		return xerrors.Errorf("registering topic validator: %w", err)
 	}
+	return nil
+}

-	m.manifestTopic, err = m.pubsub.Join(topicName)
-	if err != nil {
-		return xerrors.Errorf("could not join on pubsub topic: %s: %w", topicName, err)
-	}
-	return
+func (m *DynamicManifestProvider) unregisterTopicValidator() error {
+	return m.pubsub.UnregisterTopicValidator(ManifestPubSubTopicName)
 }
diff --git a/manifest/manifest.go b/manifest/manifest.go
index b774b2ae..a8b03508 100644
--- a/manifest/manifest.go
+++ b/manifest/manifest.go
@@ -39,18 +39,15 @@ var (
 	}
 )

-type OnManifestChange func(ctx context.Context, prevManifest Manifest, errCh chan error)
+type OnManifestChange func(ctx context.Context, prevManifest Manifest) error

 type ManifestProvider interface {
-	// Run starts any background tasks required for the operation
-	// of the manifest provider.
-	Run(context.Context, chan error)
-	// Returns the list of gpbft options to be used for gpbft configuration
-	GpbftOptions() []gpbft.Option
-	// Set callback to trigger to apply new manifests from F3.
-	SetManifestChangeCallback(OnManifestChange)
-	// Manifest accessor
-	Manifest() Manifest
+	// Start any background tasks required for the operation of the manifest provider.
+	Start(context.Context) error
+	// Stop stops a running manifest provider.
+	Stop(context.Context) error
+	// The channel on which manifest updates are returned.
+	ManifestUpdates() <-chan *Manifest
 }
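Note on the new ManifestProvider interface: consumers no longer register callbacks; they Start the provider, receive updates from a channel, and Stop it on the way out. A hedged sketch of a consumer loop under this interface (the `apply` callback is an assumption for illustration; a nil manifest signals a paused network per the sender semantics below):

```go
package example

import (
	"context"

	"github.com/filecoin-project/go-f3/manifest"
)

func watch(ctx context.Context, mp manifest.ManifestProvider, apply func(*manifest.Manifest)) error {
	if err := mp.Start(ctx); err != nil {
		return err
	}
	defer func() { _ = mp.Stop(context.Background()) }()
	for {
		select {
		case m := <-mp.ManifestUpdates():
			apply(m) // m may be nil when the network is paused
		case <-ctx.Done():
			return nil
		}
	}
}
```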
diff --git a/manifest/manifest.go b/manifest/manifest.go
index b774b2ae..a8b03508 100644
--- a/manifest/manifest.go
+++ b/manifest/manifest.go
@@ -39,18 +39,15 @@ var (
 	}
 )
 
-type OnManifestChange func(ctx context.Context, prevManifest Manifest, errCh chan error)
+type OnManifestChange func(ctx context.Context, prevManifest Manifest) error
 
 type ManifestProvider interface {
-	// Run starts any background tasks required for the operation
-	// of the manifest provider.
-	Run(context.Context, chan error)
-	// Returns the list of gpbft options to be used for gpbft configuration
-	GpbftOptions() []gpbft.Option
-	// Set callback to trigger to apply new manifests from F3.
-	SetManifestChangeCallback(OnManifestChange)
-	// Manifest accessor
-	Manifest() Manifest
+	// Start starts any background tasks required for the operation of the manifest provider.
+	Start(context.Context) error
+	// Stop stops a running manifest provider.
+	Stop(context.Context) error
+	// ManifestUpdates returns the channel on which manifest updates are delivered.
+	ManifestUpdates() <-chan *Manifest
 }
 
 type Version string
@@ -74,21 +71,12 @@ type EcConfig struct {
 	CommiteeLookback uint64
 }
 
-// Manifest identifies the specific configuration for
-// the F3 instance currently running.
+// Manifest identifies the specific configuration for the F3 instance currently running.
 type Manifest struct {
-	// Sequence number of the manifest.
-	// This is used to identify if a new config needs to be applied
-	Sequence uint64
 	// Initial instance to used for the f3 instance
 	InitialInstance uint64
 	// BootstrapEpoch from which the manifest should be applied
 	BootstrapEpoch int64
-	// Flag to determine if the peer should rebootstrap in this configuration
-	// change at BootstrapEpoch
-	ReBootstrap bool
-	// Specifies if the manifest should pause F3 until a new configuration arrives.
-	Pause bool
 	// Network name to apply for this manifest.
 	NetworkName gpbft.NetworkName
 	// Updates to perform over the power table retrieved by the host
@@ -100,10 +88,10 @@ type Manifest struct {
 	*EcConfig
 }
 
-func LocalDevnetManifest() Manifest {
+func LocalDevnetManifest() *Manifest {
 	rng := make([]byte, 4)
 	_, _ = rand.Read(rng)
-	m := Manifest{
+	m := &Manifest{
 		NetworkName:    gpbft.NetworkName(fmt.Sprintf("localnet-%X", rng)),
 		BootstrapEpoch: 1000,
 		EcConfig:       DefaultEcConfig,
@@ -112,7 +100,7 @@ func LocalDevnetManifest() Manifest {
 }
 
 // Version that uniquely identifies the manifest.
-func (m Manifest) Version() (Version, error) {
+func (m *Manifest) Version() (Version, error) {
 	b, err := json.Marshal(m)
 	if err != nil {
 		return "", xerrors.Errorf("computing manifest version: %w", err)
@@ -123,7 +111,7 @@
 // Marshal the manifest into JSON
 // We use JSON because we need to serialize a float and time.Duration
 // and the cbor serializer we use do not support these types yet.
-func (m Manifest) Marshal() ([]byte, error) {
+func (m *Manifest) Marshal() ([]byte, error) {
 	b, err := json.Marshal(m)
 	if err != nil {
 		return nil, xerrors.Errorf("marshaling JSON: %w", err)
@@ -139,11 +127,11 @@ func (m *Manifest) Unmarshal(r io.Reader) error {
 	return nil
 }
 
-func (m Manifest) DatastorePrefix() datastore.Key {
+func (m *Manifest) DatastorePrefix() datastore.Key {
 	return datastore.NewKey("/f3/" + string(m.NetworkName))
 }
 
-func (m Manifest) PubSubTopic() string {
+func (m *Manifest) PubSubTopic() string {
 	return PubSubTopicFromNetworkName(m.NetworkName)
 }
 
@@ -151,7 +139,7 @@ func PubSubTopicFromNetworkName(nn gpbft.NetworkName) string {
 	return "/f3/granite/0.0.1/" + string(nn)
 }
 
-func (m Manifest) GpbftOptions() []gpbft.Option {
+func (m *Manifest) GpbftOptions() []gpbft.Option {
 	var opts []gpbft.Option
 
 	if m.GpbftConfig == nil {
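With the Sequence field gone, change detection rests entirely on Version(), which hashes the JSON encoding, so any field change produces a new version string. The reload idiom used by the CLI and the tests, in isolation (applyIfChanged and apply are illustrative names):

    package example

    import (
        "fmt"

        "github.com/filecoin-project/go-f3/manifest"
    )

    // applyIfChanged applies next only when its content hash differs from the
    // version we already hold; apply is an illustrative callback.
    func applyIfChanged(current manifest.Version, next *manifest.Manifest, apply func(*manifest.Manifest)) (manifest.Version, error) {
        v, err := next.Version()
        if err != nil {
            return current, fmt.Errorf("versioning manifest: %w", err)
        }
        if v != current {
            apply(next)
            current = v
        }
        return current, nil
    }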
-	lk       sync.RWMutex
-	manifest Manifest
-	// Used to stop publishing manifests
-	cancel context.CancelFunc
+	lk       sync.Mutex
+	manifest *Manifest
+	version  uint64
+	msgSeq   uint64
+	paused   bool
 }
 
-func NewManifestSender(h host.Host, pubsub *pubsub.PubSub, firstManifest Manifest, pubishInterval time.Duration) (*ManifestSender, error) {
+func NewManifestSender(h host.Host, pubsub *pubsub.PubSub, firstManifest *Manifest, pubishInterval time.Duration) (*ManifestSender, error) {
 	topicName := ManifestPubSubTopicName
 
 	m := &ManifestSender{
 		manifest: firstManifest,
@@ -52,45 +53,68 @@ func (m *ManifestSender) PeerInfo() peer.AddrInfo {
 	return m.h.Peerstore().PeerInfo(m.h.ID())
 }
 
-func (m *ManifestSender) Start(ctx context.Context) {
-	if m.cancel != nil {
-		log.Warn("manifest sender already running")
-		return
-	}
-	ctx, m.cancel = context.WithCancel(ctx)
-
+func (m *ManifestSender) Run(ctx context.Context) error {
 	ticker := time.NewTicker(m.interval)
-	for {
+	defer ticker.Stop()
+	for ctx.Err() == nil {
 		select {
 		case <-ticker.C:
 			err := m.publishManifest(ctx)
 			if err != nil {
-				log.Error("error publishing manifest: %w", err)
+				if ctx.Err() != nil {
+					return nil
+				}
+				return xerrors.Errorf("error publishing manifest: %w", err)
 			}
 		case <-ctx.Done():
-			return
+			return nil
 		}
 	}
+	return nil
 }
 
 func (m *ManifestSender) publishManifest(ctx context.Context) error {
-	m.lk.RLock()
-	defer m.lk.RUnlock()
+	m.lk.Lock()
+	defer m.lk.Unlock()
 
-	b, err := m.manifest.Marshal()
+	update := ManifestUpdateMessage{
+		MessageSequence: m.msgSeq,
+		ManifestVersion: m.version,
+	}
+	if !m.paused {
+		update.Manifest = m.manifest
+	}
+
+	b, err := update.Marshal()
 	if err != nil {
 		return err
 	}
 	return m.manifestTopic.Publish(ctx, b)
 }
 
-func (m *ManifestSender) UpdateManifest(manifest Manifest) {
+func (m *ManifestSender) UpdateManifest(manifest *Manifest) {
 	m.lk.Lock()
 	m.manifest = manifest
+	m.msgSeq++
+	m.version++
+	m.paused = false
+	m.lk.Unlock()
+}
+
+func (m *ManifestSender) Pause() {
+	m.lk.Lock()
+	if !m.paused {
+		m.paused = true
+		m.msgSeq++
+	}
 	m.lk.Unlock()
 }
 
-func (m *ManifestSender) Stop() {
-	m.cancel()
-	m.cancel = nil
+func (m *ManifestSender) Resume() {
+	m.lk.Lock()
+	if m.paused {
+		m.paused = false
+		m.msgSeq++
+	}
+	m.lk.Unlock()
 }
diff --git a/manifest/manifest_test.go b/manifest/manifest_test.go
index 0c2909e7..d58de092 100644
--- a/manifest/manifest_test.go
+++ b/manifest/manifest_test.go
@@ -15,9 +15,7 @@ import (
 )
 
 var base manifest.Manifest = manifest.Manifest{
-	Sequence:       0,
 	BootstrapEpoch: 10,
-	ReBootstrap:    true,
 	NetworkName:    gpbft.NetworkName("test"),
 	PowerUpdate: []certs.PowerTableDelta{
 		{
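On the wire, pausing is no longer a manifest field: a paused sender keeps publishing update messages whose Manifest pointer is nil, and every state change bumps msgSeq so receivers can discard stale or replayed messages. Receiver-side handling might look like the following sketch (manifestUpdate mirrors the fields used in this change; the real ManifestUpdateMessage is defined elsewhere in the package):

    package example

    import "github.com/filecoin-project/go-f3/manifest"

    // manifestUpdate mirrors the shape of ManifestUpdateMessage as used here.
    type manifestUpdate struct {
        MessageSequence uint64
        ManifestVersion uint64
        Manifest        *manifest.Manifest // nil signals "pause F3"
    }

    // handle returns the new last-seen sequence number; pause and apply are
    // illustrative callbacks.
    func handle(lastSeq uint64, u manifestUpdate, pause func(), apply func(*manifest.Manifest)) uint64 {
        if u.MessageSequence <= lastSeq {
            return lastSeq // stale or replayed; drop it
        }
        if u.Manifest == nil {
            pause()
        } else {
            apply(u.Manifest)
        }
        return u.MessageSequence
    }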
diff --git a/manifest/static.go b/manifest/static.go
index f3f50f56..36e2f54b 100644
--- a/manifest/static.go
+++ b/manifest/static.go
@@ -2,8 +2,6 @@ package manifest
 
 import (
 	"context"
-
-	"github.com/filecoin-project/go-f3/gpbft"
 )
 
 var _ ManifestProvider = (*StaticManifestProvider)(nil)
@@ -11,21 +9,15 @@ var _ ManifestProvider = (*StaticManifestProvider)(nil)
 // Static manifest provider that doesn't allow any changes
 // in runtime to the initial manifest set in the provider
 type StaticManifestProvider struct {
-	manifest Manifest
-}
-
-func NewStaticManifestProvider(m Manifest) ManifestProvider {
-	return &StaticManifestProvider{manifest: m}
+	ch <-chan *Manifest
 }
 
-func (m *StaticManifestProvider) GpbftOptions() []gpbft.Option {
-	return m.manifest.GpbftOptions()
+func NewStaticManifestProvider(m *Manifest) ManifestProvider {
+	ch := make(chan *Manifest, 1)
+	ch <- m
+	return &StaticManifestProvider{ch: ch}
 }
 
-func (m *StaticManifestProvider) Manifest() Manifest {
-	return m.manifest
-}
-
-func (m *StaticManifestProvider) Run(context.Context, chan error) {}
-
-func (m *StaticManifestProvider) SetManifestChangeCallback(OnManifestChange) {}
+func (m *StaticManifestProvider) Start(context.Context) error       { return nil }
+func (m *StaticManifestProvider) Stop(context.Context) error        { return nil }
+func (m *StaticManifestProvider) ManifestUpdates() <-chan *Manifest { return m.ch }
diff --git a/store.go b/store.go
new file mode 100644
index 00000000..a2f94e67
--- /dev/null
+++ b/store.go
@@ -0,0 +1,32 @@
+package f3
+
+import (
+	"context"
+
+	"github.com/filecoin-project/go-f3/certstore"
+	"github.com/filecoin-project/go-f3/ec"
+	"github.com/filecoin-project/go-f3/manifest"
+
+	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/namespace"
+	"golang.org/x/xerrors"
+)
+
+// openCertstore opens the certificate store for the specific manifest (namespaced by the network
+// name).
+func openCertstore(ctx context.Context, ec ec.Backend, ds datastore.Datastore, m *manifest.Manifest) (*certstore.Store, error) {
+
+	ds = namespace.Wrap(ds, m.DatastorePrefix())
+
+	ts, err := ec.GetTipsetByEpoch(ctx, m.BootstrapEpoch-m.ECFinality)
+	if err != nil {
+		return nil, xerrors.Errorf("getting initial power tipset: %w", err)
+	}
+
+	initialPowerTable, err := ec.GetPowerTable(ctx, ts.Key())
+	if err != nil {
+		return nil, xerrors.Errorf("getting initial power table: %w", err)
+	}
+
+	return certstore.OpenOrCreateStore(ctx, ds, m.InitialInstance, initialPowerTable)
+}
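Because openCertstore namespaces the datastore by the manifest's network name, each manifest change lands in an independent keyspace, which is what makes rebootstrapping cheap. A hypothetical sketch of pairing it with the new update channel inside package f3 (watchManifests is not part of this change):

    package f3

    import (
        "context"

        "github.com/filecoin-project/go-f3/ec"
        "github.com/filecoin-project/go-f3/manifest"

        "github.com/ipfs/go-datastore"
    )

    // watchManifests reopens a namespaced certificate store for each manifest
    // update; handing the store to a fresh gpbft runner is elided.
    func watchManifests(ctx context.Context, p manifest.ManifestProvider, backend ec.Backend, ds datastore.Datastore) error {
        for {
            select {
            case m := <-p.ManifestUpdates():
                cs, err := openCertstore(ctx, backend, ds, m)
                if err != nil {
                    return err
                }
                _ = cs // start a new runner with this store here
            case <-ctx.Done():
                return nil
            }
        }
    }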
diff --git a/test/f3_test.go b/test/f3_test.go
index 830b721b..f7fb55cd 100644
--- a/test/f3_test.go
+++ b/test/f3_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 )
 
@@ -31,134 +32,89 @@ const (
 
 var log = logging.Logger("f3-testing")
 
 func TestSimpleF3(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, false)
 
-	env.Connect(ctx)
-	env.run(ctx)
-	env.waitForInstanceNumber(ctx, 5, 10*time.Second, false)
-	env.stop()
+	env.connectAll()
+	env.start()
+	env.waitForInstanceNumber(5, 10*time.Second, false)
 }
 
 func TestDynamicManifest_WithoutChanges(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, true)
 
-	env.Connect(ctx)
-	env.run(ctx)
+	env.connectAll()
+	env.start()
 
 	prev := env.nodes[0].f3.Manifest()
 
-	env.waitForInstanceNumber(ctx, 5, 10*time.Second, false)
+	env.waitForInstanceNumber(5, 10*time.Second, false)
 	// no changes in manifest
 	require.Equal(t, prev, env.nodes[0].f3.Manifest())
 	env.requireEqualManifests(false)
-	env.stop()
 }
 
 func TestDynamicManifest_WithRebootstrap(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, true)
 
-	env.Connect(ctx)
-	env.run(ctx)
+	env.connectAll()
+	env.start()
 
 	prev := env.nodes[0].f3.Manifest()
 
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
-	prevInstance := env.nodes[0].CurrentGpbftInstance(t, ctx)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
+	prevInstance := env.nodes[0].currentGpbftInstance()
 
 	env.manifest.BootstrapEpoch = 953
-	env.manifest.Sequence = 1
-	env.manifest.ReBootstrap = true
+	env.addPowerDeltaForParticipants(&env.manifest, []gpbft.ActorID{2, 3}, big.NewInt(1), false)
 	env.updateManifest()
 
-	env.waitForManifestChange(prev, 15*time.Second, env.nodes)
+	env.waitForManifestChange(prev, 15*time.Second)
 
 	// check that it rebootstrapped and the number of instances is below prevInstance
-	require.True(t, env.nodes[0].CurrentGpbftInstance(t, ctx) < prevInstance)
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
+	require.True(t, env.nodes[0].currentGpbftInstance() < prevInstance)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
 	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
 	env.requireEqualManifests(false)
-	env.stop()
-}
-
-func TestDynamicManifest_WithoutRebootstrap(t *testing.T) {
-	ctx := context.Background()
-	env := newTestEnvironment(t, 2, true)
-
-	env.Connect(ctx)
-	env.run(ctx)
-
-	prev := env.nodes[0].f3.Manifest()
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
-	prevInstance := env.nodes[0].CurrentGpbftInstance(t, ctx)
-
-	env.manifest.BootstrapEpoch = 953
-	env.manifest.Sequence = 1
-	env.manifest.ReBootstrap = false
-	env.addPowerDeltaForParticipants(ctx, &env.manifest, []gpbft.ActorID{2, 3}, big.NewInt(1), false)
-	env.updateManifest()
-
-	env.waitForManifestChange(prev, 15*time.Second, []*testNode{env.nodes[0], env.nodes[1]})
-	// check that the runner continued without rebootstrap
-	require.True(t, env.nodes[0].CurrentGpbftInstance(t, ctx) >= prevInstance)
-	env.waitForInstanceNumber(ctx, prevInstance+10, 15*time.Second, false)
-	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
-	env.requireEqualManifests(false)
 
 	// check that the power table is updated
-	ts, err := env.ec.GetTipsetByEpoch(ctx, int64(env.nodes[0].CurrentGpbftInstance(t, ctx)))
+	ts, err := env.ec.GetTipsetByEpoch(env.testCtx, int64(env.nodes[0].currentGpbftInstance()))
 	require.NoError(t, err)
-	pt, err := env.nodes[0].f3.GetPowerTable(ctx, ts.Key())
+	pt, err := env.nodes[0].f3.GetPowerTable(env.testCtx, ts.Key())
 	require.NoError(t, err)
 	require.Equal(t, len(pt), 4)
-	env.stop()
 }
 
 func TestDynamicManifest_WithPauseAndRebootstrap(t *testing.T) {
-	ctx := context.Background()
 	env := newTestEnvironment(t, 2, true)
 
-	env.Connect(ctx)
-	env.run(ctx)
+	env.connectAll()
+	env.start()
 
 	prev := env.nodes[0].f3.Manifest()
 
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
-	prevInstance := env.nodes[0].CurrentGpbftInstance(t, ctx)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
+	prevInstance := env.nodes[0].currentGpbftInstance()
 
-	env.manifest.BootstrapEpoch = 953
-	env.manifest.Sequence = 1
-	env.manifest.Pause = true
-	env.manifest.ReBootstrap = true
-	env.updateManifest()
+	env.manifestSender.Pause()
 
-	env.waitForManifestChange(prev, 15*time.Second, env.nodes)
+	env.waitForManifestChange(prev, 15*time.Second)
 
 	// check that it paused
-	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
-	env.requireEqualManifests(false)
 	env.waitForNodesStoppped(10 * time.Second)
 
-	// New manifest with sequence 2 to start again F3
+	// New manifest to start F3 again
 	prev = env.nodes[0].f3.Manifest()
 	env.manifest.BootstrapEpoch = 956
-	env.manifest.Pause = false
-	env.manifest.Sequence = 2
-	env.manifest.ReBootstrap = true
 	env.updateManifest()
 
-	env.waitForManifestChange(prev, 15*time.Second, env.nodes)
+	env.waitForManifestChange(prev, 15*time.Second)
 
 	// check that it rebootstrapped and the number of instances is below prevInstance
-	require.True(t, env.nodes[0].CurrentGpbftInstance(t, ctx) < prevInstance)
-	env.waitForInstanceNumber(ctx, 3, 15*time.Second, false)
+	require.True(t, env.nodes[0].currentGpbftInstance() < prevInstance)
+	env.waitForInstanceNumber(3, 15*time.Second, false)
 	require.NotEqual(t, prev, env.nodes[0].f3.Manifest())
 	env.requireEqualManifests(false)
-	env.stop()
 }
 
 var base manifest.Manifest = manifest.Manifest{
-	Sequence:        0,
 	BootstrapEpoch:  950,
 	InitialInstance: 0,
 	NetworkName:     gpbft.NetworkName("f3-test"),
@@ -180,14 +136,14 @@ var base manifest.Manifest = manifest.Manifest{
 }
 
 type testNode struct {
-	h     host.Host
-	f3    *f3.F3
-	errCh <-chan error
+	e  *testEnv
+	h  host.Host
+	f3 *f3.F3
 }
 
-func (n *testNode) CurrentGpbftInstance(t *testing.T, ctx context.Context) uint64 {
-	c, err := n.f3.GetLatestCert(ctx)
-	require.NoError(t, err)
+func (n *testNode) currentGpbftInstance() uint64 {
+	c, err := n.f3.GetLatestCert(n.e.testCtx)
+	require.NoError(n.e.t, err)
 	if c == nil {
 		return 0
 	}
@@ -196,6 +152,8 @@ func (n *testNode) CurrentGpbftInstance(t *testing.T, ctx context.Context) uint6
 
 type testEnv struct {
 	t              *testing.T
+	errgrp         *errgroup.Group
+	testCtx        context.Context
 	signingBackend *signing.FakeBackend
 	nodes          []*testNode
 	ec             *ec.FakeEC
@@ -207,25 +165,29 @@
 
 // signals the update to the latest manifest in the environment.
 func (e *testEnv) updateManifest() {
-	e.manifestSender.UpdateManifest(e.manifest)
+	m := e.manifest // copy because we mutate it locally.
+	e.manifestSender.UpdateManifest(&m)
 }
 
-func (e *testEnv) newHeadEveryPeriod(ctx context.Context, period time.Duration) {
-	// set a timer that sets a new head every period
-	go func() {
-		for {
+func (e *testEnv) newHeadEveryPeriod(period time.Duration) {
+	e.errgrp.Go(func() error {
+		// set a timer that sets a new head every period
+		ticker := time.NewTicker(period)
+		defer ticker.Stop()
+		for e.testCtx.Err() == nil {
 			select {
-			case <-ctx.Done():
-				return
-			case <-time.After(period):
+			case <-e.testCtx.Done():
+				return nil
+			case <-ticker.C:
 				e.ec.SetCurrentHead(e.ec.GetCurrentHead() + 1)
 				// fmt.Println("Setting new head", e.ec.GetCurrentHead())
 			}
 		}
-	}()
+		return nil
+	})
 }
 
-func (e *testEnv) addPowerDeltaForParticipants(ctx context.Context, m *manifest.Manifest, participants []gpbft.ActorID, power *big.Int, runNodes bool) {
+func (e *testEnv) addPowerDeltaForParticipants(m *manifest.Manifest, participants []gpbft.ActorID, power *big.Int, runNodes bool) {
 	for _, n := range participants {
 		nodeLen := len(e.nodes)
 		newNode := false
@@ -253,54 +215,65 @@
 				require.NoError(e.t, err)
 			}
 			// run
-			e.runNode(ctx, e.nodes[nodeLen-1])
+			e.startNode(nodeLen - 1)
 		}
 	}
 }
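Now that senders and providers take *Manifest, the test environment copies its manifest value before handing out a pointer (the "copy because we mutate" comments); otherwise later edits to env.manifest would be visible to, and race with, the goroutine publishing it. The idiom in isolation (publish and send are illustrative names):

    package example

    import "github.com/filecoin-project/go-f3/manifest"

    // publish gives the sender a private snapshot; send is an illustrative
    // callback standing in for ManifestSender.UpdateManifest.
    func publish(local manifest.Manifest, send func(*manifest.Manifest)) {
        m := local // copy: the sender must never observe later edits to local
        send(&m)
    }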
 
 // waits for all nodes to reach a specific instance number.
 // If the `strict` flag is enabled the check also applies to the non-running nodes
-func (e *testEnv) waitForInstanceNumber(ctx context.Context, instanceNumber uint64, timeout time.Duration, strict bool) {
+func (e *testEnv) waitForInstanceNumber(instanceNumber uint64, timeout time.Duration, strict bool) {
 	require.Eventually(e.t, func() bool {
-		reached := 0
-		for i := 0; i < len(e.nodes); i++ {
+		for _, n := range e.nodes {
 			// nodes that are not running are not required to reach the instance
 			// (it will actually panic if we try to fetch it because there is no
 			// runner initialized)
-			if !e.nodes[i].f3.IsRunning() && !strict {
-				reached++
-			} else if e.nodes[i].CurrentGpbftInstance(e.t, ctx) >= instanceNumber && e.nodes[i].f3.IsRunning() {
-				reached++
+			if !n.f3.IsRunning() {
+				if strict {
+					return false
+				}
+				continue
 			}
-			if reached >= len(e.nodes) {
-				return true
+			if n.currentGpbftInstance() < instanceNumber {
+				return false
 			}
 		}
-		return false
+		return true
 	}, timeout, e.manifest.ECPeriod)
 }
 
-func (e *testEnv) waitForManifestChange(prev manifest.Manifest, timeout time.Duration, nodes []*testNode) {
+func (e *testEnv) waitForManifestChange(prev *manifest.Manifest, timeout time.Duration) {
 	require.Eventually(e.t, func() bool {
-		reached := 0
-		for i := 0; i < len(e.nodes); i++ {
-			for i := 0; i < len(nodes); i++ {
-				v1, _ := nodes[i].f3.Manifest().Version()
-				v2, _ := prev.Version()
-				if v1 != v2 {
-					reached++
-				}
-				if reached == len(nodes) {
-					return true
-				}
+		oldVersion, err := prev.Version()
+		require.NoError(e.t, err)
+		for _, n := range e.nodes {
+			if !n.f3.IsRunning() {
+				continue
+			}
+
+			m := n.f3.Manifest()
+			if m == nil {
+				return false
+			}
+
+			v, err := m.Version()
+			require.NoError(e.t, err)
+			if v == oldVersion {
+				return false
 			}
 		}
-		return false
+		return true
 	}, timeout, ManifestSenderTimeout)
 }
 
 func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
-	env := testEnv{t: t, net: mocknet.New()}
+	ctx, cancel := context.WithCancel(context.Background())
+	grp, ctx := errgroup.WithContext(ctx)
+	env := testEnv{t: t, errgrp: grp, testCtx: ctx, net: mocknet.New()}
+	env.t.Cleanup(func() {
+		cancel()
+		require.NoError(env.t, env.errgrp.Wait())
+	})
 
 	// populate manifest
 	m := base
@@ -322,7 +295,7 @@ func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
 
 	var manifestServer peer.ID
 	if dynamicManifest {
-		env.newManifestSender(context.Background())
+		env.newManifestSender()
 		manifestServer = env.manifestSender.SenderID()
 	}
 
@@ -334,7 +307,7 @@ func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
 }
 
 func (e *testEnv) initNode(i int, manifestServer peer.ID) {
-	n, err := e.newF3Instance(context.Background(), i, manifestServer)
+	n, err := e.newF3Instance(i, manifestServer)
 	require.NoError(e.t, err)
 	e.nodes = append(e.nodes, n)
 }
@@ -378,19 +351,10 @@ func (e *testEnv) waitForNodesStoppped(timeout time.Duration) {
 	e.waitFor(f, timeout)
 }
 
-func (e *testEnv) runNode(ctx context.Context, n *testNode) {
-	errCh := make(chan error)
-	n.errCh = errCh
-	go func() {
-		err := n.f3.Run(ctx)
-		errCh <- err
-	}()
-}
-
-func (e *testEnv) run(ctx context.Context) {
+func (e *testEnv) start() {
 	// Start the nodes
-	for _, n := range e.nodes {
-		e.runNode(ctx, n)
+	for i := range e.nodes {
+		e.startNode(i)
 	}
 
 	// wait for nodes to initialize
@@ -398,40 +362,22 @@
 
 	// If it exists, start the manifest sender
 	if e.manifestSender != nil {
-		go func() {
-			e.manifestSender.Start(ctx)
-		}()
+		e.errgrp.Go(func() error { return e.manifestSender.Run(e.testCtx) })
 	}
 
 	// start creating new heads every ECPeriod
-	e.newHeadEveryPeriod(ctx, e.manifest.ECPeriod)
-	e.monitorNodesError(ctx)
+	e.newHeadEveryPeriod(e.manifest.ECPeriod)
 }
 
-func (e *testEnv) stop() {
-	for _, n := range e.nodes {
-		n.f3.Stop()
-		// close errCh?
-	}
-}
-
-func (e *testEnv) monitorNodesError(ctx context.Context) {
-	for _, n := range e.nodes {
-		go func(n *testNode) {
-			select {
-			case <-ctx.Done():
-				return
-			case err := <-n.errCh:
-				if ctx.Err() != nil {
-					return
-				}
-				require.NoError(e.t, err)
-			}
-		}(n)
-	}
+func (e *testEnv) startNode(i int) {
+	n := e.nodes[i]
+	require.NoError(e.t, n.f3.Start(e.testCtx))
+	e.t.Cleanup(func() {
+		require.NoError(e.t, n.f3.Stop(context.Background()))
+	})
 }
 
-func (e *testEnv) Connect(ctx context.Context) {
+func (e *testEnv) connectAll() {
 	for i, n := range e.nodes {
 		for j := i + 1; j < len(e.nodes); j++ {
 			_, err := e.net.LinkPeers(n.h.ID(), e.nodes[j].h.ID())
@@ -454,24 +400,25 @@
 	}
 }
 
-func (e *testEnv) newManifestSender(ctx context.Context) {
+func (e *testEnv) newManifestSender() {
 	h, err := e.net.GenPeer()
 	require.NoError(e.t, err)
 
-	ps, err := pubsub.NewGossipSub(ctx, h)
+	ps, err := pubsub.NewGossipSub(e.testCtx, h)
 	require.NoError(e.t, err)
 
-	e.manifestSender, err = manifest.NewManifestSender(h, ps, e.manifest, ManifestSenderTimeout)
+	m := e.manifest // copy because we mutate this
+	e.manifestSender, err = manifest.NewManifestSender(h, ps, &m, ManifestSenderTimeout)
 	require.NoError(e.t, err)
 }
 
-func (e *testEnv) newF3Instance(ctx context.Context, id int, manifestServer peer.ID) (*testNode, error) {
+func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, error) {
 	h, err := e.net.GenPeer()
 	if err != nil {
 		return nil, xerrors.Errorf("creating libp2p host: %w", err)
 	}
 
-	ps, err := pubsub.NewGossipSub(ctx, h)
+	ps, err := pubsub.NewGossipSub(e.testCtx, h)
 	if err != nil {
 		return nil, xerrors.Errorf("creating gossipsub: %w", err)
 	}
@@ -491,62 +438,58 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro
 		return nil, xerrors.Errorf("creating a datastore: %w", err)
 	}
 
+	m := e.manifest // copy because we mutate this
 	var mprovider manifest.ManifestProvider
 	if manifestServer != peer.ID("") {
-		mprovider = manifest.NewDynamicManifestProvider(e.manifest, ps, e.ec, manifestServer)
+		mprovider = manifest.NewDynamicManifestProvider(&m, ps, e.ec, manifestServer)
 	} else {
-		mprovider = manifest.NewStaticManifestProvider(e.manifest)
+		mprovider = manifest.NewStaticManifestProvider(&m)
 	}
 
 	e.signingBackend.Allow(int(id))
 
-	module, err := f3.New(ctx, mprovider, ds, h, ps,
+	module, err := f3.New(e.testCtx, mprovider, ds, h, ps,
 		e.signingBackend, e.ec, log, nil)
 	if err != nil {
 		return nil, xerrors.Errorf("creating module: %w", err)
 	}
-	mprovider.SetManifestChangeCallback(f3.ManifestChangeCallback(module))
-	go runMessageSubscription(ctx, module, gpbft.ActorID(id), e.signingBackend)
 
-	return &testNode{h: h, f3: module}, nil
+	e.errgrp.Go(func() error {
+		return runMessageSubscription(e.testCtx, module, gpbft.ActorID(id), e.signingBackend)
+	})
+
+	return &testNode{e: e, h: h, f3: module}, nil
 }
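startNode ties each node's shutdown to t.Cleanup, while background goroutines join the environment's errgroup whose error is asserted once the test ends. That lifecycle pattern, extracted (newLifecycle is an illustrative name):

    package example

    import (
        "context"
        "testing"

        "github.com/stretchr/testify/require"
        "golang.org/x/sync/errgroup"
    )

    // newLifecycle mirrors newTestEnvironment's setup: cancel the context when
    // the test finishes, then require that no background goroutine failed.
    func newLifecycle(t *testing.T) (context.Context, *errgroup.Group) {
        ctx, cancel := context.WithCancel(context.Background())
        grp, ctx := errgroup.WithContext(ctx)
        t.Cleanup(func() {
            cancel()
            require.NoError(t, grp.Wait())
        })
        return ctx, grp
    }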
 
 // TODO: This code is copy-pasta from cmd/f3/run.go, consider taking it out into a shared testing lib.
 // We could do the same to the F3 test instantiation
-func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) {
-	for {
+func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) error {
+	ch := make(chan *gpbft.MessageBuilder, 4)
+	module.SubscribeForMessagesToSign(ch)
+	for ctx.Err() == nil {
 		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		ch := make(chan *gpbft.MessageBuilder, 4)
-		module.SubscribeForMessagesToSign(ch)
-	inner:
-		for {
-			select {
-			case mb, ok := <-ch:
-				if !ok {
-					// the broadcast bus kicked us out
-					log.Infof("lost message bus subscription, retrying")
-					break inner
-				}
-				signatureBuilder, err := mb.PrepareSigningInputs(actorID)
-				if err != nil {
-					log.Errorf("preparing signing inputs: %+v", err)
-				}
-				// signatureBuilder can be sent over RPC
-				payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer)
-				if err != nil {
-					log.Errorf("signing message: %+v", err)
-				}
-				// signatureBuilder and signatures can be returned back over RPC
-				module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
-			case <-ctx.Done():
-				return
+		case mb, ok := <-ch:
+			if !ok {
+				// the broadcast bus kicked us out
+				log.Infof("lost message bus subscription, retrying")
+				ch = make(chan *gpbft.MessageBuilder, 4)
+				module.SubscribeForMessagesToSign(ch)
+				continue
+			}
+			signatureBuilder, err := mb.PrepareSigningInputs(actorID)
+			if err != nil {
+				return xerrors.Errorf("preparing signing inputs: %w", err)
 			}
+			// signatureBuilder can be sent over RPC
+			payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer)
+			if err != nil {
+				return xerrors.Errorf("signing message: %w", err)
+			}
+			// signatureBuilder and signatures can be returned back over RPC
+			module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
+		case <-ctx.Done():
+			return nil
 		}
-	}
+	}
+	return nil
 }
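runMessageSubscription now surfaces failures instead of logging and spinning, so the caller chooses the recovery policy. A sketch of supervising it next to the module lifecycle (supervise is a hypothetical helper; the tests do the equivalent through the environment's errgroup):

    package example

    import (
        "context"

        f3 "github.com/filecoin-project/go-f3"
        "golang.org/x/sync/errgroup"
    )

    // supervise starts the module, runs the signing loop until ctx is cancelled
    // or the loop fails, then stops the module and returns the first error.
    func supervise(ctx context.Context, module *f3.F3, loop func(context.Context) error) error {
        if err := module.Start(ctx); err != nil {
            return err
        }
        grp, gctx := errgroup.WithContext(ctx)
        grp.Go(func() error { return loop(gctx) })
        err := grp.Wait()
        if serr := module.Stop(context.Background()); err == nil {
            err = serr
        }
        return err
    }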