Skip to content

Commit

Permalink
Improve lifecycle management
Browse files Browse the repository at this point in the history
1. Merge the client and the runner. The distinction was unclear and the
client/runner/module tended to reach into each other. This change merges
the client/runner and then separates the new "runner" from the module as
much as possible.
2. Completely stop/discard the runner when rebootstrapping. The new
logic carefully waits for all components to stop before moving on.
3. Simplify locking and make sure we take the locks where appropriate.
4. Merge bootstrap and re-configure logic. The dynamic manifest client
no longer cares about _when_ a manifest should be applied, it simply
gives it to the module (F3) and let's F3 us its normal bootstrap logic.

Finally, I've improved the tests to:

1. Always on exit (checking for errors).
2. Never fail from goroutines.
3. Correctly wait for manifest changes (previously, it would wait for at
least one node to change manifests).

NOTEs:

1. This removes the ability to reconfig without rebootstrap, but
preserves the ability to _pause_ without rebootstrap.
2. This causes bootstrap to start at the time the bootstrap epoch
_should_ have happened instead of starting at the next non-null epoch.
In practice, this should behave better as all nodes will start at the
same time (and will look back 900 epochs anyways).
  • Loading branch information
Stebalien committed Jul 6, 2024
1 parent 00d8fe0 commit 2a57762
Show file tree
Hide file tree
Showing 13 changed files with 897 additions and 1,005 deletions.
64 changes: 35 additions & 29 deletions cmd/f3/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)

Expand Down Expand Up @@ -137,6 +138,8 @@ var manifestServeCmd = cli.Command{
return xerrors.Errorf("initializing libp2p host: %w", err)
}

defer func() { _ = host.Close() }()

// Connect to all bootstrap addresses once. This should be sufficient to build
// the pubsub mesh, if not then we need to periodically re-connect and/or pull in
// the Lotus bootstrapping, which includes DHT connectivity.
Expand Down Expand Up @@ -171,15 +174,15 @@ var manifestServeCmd = cli.Command{
}

manifestPath := c.String("manifest")
loadManifestAndVersion := func() (manifest.Manifest, manifest.Version, error) {
loadManifestAndVersion := func() (*manifest.Manifest, manifest.Version, error) {

m, err := loadManifest(manifestPath)
if err != nil {
return manifest.Manifest{}, "", xerrors.Errorf("loading manifest: %w", err)
return nil, "", xerrors.Errorf("loading manifest: %w", err)
}
version, err := m.Version()
if err != nil {
return manifest.Manifest{}, "", xerrors.Errorf("versioning manifest: %w", err)
return nil, "", xerrors.Errorf("versioning manifest: %w", err)
}
return m, version, nil
}
Expand All @@ -200,46 +203,49 @@ var manifestServeCmd = cli.Command{
}
_, _ = fmt.Fprintf(c.App.Writer, "Started manifest sender with version: %s\n", manifestVersion)

go func() {
sender.Start(c.Context)
}()
defer func() {
sender.Stop()
_ = host.Close()
}()

checkTicker := time.NewTicker(c.Duration("checkInterval"))
for c.Context.Err() == nil {
select {
case <-c.Context.Done():
return c.Context.Err()
case <-checkTicker.C:
if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
} else if manifestVersion != nextManifestVersion {
_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
sender.UpdateManifest(nextManifest)
manifestVersion = nextManifestVersion
checkInterval := c.Duration("checkInterval")

errgrp, ctx := errgroup.WithContext(c.Context)
errgrp.Go(func() error { return sender.Run(ctx) })
errgrp.Go(func() error {
checkTicker := time.NewTicker(checkInterval)
defer checkTicker.Stop()

for ctx.Err() == nil {
select {
case <-ctx.Done():
return nil
case <-checkTicker.C:
if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
} else if manifestVersion != nextManifestVersion {
_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
sender.UpdateManifest(nextManifest)
manifestVersion = nextManifestVersion
}
}
}
}
return nil

return nil
})

return errgrp.Wait()
},
}

func getManifest(c *cli.Context) (manifest.Manifest, error) {
func getManifest(c *cli.Context) (*manifest.Manifest, error) {
manifestPath := c.String("manifest")
return loadManifest(manifestPath)
}

func loadManifest(path string) (manifest.Manifest, error) {
func loadManifest(path string) (*manifest.Manifest, error) {
f, err := os.Open(path)
if err != nil {
return manifest.Manifest{}, xerrors.Errorf("opening %s to load manifest: %w", path, err)
return nil, xerrors.Errorf("opening %s to load manifest: %w", path, err)
}
defer f.Close()
var m manifest.Manifest

err = m.Unmarshal(f)
return m, err
return &m, err
}
14 changes: 6 additions & 8 deletions cmd/f3/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,18 @@ var runCmd = cli.Command{
return xerrors.Errorf("creating module: %w", err)
}

mprovider.SetManifestChangeCallback(f3.ManifestChangeCallback(module))
go runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend)

return module.Run(ctx)
if err := module.Start(ctx); err != nil {
return nil
}
<-ctx.Done()
return module.Stop(context.Background())
},
}

func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) {
for {
select {
case <-ctx.Done():
return
default:
}
for ctx.Err() == nil {

ch := make(chan *gpbft.MessageBuilder, 4)
module.SubscribeForMessagesToSign(ch)
Expand Down
29 changes: 29 additions & 0 deletions ec/powerdelta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ec

import (
"context"
"fmt"

"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"
)

func WithModifiedPower(backend Backend, delta []certs.PowerTableDelta) Backend {
return &withModifiedPower{
Backend: backend,
delta: delta,
}
}

type withModifiedPower struct {
Backend
delta []certs.PowerTableDelta
}

func (b *withModifiedPower) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) {
pt, err := b.Backend.GetPowerTable(ctx, ts)
if err != nil {
return nil, fmt.Errorf("getting power table: %w", err)

Check warning on line 26 in ec/powerdelta.go

View check run for this annotation

Codecov / codecov/patch

ec/powerdelta.go#L26

Added line #L26 was not covered by tests
}
return certs.ApplyPowerTableDiffs(pt, b.delta)
}
Loading

0 comments on commit 2a57762

Please sign in to comment.