Skip to content

Commit 25be1e8

Browse files
JukLee0irawanwiset25
authored andcommitted
eth/filters: fix pending for getLogs (ethereum#24949)
1 parent 9888d03 commit 25be1e8

File tree

4 files changed

+61
-19
lines changed

4 files changed

+61
-19
lines changed

accounts/abi/bind/backends/simulated.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ type SimulatedBackend struct {
6161
database ethdb.Database // In memory database to store our testing data
6262
blockchain *core.BlockChain // Ethereum blockchain to handle the consensus
6363

64-
mu sync.Mutex
65-
pendingBlock *types.Block // Currently pending block that will be imported on request
66-
pendingState *state.StateDB // Currently pending state that will be the active on on request
64+
mu sync.Mutex
65+
pendingBlock *types.Block // Currently pending block that will be imported on request
66+
pendingState *state.StateDB // Currently pending state that will be the active on request
67+
pendingReceipts types.Receipts // Currently receipts for the pending block
6768

6869
events *filters.EventSystem // Event system for filtering log events live
6970

@@ -126,8 +127,8 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
126127
database: database,
127128
blockchain: blockchain,
128129
config: genesis.Config,
129-
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
130130
}
131+
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
131132
blockchain.Client = backend
132133
backend.rollback()
133134
return backend
@@ -146,8 +147,8 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
146147
database: database,
147148
blockchain: blockchain,
148149
config: genesis.Config,
149-
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
150150
}
151+
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
151152
backend.rollback()
152153
return backend
153154
}
@@ -400,7 +401,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
400401
}
401402

402403
// Include tx in chain.
403-
blocks, _ := core.GenerateChain(b.config, block, b.blockchain.Engine(), b.database, 1, func(number int, block *core.BlockGen) {
404+
blocks, receipts := core.GenerateChain(b.config, block, b.blockchain.Engine(), b.database, 1, func(number int, block *core.BlockGen) {
404405
for _, tx := range b.pendingBlock.Transactions() {
405406
block.AddTxWithChain(b.blockchain, tx)
406407
}
@@ -410,6 +411,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
410411

411412
b.pendingBlock = blocks[0]
412413
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database())
414+
b.pendingReceipts = receipts[0]
413415
return nil
414416
}
415417

@@ -421,7 +423,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
421423
var filter *filters.Filter
422424
if query.BlockHash != nil {
423425
// Block filter requested, construct a single-shot filter
424-
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
426+
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
425427
} else {
426428
// Initialize unset filter boundaried to run from genesis to chain head
427429
from := int64(0)
@@ -433,7 +435,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
433435
to = query.ToBlock.Int64()
434436
}
435437
// Construct the range filter
436-
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
438+
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
437439
}
438440
// Run the filter and return all the logs
439441
logs, err := filter.Logs(ctx)
@@ -523,8 +525,9 @@ func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList }
523525
// filterBackend implements filters.Backend to support filtering for logs without
524526
// taking bloom-bits acceleration structures into account.
525527
type filterBackend struct {
526-
db ethdb.Database
527-
bc *core.BlockChain
528+
db ethdb.Database
529+
bc *core.BlockChain
530+
backend *SimulatedBackend
528531
}
529532

530533
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
@@ -545,6 +548,10 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
545548
return core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)), nil
546549
}
547550

551+
func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
552+
return fb.backend.pendingBlock, fb.backend.pendingReceipts
553+
}
554+
548555
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
549556
receipts := core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash))
550557
if receipts == nil {

eth/filters/filter.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Backend interface {
3636
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
3737
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
3838
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
39+
PendingBlockAndReceipts() (*types.Block, types.Receipts)
3940

4041
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
4142
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
@@ -128,26 +129,35 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
128129
}
129130
return f.blockLogs(ctx, header)
130131
}
132+
// Short-cut if all we care about is pending logs
133+
if f.begin == rpc.PendingBlockNumber.Int64() {
134+
if f.end != rpc.PendingBlockNumber.Int64() {
135+
return nil, errors.New("invalid block range")
136+
}
137+
return f.pendingLogs()
138+
}
131139
// Figure out the limits of the filter range
132140
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
133141
if header == nil {
134142
return nil, nil
135143
}
136-
head := header.Number.Uint64()
137-
138-
if f.begin == -1 {
144+
var (
145+
head = header.Number.Uint64()
146+
end = uint64(f.end)
147+
pending = f.end == rpc.PendingBlockNumber.Int64()
148+
)
149+
if f.begin == rpc.LatestBlockNumber.Int64() {
139150
f.begin = int64(head)
140151
}
141-
end := uint64(f.end)
142-
if f.end == -1 {
152+
if f.end == rpc.LatestBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
143153
end = head
144154
}
145155
// Gather all indexed logs, and finish with non indexed ones
146156
var (
147-
logs []*types.Log
148-
err error
157+
logs []*types.Log
158+
err error
159+
size, sections = f.backend.BloomStatus()
149160
)
150-
size, sections := f.backend.BloomStatus()
151161
if indexed := sections * size; indexed > uint64(f.begin) {
152162
if indexed > end {
153163
logs, err = f.indexedLogs(ctx, end)
@@ -160,6 +170,13 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
160170
}
161171
rest, err := f.unindexedLogs(ctx, end)
162172
logs = append(logs, rest...)
173+
if pending {
174+
pendingLogs, err := f.pendingLogs()
175+
if err != nil {
176+
return nil, err
177+
}
178+
logs = append(logs, pendingLogs...)
179+
}
163180
return logs, err
164181
}
165182

@@ -272,6 +289,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
272289
return nil, nil
273290
}
274291

292+
// pendingLogs returns the logs matching the filter criteria within the pending block.
293+
func (f *Filter) pendingLogs() ([]*types.Log, error) {
294+
block, receipts := f.backend.PendingBlockAndReceipts()
295+
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
296+
var unfiltered []*types.Log
297+
for _, r := range receipts {
298+
unfiltered = append(unfiltered, r.Logs...)
299+
}
300+
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
301+
}
302+
return nil, nil
303+
}
304+
275305
func includes(addresses []common.Address, a common.Address) bool {
276306
for _, addr := range addresses {
277307
if addr == a {

eth/filters/filter_system_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (b *testBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*
9292
return logs, nil
9393
}
9494

95+
func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
96+
return nil, nil
97+
}
98+
9599
func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
96100
return b.txFeed.Subscribe(ch)
97101
}
@@ -602,7 +606,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
602606
var (
603607
db = rawdb.NewMemoryDatabase()
604608
backend = &testBackend{db: db}
605-
api = NewFilterAPI(backend, false, timeout)
609+
api = NewPublicFilterAPI(backend, false, timeout)
606610
done = make(chan struct{})
607611
)
608612

internal/ethapi/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type Backend interface {
6666
StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error)
6767
StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error)
6868
GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error)
69+
PendingBlockAndReceipts() (*types.Block, types.Receipts)
6970
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
7071
GetTd(blockHash common.Hash) *big.Int
7172
GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, XDCxState *tradingstate.TradingStateDB, header *types.Header, vmConfig *vm.Config) (*vm.EVM, func() error, error)

0 commit comments

Comments
 (0)