Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 6f2d560

Browse files
nonsensegbalint
authored andcommitted
swarm-network-rewrite conflicts with master (#589)
* core/types: avoid duplicating transactions on changing signer (#16435) * core/state: cache missing storage entries (#16584) * cmd/utils: point users to --syncmode under DEPRECATED (#16572) Indicate that --light and --fast options are replaced by --syncmode * trie: remove unused `buf` parameter (#16583) * core, eth: fix tracer dirty finalization * travis.yml: remove obsolete brew-cask install * whisper: Golint fixes in whisper packages (#16637) * vendor: fix leveldb crash when bigger than 1 TiB * core: ensure local transactions aren't discarded as underpriced This fixes an issue where local transactions are discarded as underpriced when the pool and queue are full. * evm/main: use blocknumber from genesis * accounts: golint updates for this or self warning (#16627) * tests: golint fixes for tests directory (#16640) * trie: golint iterator fixes (#16639) * internal: golint updates for this or self warning (#16634) * core: golint updates for this or self warning (#16633) * build: Add ldflags -s -w when building aar Smaller size on mobile is always good. Might also solve our maven central upload problem * cmd/clef: documentation about setup (#16568) clef: documentation about setup * params: release geth 1.8.7 * VERSION, params: begin v1.8.8 release cycle * log: changed if-else blocks to conform with golint (#16661) * p2p: changed if-else blocks to conform with golint (#16660) * les: changed if-else blocks to conform with golint (#16658) * accounts: changed if-else blocks to conform with golint (#16654) * rpc: golint error with context as last parameter (#16657) * rpc/*: golint error with context as last parameter * Update json.go * metrics: golint updates for this or self warning (#16635) * metrics/*: golint updates for this or self warning * metrics/*: golint updates for this or self warning, updated pr from feedback * consensus/ethash: fixed typo (#16665) * event: golint updates for this or self warning (#16631) * event/*: golint updates for this or self warning * event/*: golint updates for this or self warning, pr updated per feedback * eth: golint updates for this or self warning (#16632) * eth/*:golint updates for this or self warning * eth/*: golint updates for this or self warning, pr updated per feedback * signer: fix golint errors (#16653) * signer/*: golint fixes Specifically naming and comment formatting for documentation * signer/*: fixed naming error crashing build * signer/*: corrected error * signer/core: fix tiny error whitespace * signer/rules: fix test refactor * whisper/mailserver: pass init error to the caller (#16671) * whisper/mailserver: pass init error to the caller * whisper/mailserver: add returns to fmt.Errorf * whisper/mailserver: check err in mailserver init test * common: changed if-else blocks to conform with golint (#16656) * mobile: add GetStatus Method for Receipt (#16598) * core/rawdb: separate raw database access to own package (#16666) * rlp: fix some golint warnings (#16659) * p2p: fix some golint warnings (#16577) * eth/filters: derive FilterCriteria from ethereum.FilterQuery (#16629) * p2p/simulations/adapters: fix websocket log line parsing in exec adapter (#16667) * build: specify the key to use when invoking gpg:sign-and-deploy-file (#16696) * crypto: fix golint warnings (#16710) * p2p: don't discard reason set by Disconnect (#16559) Peer.run was discarding the reason for disconnection sent to the disc channel by Disconnect. * cmd: various golint fixes (#16700) * cmd: various golint fixes * cmd: update to pr change request * cmd: update to pr change request * eth: golint fixes to variable names (#16711) * eth/filter: check nil pointer when unsubscribe (#16682) * eth/filter: check nil pointer when unsubscribe * eth/filters, accounts, rpc: abort system if subscribe failed * eth/filter: add crit log before exit * eth/filter, event: minor fixes * whisper/shhclient: update call to shh_generateSymKeyFromPassword to pass a string (#16668) * all: get rid of error when creating memory database (#16716) * all: get rid of error when create mdb * core: clean up variables definition * all: inline mdb definition * event: document select case slice use and add edge case test (#16680) Feed keeps active subscription channels in a slice called 'f.sendCases'. The Send method tracks the active cases in a local variable 'cases' whose value is f.sendCases initially. 'cases' shrinks to a shorter prefix of f.sendCases every time a send succeeds, moving the successful case out of range of the active case list. This can be confusing because the two slices share a backing array. Add more comments to document what is going on. Also add a test for removing a case that is in 'f.sentCases' but not 'cases'. * travis: use Android NDK 16b (#16562) * bmt: golint updates for this or self warning (#16628) * bmt/*: golint updates for this or self warning * Update bmt.go * light: new CHT for mainnet and ropsten (#16736) * params: release go-ethereum v1.8.8 * VERSION, params: start 1.8.9 release cycle * accounts/abi: allow abi: tags when unpacking structs Go code users can now tag event struct members with `abi:` to specify in what fields the event will be de-serialized. See PR #16648 for details. * travis: try to upgrade android builder to trusty * p2p/enr: updates for discovery v4 compatibility (#16679) This applies spec changes from ethereum/EIPs#1049 and adds support for pluggable identity schemes. Some care has been taken to make the "v4" scheme standalone. It uses public APIs only and could be moved out of package enr at any time. A couple of minor changes were needed to make identity schemes work: - The sequence number is now updated in Set instead of when signing. - Record is now copy-safe, i.e. calling Set on a shallow copy doesn't modify the record it was copied from. * all: collate new transaction events together * core, eth: minor txpool event cleanups * travis, appveyor: bump Go release to 1.10.2 * core, consensus: fix some typos in comment code and output log * eth: propagate blocks and transactions async * trie: fixes to comply with golint (#16771) * log: fixes for golint warnings (#16775) * node: all golint warnings fixed (#16773) * node: all golint warnings fixed * node: rm per peter * node: rm per peter * vendor, ethdb: print warning log if leveldb is performing compaction (#16766) * vendor: update leveldb package * ethdb: print warning log if db is performing compaction * ethdb: update annotation and log * core/types: convert status type from uint to uint64 (#16784) * trie: support proof generation from the iterator * core/vm: fix typo in instructions.go (#16788) * core: use a wrapped map to remove contention in `TxPool.Get`. (#16670) * core: use a wrapped `map` and `sync.RWMutex` for `TxPool.all` to remove contention in `TxPool.Get`. * core: Remove redundant `txLookup.Find` and improve comments on txLookup methods.
1 parent 2aec12e commit 6f2d560

File tree

7 files changed

+283
-119
lines changed

7 files changed

+283
-119
lines changed

core/tx_list.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -397,13 +397,13 @@ func (h *priceHeap) Pop() interface{} {
397397
// txPricedList is a price-sorted heap to allow operating on transactions pool
398398
// contents in a price-incrementing way.
399399
type txPricedList struct {
400-
all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
401-
items *priceHeap // Heap of prices of all the stored transactions
402-
stales int // Number of stale price points to (re-heap trigger)
400+
all *txLookup // Pointer to the map of all transactions
401+
items *priceHeap // Heap of prices of all the stored transactions
402+
stales int // Number of stale price points to (re-heap trigger)
403403
}
404404

405405
// newTxPricedList creates a new price-sorted transaction heap.
406-
func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
406+
func newTxPricedList(all *txLookup) *txPricedList {
407407
return &txPricedList{
408408
all: all,
409409
items: new(priceHeap),
@@ -425,12 +425,13 @@ func (l *txPricedList) Removed() {
425425
return
426426
}
427427
// Seems we've reached a critical number of stale transactions, reheap
428-
reheap := make(priceHeap, 0, len(*l.all))
428+
reheap := make(priceHeap, 0, l.all.Count())
429429

430430
l.stales, l.items = 0, &reheap
431-
for _, tx := range *l.all {
431+
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
432432
*l.items = append(*l.items, tx)
433-
}
433+
return true
434+
})
434435
heap.Init(l.items)
435436
}
436437

@@ -443,7 +444,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
443444
for len(*l.items) > 0 {
444445
// Discard stale transactions if found during cleanup
445446
tx := heap.Pop(l.items).(*types.Transaction)
446-
if _, ok := (*l.all)[tx.Hash()]; !ok {
447+
if l.all.Get(tx.Hash()) == nil {
447448
l.stales--
448449
continue
449450
}
@@ -475,7 +476,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
475476
// Discard stale price points if found at the heap start
476477
for len(*l.items) > 0 {
477478
head := []*types.Transaction(*l.items)[0]
478-
if _, ok := (*l.all)[head.Hash()]; !ok {
479+
if l.all.Get(head.Hash()) == nil {
479480
l.stales--
480481
heap.Pop(l.items)
481482
continue
@@ -500,7 +501,7 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
500501
for len(*l.items) > 0 && count > 0 {
501502
// Discard stale transactions if found during cleanup
502503
tx := heap.Pop(l.items).(*types.Transaction)
503-
if _, ok := (*l.all)[tx.Hash()]; !ok {
504+
if l.all.Get(tx.Hash()) == nil {
504505
l.stales--
505506
continue
506507
}

core/tx_pool.go

Lines changed: 96 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,11 @@ type TxPool struct {
200200
locals *accountSet // Set of local transaction to exempt from eviction rules
201201
journal *txJournal // Journal of local transaction to back up to disk
202202

203-
pending map[common.Address]*txList // All currently processable transactions
204-
queue map[common.Address]*txList // Queued but non-processable transactions
205-
beats map[common.Address]time.Time // Last heartbeat from each known account
206-
all map[common.Hash]*types.Transaction // All transactions to allow lookups
207-
priced *txPricedList // All transactions sorted by price
203+
pending map[common.Address]*txList // All currently processable transactions
204+
queue map[common.Address]*txList // Queued but non-processable transactions
205+
beats map[common.Address]time.Time // Last heartbeat from each known account
206+
all *txLookup // All transactions to allow lookups
207+
priced *txPricedList // All transactions sorted by price
208208

209209
wg sync.WaitGroup // for shutdown sync
210210

@@ -226,12 +226,12 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
226226
pending: make(map[common.Address]*txList),
227227
queue: make(map[common.Address]*txList),
228228
beats: make(map[common.Address]time.Time),
229-
all: make(map[common.Hash]*types.Transaction),
229+
all: newTxLookup(),
230230
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
231231
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
232232
}
233233
pool.locals = newAccountSet(pool.signer)
234-
pool.priced = newTxPricedList(&pool.all)
234+
pool.priced = newTxPricedList(pool.all)
235235
pool.reset(nil, chain.CurrentBlock().Header())
236236

237237
// If local transactions and journaling is enabled, load from disk
@@ -605,7 +605,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
605605
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
606606
// If the transaction is already known, discard it
607607
hash := tx.Hash()
608-
if pool.all[hash] != nil {
608+
if pool.all.Get(hash) != nil {
609609
log.Trace("Discarding already known transaction", "hash", hash)
610610
return false, fmt.Errorf("known transaction: %x", hash)
611611
}
@@ -616,15 +616,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
616616
return false, err
617617
}
618618
// If the transaction pool is full, discard underpriced transactions
619-
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
619+
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
620620
// If the new transaction is underpriced, don't accept it
621621
if !local && pool.priced.Underpriced(tx, pool.locals) {
622622
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
623623
underpricedTxCounter.Inc(1)
624624
return false, ErrUnderpriced
625625
}
626626
// New transaction is better than our worse ones, make room for it
627-
drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
627+
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
628628
for _, tx := range drop {
629629
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
630630
underpricedTxCounter.Inc(1)
@@ -642,11 +642,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
642642
}
643643
// New transaction is better, replace old one
644644
if old != nil {
645-
delete(pool.all, old.Hash())
645+
pool.all.Remove(old.Hash())
646646
pool.priced.Removed()
647647
pendingReplaceCounter.Inc(1)
648648
}
649-
pool.all[tx.Hash()] = tx
649+
pool.all.Add(tx)
650650
pool.priced.Put(tx)
651651
pool.journalTx(from, tx)
652652

@@ -689,12 +689,12 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
689689
}
690690
// Discard any previous transaction and mark this
691691
if old != nil {
692-
delete(pool.all, old.Hash())
692+
pool.all.Remove(old.Hash())
693693
pool.priced.Removed()
694694
queuedReplaceCounter.Inc(1)
695695
}
696-
if pool.all[hash] == nil {
697-
pool.all[hash] = tx
696+
if pool.all.Get(hash) == nil {
697+
pool.all.Add(tx)
698698
pool.priced.Put(tx)
699699
}
700700
return old != nil, nil
@@ -726,22 +726,22 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
726726
inserted, old := list.Add(tx, pool.config.PriceBump)
727727
if !inserted {
728728
// An older transaction was better, discard this
729-
delete(pool.all, hash)
729+
pool.all.Remove(hash)
730730
pool.priced.Removed()
731731

732732
pendingDiscardCounter.Inc(1)
733733
return false
734734
}
735735
// Otherwise discard any previous transaction and mark this
736736
if old != nil {
737-
delete(pool.all, old.Hash())
737+
pool.all.Remove(old.Hash())
738738
pool.priced.Removed()
739739

740740
pendingReplaceCounter.Inc(1)
741741
}
742742
// Failsafe to work around direct pending inserts (tests)
743-
if pool.all[hash] == nil {
744-
pool.all[hash] = tx
743+
if pool.all.Get(hash) == nil {
744+
pool.all.Add(tx)
745745
pool.priced.Put(tx)
746746
}
747747
// Set the potentially new pending nonce and notify any subsystems of the new tx
@@ -840,7 +840,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
840840

841841
status := make([]TxStatus, len(hashes))
842842
for i, hash := range hashes {
843-
if tx := pool.all[hash]; tx != nil {
843+
if tx := pool.all.Get(hash); tx != nil {
844844
from, _ := types.Sender(pool.signer, tx) // already validated
845845
if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
846846
status[i] = TxStatusPending
@@ -855,24 +855,21 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
855855
// Get returns a transaction if it is contained in the pool
856856
// and nil otherwise.
857857
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
858-
pool.mu.RLock()
859-
defer pool.mu.RUnlock()
860-
861-
return pool.all[hash]
858+
return pool.all.Get(hash)
862859
}
863860

864861
// removeTx removes a single transaction from the queue, moving all subsequent
865862
// transactions back to the future queue.
866863
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
867864
// Fetch the transaction we wish to delete
868-
tx, ok := pool.all[hash]
869-
if !ok {
865+
tx := pool.all.Get(hash)
866+
if tx == nil {
870867
return
871868
}
872869
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
873870

874871
// Remove it from the list of known transactions
875-
delete(pool.all, hash)
872+
pool.all.Remove(hash)
876873
if outofbound {
877874
pool.priced.Removed()
878875
}
@@ -928,15 +925,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
928925
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
929926
hash := tx.Hash()
930927
log.Trace("Removed old queued transaction", "hash", hash)
931-
delete(pool.all, hash)
928+
pool.all.Remove(hash)
932929
pool.priced.Removed()
933930
}
934931
// Drop all transactions that are too costly (low balance or out of gas)
935932
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
936933
for _, tx := range drops {
937934
hash := tx.Hash()
938935
log.Trace("Removed unpayable queued transaction", "hash", hash)
939-
delete(pool.all, hash)
936+
pool.all.Remove(hash)
940937
pool.priced.Removed()
941938
queuedNofundsCounter.Inc(1)
942939
}
@@ -952,7 +949,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
952949
if !pool.locals.contains(addr) {
953950
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
954951
hash := tx.Hash()
955-
delete(pool.all, hash)
952+
pool.all.Remove(hash)
956953
pool.priced.Removed()
957954
queuedRateLimitCounter.Inc(1)
958955
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
@@ -1001,7 +998,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
1001998
for _, tx := range list.Cap(list.Len() - 1) {
1002999
// Drop the transaction from the global pools too
10031000
hash := tx.Hash()
1004-
delete(pool.all, hash)
1001+
pool.all.Remove(hash)
10051002
pool.priced.Removed()
10061003

10071004
// Update the account nonce to the dropped transaction
@@ -1023,7 +1020,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
10231020
for _, tx := range list.Cap(list.Len() - 1) {
10241021
// Drop the transaction from the global pools too
10251022
hash := tx.Hash()
1026-
delete(pool.all, hash)
1023+
pool.all.Remove(hash)
10271024
pool.priced.Removed()
10281025

10291026
// Update the account nonce to the dropped transaction
@@ -1092,15 +1089,15 @@ func (pool *TxPool) demoteUnexecutables() {
10921089
for _, tx := range list.Forward(nonce) {
10931090
hash := tx.Hash()
10941091
log.Trace("Removed old pending transaction", "hash", hash)
1095-
delete(pool.all, hash)
1092+
pool.all.Remove(hash)
10961093
pool.priced.Removed()
10971094
}
10981095
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
10991096
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
11001097
for _, tx := range drops {
11011098
hash := tx.Hash()
11021099
log.Trace("Removed unpayable pending transaction", "hash", hash)
1103-
delete(pool.all, hash)
1100+
pool.all.Remove(hash)
11041101
pool.priced.Removed()
11051102
pendingNofundsCounter.Inc(1)
11061103
}
@@ -1172,3 +1169,68 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool {
11721169
func (as *accountSet) add(addr common.Address) {
11731170
as.accounts[addr] = struct{}{}
11741171
}
1172+
1173+
// txLookup is used internally by TxPool to track transactions while allowing lookup without
1174+
// mutex contention.
1175+
//
1176+
// Note, although this type is properly protected against concurrent access, it
1177+
// is **not** a type that should ever be mutated or even exposed outside of the
1178+
// transaction pool, since its internal state is tightly coupled with the pools
1179+
// internal mechanisms. The sole purpose of the type is to permit out-of-bound
1180+
// peeking into the pool in TxPool.Get without having to acquire the widely scoped
1181+
// TxPool.mu mutex.
1182+
type txLookup struct {
1183+
all map[common.Hash]*types.Transaction
1184+
lock sync.RWMutex
1185+
}
1186+
1187+
// newTxLookup returns a new txLookup structure.
1188+
func newTxLookup() *txLookup {
1189+
return &txLookup{
1190+
all: make(map[common.Hash]*types.Transaction),
1191+
}
1192+
}
1193+
1194+
// Range calls f on each key and value present in the map.
1195+
func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
1196+
t.lock.RLock()
1197+
defer t.lock.RUnlock()
1198+
1199+
for key, value := range t.all {
1200+
if !f(key, value) {
1201+
break
1202+
}
1203+
}
1204+
}
1205+
1206+
// Get returns a transaction if it exists in the lookup, or nil if not found.
1207+
func (t *txLookup) Get(hash common.Hash) *types.Transaction {
1208+
t.lock.RLock()
1209+
defer t.lock.RUnlock()
1210+
1211+
return t.all[hash]
1212+
}
1213+
1214+
// Count returns the current number of items in the lookup.
1215+
func (t *txLookup) Count() int {
1216+
t.lock.RLock()
1217+
defer t.lock.RUnlock()
1218+
1219+
return len(t.all)
1220+
}
1221+
1222+
// Add adds a transaction to the lookup.
1223+
func (t *txLookup) Add(tx *types.Transaction) {
1224+
t.lock.Lock()
1225+
defer t.lock.Unlock()
1226+
1227+
t.all[tx.Hash()] = tx
1228+
}
1229+
1230+
// Remove removes a transaction from the lookup.
1231+
func (t *txLookup) Remove(hash common.Hash) {
1232+
t.lock.Lock()
1233+
defer t.lock.Unlock()
1234+
1235+
delete(t.all, hash)
1236+
}

0 commit comments

Comments
 (0)