diff --git a/api/client/client.go b/api/client/client.go index 25b2a056f7..97e700af42 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -27,6 +27,15 @@ type SubmitConfig struct { DefaultKeyName string Network p2p.Network CoreGRPCConfig CoreGRPCConfig + // TxWorkerAccounts is used for queued submission. It defines how many accounts the + // TxClient uses for PayForBlob submissions. + // - Value of 0 submits transactions immediately (without a submission queue). + // - Value of 1 uses synchronous submission (submission queue with default + // signer as author of transactions). + // - Value of > 1 uses parallel submission (submission queue with several accounts + // submitting blobs). Parallel submission is not guaranteed to include blobs + // in the same order as they were submitted. + TxWorkerAccounts int } func (cfg Config) Validate() error { @@ -40,6 +49,11 @@ func (cfg SubmitConfig) Validate() error { if cfg.DefaultKeyName == "" { return errors.New("default key name should not be empty") } + + if cfg.TxWorkerAccounts < 0 { + return fmt.Errorf("worker accounts must be non-negative") + } + return cfg.CoreGRPCConfig.Validate() } @@ -90,6 +104,11 @@ func (c *Client) initTxClient( conn *grpc.ClientConn, kr keyring.Keyring, ) error { + var opts []state.Option + if submitCfg.TxWorkerAccounts > 0 { + opts = append(opts, state.WithTxWorkerAccounts(submitCfg.TxWorkerAccounts)) + } + // key is specified. 
Set up core accessor and txClient core, err := state.NewCoreAccessor( kr, @@ -98,6 +117,7 @@ func (c *Client) initTxClient( conn, submitCfg.Network.String(), nil, + opts..., ) if err != nil { return err diff --git a/api/client/client_test.go b/api/client/client_test.go index 5ca0cb66da..db0ad76ed1 100644 --- a/api/client/client_test.go +++ b/api/client/client_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "os" + "sync" "testing" "time" @@ -192,6 +193,89 @@ func TestSubmission(t *testing.T) { } } +// TestSubmission_QueuedSubmission tests the client with queued (parallel) tx submission +func TestSubmission_QueuedSubmission(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + t.Cleanup(cancel) + + accounts := []string{"Elon"} + + start := time.Now() + cctx := setupConsensus(t, ctx, accounts...) + fmt.Println("consensus", time.Since(start).String()) + + bn, adminToken := bridgeNode(t, ctx, cctx) + fmt.Println("bridgeNode", time.Since(start).String()) + + cfg := Config{ + ReadConfig: ReadConfig{ + BridgeDAAddr: "http://" + bn.RPCServer.ListenAddr(), + DAAuthToken: adminToken, + EnableDATLS: false, + }, + + SubmitConfig: SubmitConfig{ + DefaultKeyName: accounts[0], + Network: "private", + CoreGRPCConfig: CoreGRPCConfig{ + Addr: cctx.GRPCClient.Target(), + }, + TxWorkerAccounts: 4, // should create 3 additional parallel tx worker accounts + }, + } + client, err := New(ctx, cfg, cctx.Keyring) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, client.Close()) + }) + + // Test State returns actual balance + balance, err := client.State.Balance(ctx) + require.NoError(t, err) + fmt.Println("balance", balance) + + // Test header read operation works + _, err = client.Header.GetByHeight(ctx, 1) + require.NoError(t, err) + + namespace := libshare.MustNewV0Namespace(bytes.Repeat([]byte{0xb}, 10)) + b, err := blob.NewBlob(libshare.ShareVersionZero, namespace, []byte("dataA"), nil) + require.NoError(t, err) + submitBlobs := []*blob.Blob{b} + + 
now := time.Now() + // Submit a blob from default key + height, err := client.Blob.Submit(ctx, submitBlobs, &blob.SubmitOptions{}) + require.NoError(t, err) + fmt.Println("height default", height, time.Since(now).String()) + + // Test blob read operation works + received, err := client.Blob.Get(ctx, height, namespace, b.Commitment) + require.NoError(t, err) + require.Equal(t, b.Data(), received.Data()) + fmt.Println("get default", time.Since(now).String()) + + // Spam blobs from the default key in parallel + wg := sync.WaitGroup{} + for range 5 { + wg.Go(func() { + // submit takes ~3 seconds to confirm each Tx on localnet + submitCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + height, err := client.Blob.Submit(submitCtx, submitBlobs, state.NewTxConfig()) + require.NoError(t, err) + fmt.Println("submit", accounts[0], height, time.Since(now).String()) + + received, err := client.Blob.Get(submitCtx, height, namespace, b.Commitment) + require.NoError(t, err) + require.Equal(t, b.Data(), received.Data()) + fmt.Println("get", accounts[0], time.Since(now).String()) + }) + } + wg.Wait() +} + type mockAPI struct { State *stateMock.MockModule Share *shareMock.MockModule @@ -326,7 +410,7 @@ func bridgeNode(t *testing.T, ctx context.Context, cctx testnode.Context) (*node require.NoError(t, err) t.Cleanup(func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) require.NoError(t, bn.Stop(ctx)) cancel() }) diff --git a/api/client/readme.md b/api/client/readme.md index 18ff77cf03..66eb4a1159 100644 --- a/api/client/readme.md +++ b/api/client/readme.md @@ -48,6 +48,8 @@ cfg := client.Config{ TLSEnabled: true, AuthToken: "your_core_auth_token", }, + // Optional: Enable parallel transaction submission with multiple worker accounts + // TxWorkerAccounts: 4, }, } @@ -111,6 +113,10 @@ balance, err := celestiaClient.State.Balance(ctx) - 
`DefaultKeyName`: Default key to use for transactions - `Network`: Network name (e.g., "mocha-4", "private") - `CoreGRPCConfig`: Configuration for Core node connection +- `TxWorkerAccounts`: (Optional) Number of worker accounts for transaction submission. Default: 0 + - Value of 0 submits transactions immediately (without a submission queue) + - Value of 1 uses synchronous submission (submission queue with default signer as author of transactions) + - Value of > 1 uses parallel submission (submission queue with several accounts submitting blobs). Parallel submission is not guaranteed to include blobs in the same order as they were submitted ### CoreGRPCConfig @@ -125,4 +131,4 @@ balance, err := celestiaClient.State.Balance(ctx) ## Example -See [example.go](https://github.com/celestiaorg/celestia-node/blob/light-lib/api/client/example/example.go) for a complete example of creating a client, submitting a blob, and retrieving it. +See [example.go](https://github.com/celestiaorg/celestia-node/blob/main/api/client/example/example.go) for a complete example of creating a client, submitting a blob, and retrieving it. diff --git a/state/core_access_test.go b/state/core_access_test.go index 2226864b89..151a97c939 100644 --- a/state/core_access_test.go +++ b/state/core_access_test.go @@ -360,6 +360,9 @@ func buildAccessor(t *testing.T, opts ...Option) (*CoreAccessor, []string) { cctx, _, grpcAddr := testnode.NewNetwork(t, config) + _, err := cctx.WaitForHeight(int64(2)) + require.NoError(t, err) + conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) ca, err := NewCoreAccessor(cctx.Keyring, accounts[0], nil, conn, chainID, nil, opts...)