Skip to content

Commit d90d1db

Browse files
authored
eth/filters: remove use of event.TypeMux for pending logs (#20312)
1 parent b8bc9b3 commit d90d1db

File tree

13 files changed

+230
-237
lines changed

13 files changed

+230
-237
lines changed

accounts/abi/bind/backends/simulated.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
7676
database: database,
7777
blockchain: blockchain,
7878
config: genesis.Config,
79-
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
79+
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
8080
}
8181
backend.rollback()
8282
return backend
@@ -502,22 +502,34 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
502502
}
503503

504504
func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
505-
return event.NewSubscription(func(quit <-chan struct{}) error {
506-
<-quit
507-
return nil
508-
})
505+
return nullSubscription()
509506
}
507+
510508
func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
511509
return fb.bc.SubscribeChainEvent(ch)
512510
}
511+
513512
func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
514513
return fb.bc.SubscribeRemovedLogsEvent(ch)
515514
}
515+
516516
func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
517517
return fb.bc.SubscribeLogsEvent(ch)
518518
}
519519

520+
func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
521+
return nullSubscription()
522+
}
523+
520524
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
525+
521526
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
522527
panic("not supported")
523528
}
529+
530+
func nullSubscription() event.Subscription {
531+
return event.NewSubscription(func(quit <-chan struct{}) error {
532+
<-quit
533+
return nil
534+
})
535+
}

core/events.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@ import (
2424
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
2525
type NewTxsEvent struct{ Txs []*types.Transaction }
2626

27-
// PendingLogsEvent is posted pre mining and notifies of pending logs.
28-
type PendingLogsEvent struct {
29-
Logs []*types.Log
30-
}
31-
3227
// NewMinedBlockEvent is posted when a block has been imported.
3328
type NewMinedBlockEvent struct{ Block *types.Block }
3429

eth/api_backend.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
202202
return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
203203
}
204204

205+
func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
206+
return b.eth.miner.SubscribePendingLogs(ch)
207+
}
208+
205209
func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
206210
return b.eth.BlockChain().SubscribeChainEvent(ch)
207211
}

eth/filters/api.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,8 @@ type PublicFilterAPI struct {
6565
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
6666
api := &PublicFilterAPI{
6767
backend: backend,
68-
mux: backend.EventMux(),
6968
chainDb: backend.ChainDb(),
70-
events: NewEventSystem(backend.EventMux(), backend, lightMode),
69+
events: NewEventSystem(backend, lightMode),
7170
filters: make(map[rpc.ID]*filter),
7271
}
7372
go api.timeoutLoop()
@@ -428,7 +427,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
428427
hashes := f.hashes
429428
f.hashes = nil
430429
return returnHashes(hashes), nil
431-
case LogsSubscription:
430+
case LogsSubscription, MinedAndPendingLogsSubscription:
432431
logs := f.logs
433432
f.logs = nil
434433
return returnLogs(logs), nil

eth/filters/bench_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/ethereum/go-ethereum/core/rawdb"
2929
"github.com/ethereum/go-ethereum/core/types"
3030
"github.com/ethereum/go-ethereum/ethdb"
31-
"github.com/ethereum/go-ethereum/event"
3231
"github.com/ethereum/go-ethereum/node"
3332
)
3433

@@ -122,14 +121,13 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
122121

123122
b.Log("Running filter benchmarks...")
124123
start = time.Now()
125-
mux := new(event.TypeMux)
126124
var backend *testBackend
127125

