From 67c4b7266ae9350184365c08050a07d23d8432a9 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Wed, 6 Nov 2024 11:20:02 +0530 Subject: [PATCH 1/5] apply raw codec filter for eth get logs --- chain/index/events.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain/index/events.go b/chain/index/events.go index a1eb84a4280..df3f315ec95 100644 --- a/chain/index/events.go +++ b/chain/index/events.go @@ -11,6 +11,7 @@ import ( "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + "github.com/multiformats/go-multicodec" cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" @@ -563,6 +564,9 @@ func makePrefillFilterQuery(f *EventFilter) ([]any, string, error) { clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") } } + } else { + clauses = append(clauses, "ee.codec=?") + values = append(values, uint64(multicodec.Raw)) } s := `SELECT From 0d7956c2f550bb7e930376ebcbbc11e06628ecbc Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Wed, 6 Nov 2024 11:28:45 +0530 Subject: [PATCH 2/5] return error when we hit max filter result --- chain/index/events.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/chain/index/events.go b/chain/index/events.go index df3f315ec95..56dd4394289 100644 --- a/chain/index/events.go +++ b/chain/index/events.go @@ -23,6 +23,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +var ErrMaxResultsReached = xerrors.New("max results limit reached, results truncated") + const maxLookBackForWait = 120 // one hour of tipsets type executedMessage struct { @@ -359,9 +361,9 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter) if row.id != currentID { // Unfortunately we can't easily incorporate the max results limit into the query due to the // unpredictable number of rows caused by joins - // Break here to stop collecting rows + // Error here to inform the caller that we've hit the max results limit if f.MaxResults > 0 && len(ces) >= f.MaxResults { - break + return nil, ErrMaxResultsReached } currentID = row.id From 4f9b7541a90be4528b8e7abacfeb352805084a0a Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 7 Nov 2024 14:21:19 +0530 Subject: [PATCH 3/5] add optional field codec in event filter --- chain/index/events.go | 5 ++--- chain/index/interface.go | 2 ++ node/impl/full/eth.go | 7 +++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/chain/index/events.go b/chain/index/events.go index 56dd4394289..6222990f237 100644 --- a/chain/index/events.go +++ b/chain/index/events.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" - "github.com/multiformats/go-multicodec" cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" @@ -566,9 +565,9 @@ func makePrefillFilterQuery(f *EventFilter) ([]any, string, error) { clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") } } - } else { + } else if f.Codec != 0 { // if no keys are specified, we can use the codec filter clauses = append(clauses, "ee.codec=?") - values = append(values, uint64(multicodec.Raw)) + values = append(values, f.Codec) } s := `SELECT diff --git a/chain/index/interface.go b/chain/index/interface.go index e312648e6cf..d7061ae5526 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -47,6 +47,8 @@ type EventFilter struct { KeysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match MaxResults int // maximum number of results to collect, 0 is unlimited + + Codec uint64 // optional codec filter, only used if KeysWithCodec is not set } type Indexer interface { diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 4e20e0ca534..f83d5e2d533 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1768,12 +1768,19 @@ func (e *EthEventHandler) ethGetEventsForFilter(ctx context.Context, filterSpec return nil, xerrors.New("cannot ask for events for a tipset at or greater than head") } + var codec uint64 + // if no keys are specified, we can use the codec filter to make sure we only get `raw` encoded events + if len(pf.keys) == 0 { + codec = uint64(multicodec.Raw) + } + ef := &index.EventFilter{ MinHeight: pf.minHeight, MaxHeight: pf.maxHeight, TipsetCid: pf.tipsetCid, Addresses: pf.addresses, KeysWithCodec: pf.keys, + Codec: codec, MaxResults: e.EventFilterManager.MaxFilterResults, } From 6cab093013d9c9c1b680aa6d4b45eb29a1154513 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 7 Nov 2024 14:21:49 +0530 Subject: [PATCH 4/5] add tests for filter codec and max results --- chain/index/events_test.go | 269 +++++++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) diff --git a/chain/index/events_test.go b/chain/index/events_test.go index 5cf00e89ff0..8a83e9e3fcd 100644 --- a/chain/index/events_test.go +++ b/chain/index/events_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ipfs/go-cid" + "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" @@ -390,6 +391,257 @@ func TestGetEventsFilterByAddress(t *testing.T) { } } +func TestGetEventsForFilterWithRawCodec(t *testing.T) { + ctx := context.Background() + seed := time.Now().UnixNano() + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + headHeight := abi.ChainEpoch(60) + + // Setup the indexer and chain store with the specified head height + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + t.Cleanup(func() { _ = si.Close() }) + + // Define codec constants (replace with actual multicodec values) + var ( + codecRaw = uint64(multicodec.Raw) + codecCBOR = uint64(multicodec.Cbor) + ) + + // Create events with different codecs + evRaw1 := fakeEventWithCodec( + abi.ActorID(1), + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr1")}, + }, + codecRaw, + ) + + evCBOR := fakeEventWithCodec( + abi.ActorID(2), + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr2")}, + }, + codecCBOR, + ) + + evRaw2 := fakeEventWithCodec( + abi.ActorID(3), + []kv{ + {k: "type", v: []byte("transfer")}, + {k: "recipient", v: []byte("addr3")}, + }, + codecRaw, + ) + + // Aggregate events + events := []types.Event{*evRaw1, *evCBOR, *evRaw2} + + // Create a fake message and associate it with the events + fm := fakeMessage(address.TestAddress, address.TestAddress) + em1 := executedMessage{ + msg: fm, + evs: events, + } + + // Mock the Actor to Address mapping + si.SetActorToDelegatedAddresFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + idAddr, err := address.NewIDAddress(uint64(emitter)) + if err != nil { + return address.Undef, false + } + return idAddr, true + }) + + // Mock the executed messages loader + si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return []executedMessage{em1}, nil + }) + + // Create fake tipsets + fakeTipSet1 := fakeTipSet(t, rng, 1, nil) + fakeTipSet2 := fakeTipSet(t, rng, 2, nil) + + // Associate tipsets with their heights and CIDs + cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // Height 1 + cs.SetTipsetByHeightAndKey(2, fakeTipSet2.Key(), fakeTipSet2) // Height 2 + cs.SetTipSetByCid(t, fakeTipSet1) + cs.SetTipSetByCid(t, fakeTipSet2) + + // Associate messages with tipsets + cs.SetMessagesForTipset(fakeTipSet1, []types.ChainMsg{fm}) + + // Apply the indexer to process the tipsets + require.NoError(t, si.Apply(ctx, fakeTipSet1, fakeTipSet2)) + + t.Run("FilterEventsByRawCodecWithoutKeys", func(t *testing.T) { + f := &EventFilter{ + MinHeight: 1, + MaxHeight: 2, + Codec: codecRaw, // Set to RAW codec + } + + // Retrieve events based on the filter + ces, err := si.GetEventsForFilter(ctx, f) + require.NoError(t, err) + + // Define expected collected events (only RAW encoded events) + expectedCES := []*CollectedEvent{ + { + Entries: evRaw1.Entries, + EmitterAddr: must.One(address.NewIDAddress(uint64(evRaw1.Emitter))), + EventIdx: 0, + Reverted: false, + Height: 1, + TipSetKey: fakeTipSet1.Key(), + MsgIdx: 0, + MsgCid: fm.Cid(), + }, + { + Entries: evRaw2.Entries, + EmitterAddr: must.One(address.NewIDAddress(uint64(evRaw2.Emitter))), + EventIdx: 2, // Adjust based on actual indexing + Reverted: false, + Height: 1, + TipSetKey: fakeTipSet1.Key(), + MsgIdx: 0, + MsgCid: fm.Cid(), + }, + } + + require.Equal(t, expectedCES, ces) + }) +} + +func TestMaxFilterResults(t *testing.T) { + ctx := context.Background() + seed := time.Now().UnixNano() + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + headHeight := abi.ChainEpoch(60) + + // Setup the indexer and chain store with the specified head height + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + t.Cleanup(func() { _ = si.Close() }) + + // Define codec constants (replace with actual multicodec values) + var ( + codecRaw = uint64(multicodec.Raw) + codecCBOR = uint64(multicodec.Cbor) + ) + + // Create events with different codecs + evRaw1 := fakeEventWithCodec( + abi.ActorID(1), + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr1")}, + }, + codecRaw, + ) + + evCBOR := fakeEventWithCodec( + abi.ActorID(2), + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr2")}, + }, + codecCBOR, + ) + + evRaw2 := fakeEventWithCodec( + abi.ActorID(3), + []kv{ + {k: "type", v: []byte("transfer")}, + {k: "recipient", v: []byte("addr3")}, + }, + codecRaw, + ) + + evRaw3 := fakeEventWithCodec( + abi.ActorID(4), + []kv{ + {k: "type", v: []byte("transfer")}, + {k: "recipient", v: []byte("addr4")}, + }, + codecCBOR, + ) + + // Aggregate events + events := []types.Event{*evRaw1, *evCBOR, *evRaw2, *evRaw3} + + // Create a fake message and associate it with the events + fm := fakeMessage(address.TestAddress, address.TestAddress) + em1 := executedMessage{ + msg: fm, + evs: events, + } + + // Mock the Actor to Address mapping + si.SetActorToDelegatedAddresFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + idAddr, err := address.NewIDAddress(uint64(emitter)) + if err != nil { + return address.Undef, false + } + return idAddr, true + }) + + // Mock the executed messages loader + si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return []executedMessage{em1}, nil + }) + + // Create fake tipsets + fakeTipSet1 := fakeTipSet(t, rng, 1, nil) + fakeTipSet2 := fakeTipSet(t, rng, 2, nil) + + // Associate tipsets with their heights and CIDs + cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // Height 1 + cs.SetTipsetByHeightAndKey(2, fakeTipSet2.Key(), fakeTipSet2) // Height 2 + cs.SetTipSetByCid(t, fakeTipSet1) + cs.SetTipSetByCid(t, fakeTipSet2) + + // Associate messages with tipsets + cs.SetMessagesForTipset(fakeTipSet1, []types.ChainMsg{fm}) + + // Apply the indexer to process the tipsets + require.NoError(t, si.Apply(ctx, fakeTipSet1, fakeTipSet2)) + + // if we hit max results, we should get an error + // we have total 4 events + testCases := []struct { + name string + maxResults int + expectedCount int + expectedErr string + }{ + {name: "no max results", maxResults: 0, expectedCount: 4}, + {name: "max result more that total events", maxResults: 10, expectedCount: 4}, + {name: "max results less than total events", maxResults: 1, expectedErr: ErrMaxResultsReached.Error()}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + f := &EventFilter{ + MinHeight: 1, + MaxHeight: 2, + MaxResults: tc.maxResults, + } + + ces, err := si.GetEventsForFilter(ctx, f) + if tc.expectedErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedCount, len(ces)) + } + }) + } +} + func sortAddresses(addrs []address.Address) { sort.Slice(addrs, func(i, j int) bool { return addrs[i].String() < addrs[j].String() @@ -435,6 +687,23 @@ func fakeEvent(emitter abi.ActorID, indexed []kv, unindexed []kv) *types.Event { return ev } +func fakeEventWithCodec(emitter abi.ActorID, indexed []kv, codec uint64) *types.Event { + ev := &types.Event{ + Emitter: emitter, + } + + for _, in := range indexed { + ev.Entries = append(ev.Entries, types.EventEntry{ + Flags: 0x01, + Key: in.k, + Codec: codec, + Value: in.v, + }) + } + + return ev +} + type kv struct { k string v []byte From 8cca81cb38d2b616367f655f00050bc0f8ef8a61 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 7 Nov 2024 14:35:37 +0530 Subject: [PATCH 5/5] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56421ef500c..8dcd6932925 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ - Try harder in the F3 participation loop to participate using the same lotus node ([filecoin-project/lotus#12664](https://github.com/filecoin-project/lotus/pull/12664)). - The mining loop will now correctly "stick" to the same upstream lotus node for all operations pertaining to mining a single block ([filecoin-project/lotus#12665](https://github.com/filecoin-project/lotus/pull/12665)). - Make the ordering of event output for `eth_` APIs and `GetActorEventsRaw` consistent, sorting ascending on: epoch, message index, event index and original event entry order. ([filecoin-project/lotus#12623](https://github.com/filecoin-project/lotus/pull/12623)) - +- Return error after hitting max results in event filtering and add default `raw` codec for eth_getLogs. ([filecoin-project/lotus#12671](https://github.com/filecoin-project/lotus/pull/12671)) ## Deps # UNRELEASED Node v1.30.0