Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/simulator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ The staking-enabled flag is only for local testing. Disabling staking serves two

1. Ignore stake weight on the P-Chain and count each connected peer as having a stake weight of 1
2. Automatically opts in to validate every Subnet
:::
:::

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this change do?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my editor added that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix


Once you have AvalancheGo running locally, it will be running an HTTP Server on the default port `9650`. This means that the RPC Endpoint for the C-Chain will be http://127.0.0.1:9650/ext/bc/C/rpc and ws://127.0.0.1:9650/ext/bc/C/ws for WebSocket connections.

Now, we can run the simulator command to simulate some load on the local C-Chain for 30s:

```bash
./simulator --timeout=1m --concurrency=1 --max-fee-cap=300 --max-tip-cap=10 --txs-per-worker=50
./simulator --timeout=1m --workers=1 --max-fee-cap=300 --max-tip-cap=10 --txs-per-worker=50
```

## Command Line Flags
Expand Down
4 changes: 4 additions & 0 deletions cmd/simulator/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
KeyDirKey = "key-dir"
VersionKey = "version"
TimeoutKey = "timeout"
BatchSizeKey = "batch-size"
)

var (
Expand All @@ -42,6 +43,7 @@ type Config struct {
TxsPerWorker uint64 `json:"txs-per-worker"`
KeyDir string `json:"key-dir"`
Timeout time.Duration `json:"timeout"`
BatchSize uint64 `json:"batch-size"`
}

func BuildConfig(v *viper.Viper) (Config, error) {
Expand All @@ -53,6 +55,7 @@ func BuildConfig(v *viper.Viper) (Config, error) {
TxsPerWorker: v.GetUint64(TxsPerWorkerKey),
KeyDir: v.GetString(KeyDirKey),
Timeout: v.GetDuration(TimeoutKey),
BatchSize: v.GetUint64(BatchSizeKey),
}
if len(c.Endpoints) == 0 {
return c, ErrNoEndpoints
Expand Down Expand Up @@ -114,4 +117,5 @@ func addSimulatorFlags(fs *pflag.FlagSet) {
fs.String(KeyDirKey, ".simulator/keys", "Specify the directory to save private keys in (INSECURE: only use for testing)")
fs.Duration(TimeoutKey, 5*time.Minute, "Specify the timeout for the simulator to complete (0 indicates no timeout)")
fs.String(LogLevelKey, "info", "Specify the log level to use in the simulator")
fs.Uint64(BatchSizeKey, 100, "Specify the batchsize for the worker to issue and confirm txs")
}
14 changes: 8 additions & 6 deletions cmd/simulator/load/funder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/big"

"github.com/ava-labs/subnet-evm/cmd/simulator/key"
"github.com/ava-labs/subnet-evm/cmd/simulator/txs"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/params"
Expand Down Expand Up @@ -80,7 +81,7 @@ func DistributeFunds(ctx context.Context, client ethclient.Client, keys []*key.K
signer := types.LatestSignerForChainID(chainID)

// Generate a sequence of transactions to distribute the required funds.
log.Info("Generating distribution transactions")
log.Info("Generating distribution transactions...")
i := 0
txGenerator := func(key *ecdsa.PrivateKey, nonce uint64) (*types.Transaction, error) {
tx, err := types.SignNewTx(key, signer, &types.DynamicFeeTx{
Expand All @@ -99,17 +100,18 @@ func DistributeFunds(ctx context.Context, client ethclient.Client, keys []*key.K
i++
return tx, nil
}
txs, err := GenerateTxSequence(ctx, txGenerator, client, maxFundsKey.PrivKey, uint64(len(needFundsAddrs)))

numTxs := uint64(len(needFundsAddrs))
txSequence, err := txs.GenerateTxSequence(ctx, txGenerator, client, maxFundsKey.PrivKey, numTxs)
if err != nil {
return nil, fmt.Errorf("failed to generate fund distribution sequence from %s of length %d", maxFundsKey.Address, len(needFundsAddrs))
}
worker := NewSingleAddressTxWorker(ctx, client, maxFundsKey.Address)
txFunderAgent := txs.NewIssueNAgent[*types.Transaction](txSequence, worker, numTxs)

log.Info("Executing distribution transactions...")
worker := NewWorker(client, maxFundsKey.Address, txs)
if err := worker.Execute(ctx); err != nil {
if err := txFunderAgent.Execute(ctx); err != nil {
return nil, err
}

for _, addr := range needFundsAddrs {
balance, err := client.BalanceAt(ctx, addr, nil)
if err != nil {
Expand Down
58 changes: 36 additions & 22 deletions cmd/simulator/load/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,48 @@ import (

"github.com/ava-labs/subnet-evm/cmd/simulator/config"
"github.com/ava-labs/subnet-evm/cmd/simulator/key"
"github.com/ava-labs/subnet-evm/cmd/simulator/txs"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/params"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/sync/errgroup"
)

// CreateLoader creates a WorkerGroup from [config] to perform the specified simulation.
func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, error) {
// ExecuteLoader creates txSequences from [config] and has txAgents execute the specified simulation.
func ExecuteLoader(ctx context.Context, config config.Config) error {
if config.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, config.Timeout)
defer cancel()
}

// Construct the arguments for the load simulator
clients := make([]ethclient.Client, 0, len(config.Endpoints))
for i := 0; i < config.Workers; i++ {
clientURI := config.Endpoints[i%len(config.Endpoints)]
client, err := ethclient.Dial(clientURI)
if err != nil {
return nil, fmt.Errorf("failed to dial client at %s: %w", clientURI, err)
return fmt.Errorf("failed to dial client at %s: %w", clientURI, err)
}
clients = append(clients, client)
}

keys, err := key.LoadAll(ctx, config.KeyDir)
if err != nil {
return nil, err
return err
}
// Ensure there are at least [config.Workers] keys and save any newly generated ones.
if len(keys) < config.Workers {
for i := 0; len(keys) < config.Workers; i++ {
newKey, err := key.Generate()
if err != nil {
return nil, fmt.Errorf("failed to generate %d new key: %w", i, err)
return fmt.Errorf("failed to generate %d new key: %w", i, err)
}
if err := newKey.Save(config.KeyDir); err != nil {
return nil, fmt.Errorf("failed to save %d new key: %w", i, err)
return fmt.Errorf("failed to save %d new key: %w", i, err)
}
keys = append(keys, newKey)
}
Expand All @@ -57,8 +65,9 @@ func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, erro
log.Info("Distributing funds", "numTxsPerWorker", config.TxsPerWorker, "minFunds", minFundsPerAddr)
keys, err = DistributeFunds(ctx, clients[0], keys, config.Workers, minFundsPerAddr)
if err != nil {
return nil, err
return err
}
log.Info("Distributed funds successfully")

pks := make([]*ecdsa.PrivateKey, 0, len(keys))
senders := make([]common.Address, 0, len(keys))
Expand All @@ -73,10 +82,11 @@ func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, erro
client := clients[0]
chainID, err := client.ChainID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch chainID: %w", err)
return fmt.Errorf("failed to fetch chainID: %w", err)
}
signer := types.LatestSignerForChainID(chainID)

log.Info("Creating transaction sequences...")
txGenerator := func(key *ecdsa.PrivateKey, nonce uint64) (*types.Transaction, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this txGenerator func can just be a series of options/params for GenerateTxSequences

addr := ethcrypto.PubkeyToAddress(key.PublicKey)
tx, err := types.SignNewTx(key, signer, &types.DynamicFeeTx{
Expand All @@ -94,26 +104,30 @@ func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, erro
}
return tx, nil
}
txSequences, err := GenerateTxSequences(ctx, txGenerator, clients[0], pks, config.TxsPerWorker)
txSequences, err := txs.GenerateTxSequences(ctx, txGenerator, clients[0], pks, config.TxsPerWorker)
if err != nil {
return nil, err
return err
}

wg := NewWorkerGroup(clients[:config.Workers], senders[:config.Workers], txSequences[:config.Workers])
return wg, nil
}
log.Info("Constructing tx agents...", "numAgents", config.Workers)
agents := make([]txs.Agent[*types.Transaction], 0, config.Workers)
for i := 0; i < config.Workers; i++ {
agents = append(agents, txs.NewIssueNAgent[*types.Transaction](txSequences[i], NewSingleAddressTxWorker(ctx, clients[i], senders[i]), config.BatchSize))
}

// ExecuteLoader runs the load simulation specified by config.
func ExecuteLoader(ctx context.Context, config config.Config) error {
if config.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, config.Timeout)
defer cancel()
log.Info("Starting tx agents...")
eg := errgroup.Group{}
for _, agent := range agents {
agent := agent
eg.Go(func() error {
return agent.Execute(ctx)
})
}

loader, err := CreateLoader(ctx, config)
if err != nil {
log.Info("Waiting for tx agents...")
if err := eg.Wait(); err != nil {
return err
}
return loader.Execute(ctx)
log.Info("Tx agents completed successfully.")
return nil
}
112 changes: 42 additions & 70 deletions cmd/simulator/load/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,102 +10,74 @@ import (

"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

type Worker struct {
client ethclient.Client
address common.Address
txs []*types.Transaction
}
type singleAddressTxWorker struct {
client ethclient.Client

// NewWorker creates a new worker that will issue the sequence of transactions from the given address
//
// Assumes that all transactions are from the same address, ordered by nonce, and this worker has exclusive access
// to issuance of transactions from the underlying private key.
func NewWorker(client ethclient.Client, address common.Address, txs []*types.Transaction) *Worker {
return &Worker{
client: client,
address: address,
txs: txs,
}
}
acceptedNonce uint64
address common.Address

func (w *Worker) ExecuteTxsFromAddress(ctx context.Context) error {
log.Info("Executing txs", "numTxs", len(w.txs))
for i, tx := range w.txs {
start := time.Now()
err := w.client.SendTransaction(ctx, tx)
if err != nil {
return fmt.Errorf("failed to issue tx %d: %w", i, err)
}
log.Info("execute tx", "tx", tx.Hash(), "nonce", tx.Nonce(), "duration", time.Since(start))
}
return nil
sub interfaces.Subscription
newHeads chan *types.Header
}

// AwaitTxs awaits for the nonce of the last transaction issued by the worker to be confirmed or
// rejected by the network.
//
// Assumes that a non-zero number of transactions were already generated and that they were issued
// by this worker.
func (w *Worker) AwaitTxs(ctx context.Context) error {
nonce := w.txs[len(w.txs)-1].Nonce()

// NewSingleAddressTxWorker creates and returns a singleAddressTxWorker
func NewSingleAddressTxWorker(ctx context.Context, client ethclient.Client, address common.Address) *singleAddressTxWorker {
newHeads := make(chan *types.Header)
defer close(newHeads)
tw := &singleAddressTxWorker{
client: client,
address: address,
newHeads: newHeads,
}

sub, err := w.client.SubscribeNewHead(ctx, newHeads)
sub, err := client.SubscribeNewHead(ctx, newHeads)
if err != nil {
log.Debug("failed to subscribe new heads, falling back to polling", "err", err)
} else {
defer sub.Unsubscribe()
tw.sub = sub
}

return tw
}

func (tw *singleAddressTxWorker) IssueTx(ctx context.Context, tx *types.Transaction) error {
return tw.client.SendTransaction(ctx, tx)
}

func (tw *singleAddressTxWorker) ConfirmTx(ctx context.Context, tx *types.Transaction) error {
txNonce := tx.Nonce()

for {
// If the is less than what has already been accepted, the transaction is confirmed

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If the is less than what has already been accepted, the transaction is confirmed
// If the nonce is less than what has already been accepted, the transaction is confirmed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

if txNonce < tw.acceptedNonce {
return nil
}

select {
case <-newHeads:
case <-tw.newHeads:
case <-time.After(time.Second):
case <-ctx.Done():
return fmt.Errorf("failed to await nonce: %w", ctx.Err())
return fmt.Errorf("failed to await tx %s nonce %d: %w", tx.Hash(), txNonce, ctx.Err())
}

currentNonce, err := w.client.NonceAt(ctx, w.address, nil)
// Update the worker's accepted nonce, so we can check on the next iteration
// if the transaction has been accepted.
acceptedNonce, err := tw.client.NonceAt(ctx, tw.address, nil)
if err != nil {
log.Warn("failed to get nonce", "err", err)
}
if currentNonce >= nonce {
return nil
} else {
log.Info("fetched nonce", "awaiting", nonce, "currentNonce", currentNonce)
return fmt.Errorf("failed to await tx %s nonce %d: %w", tx.Hash(), txNonce, err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we still keep the awaiting line. It's helpful to the user.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's excessive to do so for every nonce at the info level. Could we make it debug?

tw.acceptedNonce = acceptedNonce
}
}

// ConfirmAllTransactions iterates over every transaction of this worker and confirms it
// via eth_getTransactionByHash
func (w *Worker) ConfirmAllTransactions(ctx context.Context) error {
for i, tx := range w.txs {
_, isPending, err := w.client.TransactionByHash(ctx, tx.Hash())
if err != nil {
return fmt.Errorf("failed to confirm tx at index %d: %s", i, tx.Hash())
}
if isPending {
return fmt.Errorf("failed to confirm tx at index %d: pending", i)
}
func (tw *singleAddressTxWorker) Close(ctx context.Context) error {
if tw.sub != nil {
tw.sub.Unsubscribe()
}
log.Info("Confirmed all transactions")
close(tw.newHeads)
return nil
}

// Execute issues and confirms all transactions for the worker.
func (w *Worker) Execute(ctx context.Context) error {
if err := w.ExecuteTxsFromAddress(ctx); err != nil {
return err
}
if err := w.AwaitTxs(ctx); err != nil {
return err
}
return w.ConfirmAllTransactions(ctx)
}
Loading