128126
for i := 0; i < benchFilterCnt; i++ {
129127
if i%20 == 0 {
130128
db.Close()
131129
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
132-
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
130+
backend = &testBackend{db: db, sections: cnt}
133131
}
134132
var addr common.Address
135133
addr[0] = byte(i)
@@ -173,8 +171,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
173171

174172
b.Log("Running filter benchmarks...")
175173
start := time.Now()
176-
mux := new(event.TypeMux)
177-
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
174+
backend := &testBackend{db: db}
178175
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
179176
filter.Logs(context.Background())
180177
d := time.Since(start)

eth/filters/filter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232

3333
type Backend interface {
3434
ChainDb() ethdb.Database
35-
EventMux() *event.TypeMux
3635
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
3736
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
3837
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
@@ -42,6 +41,7 @@ type Backend interface {
4241
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
4342
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
4443
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
44+
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
4545

4646
BloomStatus() (uint64, uint64)
4747
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)

eth/filters/filter_system.go

Lines changed: 79 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package filters
2020

2121
import (
2222
"context"
23-
"errors"
2423
"fmt"
2524
"sync"
2625
"time"
@@ -58,7 +57,6 @@ const (
5857
)
5958

6059
const (
61-
6260
// txChanSize is the size of channel listening to NewTxsEvent.
6361
// The number is referenced from the size of tx pool.
6462
txChanSize = 4096
@@ -70,10 +68,6 @@ const (
7068
chainEvChanSize = 10
7169
)
7270

73-
var (
74-
ErrInvalidSubscriptionID = errors.New("invalid id")
75-
)
76-
7771
type subscription struct {
7872
id rpc.ID
7973
typ Type
@@ -89,25 +83,25 @@ type subscription struct {
8983
// EventSystem creates subscriptions, processes events and broadcasts them to the
9084
// subscription which match the subscription criteria.
9185
type EventSystem struct {
92-
mux *event.TypeMux
9386
backend Backend
9487
lightMode bool
9588
lastHead *types.Header
9689

9790
// Subscriptions
98-
txsSub event.Subscription // Subscription for new transaction event
99-
logsSub event.Subscription // Subscription for new log event
100-
rmLogsSub event.Subscription // Subscription for removed log event
101-
chainSub event.Subscription // Subscription for new chain event
102-
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
91+
txsSub event.Subscription // Subscription for new transaction event
92+
logsSub event.Subscription // Subscription for new log event
93+
rmLogsSub event.Subscription // Subscription for removed log event
94+
pendingLogsSub event.Subscription // Subscription for pending log event
95+
chainSub event.Subscription // Subscription for new chain event
10396

10497
// Channels
105-
install chan *subscription // install filter for event notification
106-
uninstall chan *subscription // remove filter for event notification
107-
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
108-
logsCh chan []*types.Log // Channel to receive new log event
109-
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
110-
chainCh chan core.ChainEvent // Channel to receive new chain event
98+
install chan *subscription // install filter for event notification
99+
uninstall chan *subscription // remove filter for event notification
100+
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
101+
logsCh chan []*types.Log // Channel to receive new log event
102+
pendingLogsCh chan []*types.Log // Channel to receive new log event
103+
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
104+
chainCh chan core.ChainEvent // Channel to receive new chain event
111105
}
112106

113107
// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -116,30 +110,28 @@ type EventSystem struct {
116110
//
117111
// The returned manager has a loop that needs to be stopped with the Stop function
118112
// or by stopping the given mux.
119-
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
113+
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
120114
m := &EventSystem{
121-
mux: mux,
122-
backend: backend,
123-
lightMode: lightMode,
124-
install: make(chan *subscription),
125-
uninstall: make(chan *subscription),
126-
txsCh: make(chan core.NewTxsEvent, txChanSize),
127-
logsCh: make(chan []*types.Log, logsChanSize),
128-
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
129-
chainCh: make(chan core.ChainEvent, chainEvChanSize),
115+
backend: backend,
116+
lightMode: lightMode,
117+
install: make(chan *subscription),
118+
uninstall: make(chan *subscription),
119+
txsCh: make(chan core.NewTxsEvent, txChanSize),
120+
logsCh: make(chan []*types.Log, logsChanSize),
121+
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
122+
pendingLogsCh: make(chan []*types.Log, logsChanSize),
123+
chainCh: make(chan core.ChainEvent, chainEvChanSize),
130124
}
131125

132126
// Subscribe events
133127
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
134128
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
135129
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
136130
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
137-
// TODO(rjl493456442): use feed to subscribe pending log event
138-
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
131+
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
139132

140133
// Make sure none of the subscriptions are empty
141-
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
142-
m.pendingLogSub.Closed() {
134+
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
143135
log.Crit("Subscribe for event system failed")
144136
}
145137

@@ -316,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
316308

317309
type filterIndex map[Type]map[rpc.ID]*subscription
318310

319-
// broadcast event to filters that match criteria.
320-
func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
321-
if ev == nil {
311+
func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
312+
if len(ev) == 0 {
322313
return
323314
}
315+
for _, f := range filters[LogsSubscription] {
316+
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
317+
if len(matchedLogs) > 0 {
318+
f.logs <- matchedLogs
319+
}
320+
}
321+
}
324322

325-
switch e := ev.(type) {
326-
case []*types.Log:
327-
if len(e) > 0 {
328-
for _, f := range filters[LogsSubscription] {
329-
if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
330-
f.logs <- matchedLogs
331-
}
332-
}
323+
func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
324+
if len(ev) == 0 {
325+
return
326+
}
327+
for _, f := range filters[PendingLogsSubscription] {
328+
matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
329+
if len(matchedLogs) > 0 {
330+
f.logs <- matchedLogs
333331
}
334-
case core.RemovedLogsEvent:
335-
for _, f := range filters[LogsSubscription] {
336-
if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
337-
f.logs <- matchedLogs
338-
}
332+
}
333+
}
334+
335+
func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
336+
for _, f := range filters[LogsSubscription] {
337+
matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
338+
if len(matchedLogs) > 0 {
339+
f.logs <- matchedLogs
339340
}
340-
case *event.TypeMuxEvent:
341-
if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
342-
for _, f := range filters[PendingLogsSubscription] {
343-
if e.Time.After(f.created) {
344-
if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
345-
f.logs <- matchedLogs
346-
}
341+
}
342+
}
343+
344+
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
345+
hashes := make([]common.Hash, 0, len(ev.Txs))
346+
for _, tx := range ev.Txs {
347+
hashes = append(hashes, tx.Hash())
348+
}
349+
for _, f := range filters[PendingTransactionsSubscription] {
350+
f.hashes <- hashes
351+
}
352+
}
353+
354+
func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
355+
for _, f := range filters[BlocksSubscription] {
356+
f.headers <- ev.Block.Header()
357+
}
358+
if es.lightMode && len(filters[LogsSubscription]) > 0 {
359+
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
360+
for _, f := range filters[LogsSubscription] {
361+
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
362+
f.logs <- matchedLogs
347363
}
348364
}
349-
}
350-
case core.NewTxsEvent:
351-
hashes := make([]common.Hash, 0, len(e.Txs))
352-
for _, tx := range e.Txs {
353-
hashes = append(hashes, tx.Hash())
354-
}
355-
for _, f := range filters[PendingTransactionsSubscription] {
356-
f.hashes <- hashes
357-
}
358-
case core.ChainEvent:
359-
for _, f := range filters[BlocksSubscription] {
360-
f.headers <- e.Block.Header()
361-
}
362-
if es.lightMode && len(filters[LogsSubscription]) > 0 {
363-
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
364-
for _, f := range filters[LogsSubscription] {
365-
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
366-
f.logs <- matchedLogs
367-
}
368-
}
369-
})
370-
}
365+
})
371366
}
372367
}
373368

@@ -448,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
448443
func (es *EventSystem) eventLoop() {
449444
// Ensure all subscriptions get cleaned up
450445
defer func() {
451-
es.pendingLogSub.Unsubscribe()
452446
es.txsSub.Unsubscribe()
453447
es.logsSub.Unsubscribe()
454448
es.rmLogsSub.Unsubscribe()
449+
es.pendingLogsSub.Unsubscribe()
455450
es.chainSub.Unsubscribe()
456451
}()
457452

@@ -462,20 +457,16 @@ func (es *EventSystem) eventLoop() {
462457

463458
for {
464459
select {
465-
// Handle subscribed events
466460
case ev := <-es.txsCh:
467-
es.broadcast(index, ev)
461+
es.handleTxsEvent(index, ev)
468462
case ev := <-es.logsCh:
469-
es.broadcast(index, ev)
463+
es.handleLogs(index, ev)
470464
case ev := <-es.rmLogsCh:
471-
es.broadcast(index, ev)
465+
es.handleRemovedLogs(index, ev)
466+
case ev := <-es.pendingLogsCh:
467+
es.handlePendingLogs(index, ev)
472468
case ev := <-es.chainCh:
473-
es.broadcast(index, ev)
474-
case ev, active := <-es.pendingLogSub.Chan():
475-
if !active { // system stopped
476-
return
477-
}
478-
es.broadcast(index, ev)
469+
es.handleChainEvent(index, ev)
479470

480471
case f := <-es.install:
481472
if f.typ == MinedAndPendingLogsSubscription {

0 commit comments

Comments
 (0)