diff --git a/.env.example b/.env.example index fdd7a67..26bb542 100644 --- a/.env.example +++ b/.env.example @@ -10,4 +10,5 @@ POSTGRES_DB=starknet POSTGRES_PASSWORD= # REQUIRED STARKNET_NODE_URL= # REQUIRED if INDEXER_DATASOURCE=node NODE_APIKEY= # REQUIRED if your node provider has api key. It's api key. -NODE_HEADER_APIKEY= # REQUIRED if your node provider has api key. It's header name. \ No newline at end of file +NODE_HEADER_APIKEY= # REQUIRED if your node provider has api key. It's header name. +STARKNET_SUBSQUID_URL= # REQUIRED if INDEXER_DATASOURCE=subsquid \ No newline at end of file diff --git a/build/dipdup.yml b/build/dipdup.yml index 44b5a12..49f6ee8 100644 --- a/build/dipdup.yml +++ b/build/dipdup.yml @@ -22,6 +22,9 @@ datasources: fallback: url: ${STARKNET_FALLBACK_NODE_URL} rps: ${STARKNET_FALLBACK_NODE_RPS:-1} + subsquid: + url: ${STARKNET_SUBSQUID_URL} + rps: ${STARKNET_SUBSQUID_RPS:-5} database: kind: postgres diff --git a/cmd/indexer/config.go b/cmd/indexer/config.go index b661908..da614f9 100644 --- a/cmd/indexer/config.go +++ b/cmd/indexer/config.go @@ -11,7 +11,7 @@ type Config struct { config.Config `yaml:",inline"` LogLevel string `yaml:"log_level" validate:"omitempty,oneof=debug trace info warn error fatal panic"` Indexer indexerConfig.Config `yaml:"indexer"` - GRPC *sdkGrpc.ServerConfig `yaml:"grpc" validate:"required"` + GRPC *sdkGrpc.ServerConfig `yaml:"grpc" validate:"required"` } // Substitute - diff --git a/docker-compose.yml b/docker-compose.yml index 96fe07c..23df242 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,7 +45,7 @@ services: hasura: image: hasura/graphql-engine:v2.28.0 ports: - - 127.0.0.1:8080:8080 + - "127.0.0.1:8080:8080" restart: always environment: - HASURA_GRAPHQL_DATABASE_URL=postgres://${POSTGRES_USER:-dipdup}:${POSTGRES_PASSWORD:-changeme}@${HASURA_POSTGRES_HOST:-db}:${POSTGRES_PORT:-5432}/${POSTGRES_DB:-starknet} diff --git a/go.mod b/go.mod index 41a6fa7..66c978a 100644 --- a/go.mod +++ b/go.mod @@ -5,17 +5,18 @@ go 1.23.5 require ( github.com/dipdup-io/starknet-go-api v0.0.0-20250221100402-18cfac749c10 github.com/dipdup-io/workerpool v0.0.4 - github.com/dipdup-net/go-lib v0.3.3 - github.com/dipdup-net/indexer-sdk v0.0.4 + github.com/dipdup-net/go-lib v0.3.6 + github.com/dipdup-net/indexer-sdk v0.0.5 github.com/go-testfixtures/testfixtures/v3 v3.9.0 github.com/goccy/go-json v0.10.2 github.com/karlseguin/ccache/v2 v2.0.8 github.com/lib/pq v1.10.9 + github.com/opus-domini/fast-shot v1.1.4 github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.30.0 github.com/shopspring/decimal v1.3.1 github.com/spf13/cobra v1.7.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/uptrace/bun v1.1.14 go.uber.org/mock v0.2.0 google.golang.org/grpc v1.58.3 @@ -110,7 +111,7 @@ require ( go.opentelemetry.io/otel v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect golang.org/x/crypto v0.31.0 // indirect - golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect + golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/sys v0.28.0 // indirect diff --git a/go.sum b/go.sum index 58ed31f..c52223b 100644 --- a/go.sum +++ b/go.sum @@ -43,10 +43,10 @@ github.com/dipdup-io/starknet-go-api v0.0.0-20250221100402-18cfac749c10 h1:5QbKq github.com/dipdup-io/starknet-go-api v0.0.0-20250221100402-18cfac749c10/go.mod h1:AY8SrbR86x/TYipue99eZZBN0VJpfoMovp451bcHF3M= github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s= github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= -github.com/dipdup-net/go-lib v0.3.3 h1:vTUI+sT4L+x+eiMf712Cg8EtlqUCMiN6M3vcNaPlCw8= -github.com/dipdup-net/go-lib v0.3.3/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4= -github.com/dipdup-net/indexer-sdk v0.0.4 h1:mhTW3f4U6oc05UjxSiffOV+HIi4vQkDgOq1MbJXia8U= -github.com/dipdup-net/indexer-sdk v0.0.4/go.mod h1:n1oBIm5MPY1WxLS9tQfTWr+Ytrwv6ThCZF7TASsJslg= +github.com/dipdup-net/go-lib v0.3.6 h1:ctas0AYDgN8gfKLvrIgRsMHqvH6wwmKJaNoEWA8YtrM= +github.com/dipdup-net/go-lib v0.3.6/go.mod h1:UibJawy7ILP3jD+vG3ebNiRjv05+G6Wuv2HmGqaadTA= +github.com/dipdup-net/indexer-sdk v0.0.5 h1:BcI+R+hRO7uDMVrp3S1WzSSNPdC+t6+nrmFUxua8aYs= +github.com/dipdup-net/indexer-sdk v0.0.5/go.mod h1:v4j99EnJs2T6CheLYCcD095WHAb7RR56JJNEAWJv0DA= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/docker v25.0.6+incompatible h1:5cPwbwriIcsua2REJe8HqQV+6WlWc1byg2QSXzBxBGg= @@ -233,6 +233,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc4 h1:oOxKUJWnFC4YGHCCMNql1x4YaDfYBTS5Y4x/Cgeo1E0= github.com/opencontainers/image-spec v1.1.0-rc4/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= +github.com/opus-domini/fast-shot v1.1.4 h1:xWTO/4JEILjZM/rP6mwiWe/jZyE9+L1G9sC4BsoynAk= +github.com/opus-domini/fast-shot v1.1.4/go.mod h1:BOr2JXHQJhOnYsxyCvFbgBP3BuYCjgh2YfzWKweEL0A= github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s= github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= @@ -270,6 +272,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -277,8 +281,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/testcontainers/testcontainers-go v0.22.0 h1:hOK4NzNu82VZcKEB1aP9LO1xYssVFMvlfeuDW9JMmV0= github.com/testcontainers/testcontainers-go v0.22.0/go.mod h1:k0YiPa26xJCRUbUkYqy5rY6NGvSbVCeUBXCvucscBR4= github.com/testcontainers/testcontainers-go/modules/postgres v0.22.0 h1:OHVaqu9MRGMSlro9AD5UCfj8XiHwQdhB9thE4vINq+E= @@ -341,8 +345,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU= -golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/pkg/indexer/config/config.go b/pkg/indexer/config/config.go index 1364cc5..8cc8708 100644 --- a/pkg/indexer/config/config.go +++ b/pkg/indexer/config/config.go @@ -2,11 +2,11 @@ package config // Config - configuration structure for indexer type Config struct { - Name string `yaml:"name" validate:"omitempty"` - StartLevel uint64 `yaml:"start_level" validate:"omitempty"` - ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"` - Timeout uint64 `yaml:"timeout" validate:"omitempty"` + Name string `yaml:"name" validate:"omitempty"` + StartLevel uint64 `yaml:"start_level" validate:"omitempty"` + ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"` + Timeout uint64 `yaml:"timeout" validate:"omitempty"` ClassInterfacesDir string `yaml:"class_interfaces_dir" validate:"required,dir"` - BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"` - Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node"` + BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"` + Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node subsquid"` } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 9d01558..c4e5835 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -3,6 +3,8 @@ package indexer import ( "bytes" "context" + sqdAdapter "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/adapter" + sqdRcvr "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver" "runtime" "sync" "time" @@ -32,8 +34,9 @@ const ( type Indexer struct { modules.BaseModule - cfg config.Config - queue map[uint64]receiver.Result + cfg config.Config + datasource map[string]ddConfig.DataSource + queue map[uint64]receiver.Result address models.IAddress blocks models.IBlock @@ -52,7 +55,8 @@ type Indexer struct { state *state idGenerator *generator.IdGenerator - receiver *receiver.Receiver + receiver receiver.IReceiver + adapter *sqdAdapter.Adapter statusChecker *statusChecker rollbackManager models.Rollback @@ -60,7 +64,8 @@ type Indexer struct { rollbackRerun chan struct{} rollbackWait *sync.WaitGroup - txWriteMutex *sync.Mutex + txWriteMutex *sync.Mutex + cancelReceiver context.CancelFunc } // New - creates new indexer entity @@ -72,6 +77,7 @@ func New( indexer := &Indexer{ BaseModule: modules.New("indexer"), cfg: cfg, + datasource: datasource, queue: make(map[uint64]receiver.Result), stateRepo: storage.State, address: storage.Address, @@ -93,11 +99,43 @@ func New( txWriteMutex: new(sync.Mutex), rollbackWait: new(sync.WaitGroup), } - rcvr, err := receiver.NewReceiver(cfg, datasource) - if err != nil { - return nil, err + + switch cfg.Datasource { + case "subsquid": + sqdReceiver, err := sqdRcvr.New( + cfg, + datasource, + cfg.StartLevel, + 2, + func() uint64 { + return indexer.state.Height() + }, + ) + if err != nil { + return nil, err + } + + indexer.receiver = sqdReceiver + indexer.adapter = sqdAdapter.New(sqdReceiver.GetResults()) + + if err = indexer.adapter.AttachTo(sqdReceiver, sqdRcvr.BlocksOutput, sqdAdapter.BlocksInput); err != nil { + return nil, errors.Wrap(err, "while attaching adapter to receiver") + } + if err = indexer.adapter.AttachTo(sqdReceiver, sqdRcvr.HeadOutput, sqdAdapter.HeadInput); err != nil { + return nil, errors.Wrap(err, "while attaching adapter to receiver") + } + indexer.CreateInputWithCapacity(InputStopSubsquid, 1) + if err = indexer.AttachTo(indexer.adapter, sqdAdapter.HeadAchieved, InputStopSubsquid); err != nil { + return nil, errors.Wrap(err, "while attaching indexer to subsquid done channel") + } + + default: + rcvr, err := receiver.NewReceiver(cfg, datasource) + if err != nil { + return nil, err + } + indexer.receiver = rcvr } - indexer.receiver = rcvr indexer.CreateOutput(OutputBlocks) @@ -129,17 +167,26 @@ func New( // Start - func (indexer *Indexer) Start(ctx context.Context) { indexer.Log.Info().Msg("starting indexer...") + receiverCtx, cancel := context.WithCancel(ctx) + indexer.cancelReceiver = cancel + if err := indexer.init(ctx); err != nil { indexer.Log.Err(err).Msg("state initializing error") return } - indexer.receiver.Start(ctx) + indexer.receiver.Start(receiverCtx) + indexer.statusChecker.Start(receiverCtx) - indexer.statusChecker.Start(ctx) + switch indexer.cfg.Datasource { + case "subsquid": + indexer.adapter.Start(receiverCtx) + indexer.G.GoCtx(ctx, indexer.listenStopSubsquid) + default: + indexer.G.GoCtx(ctx, indexer.sync) + } indexer.G.GoCtx(ctx, indexer.saveBlocks) - indexer.G.GoCtx(ctx, indexer.sync) } // Name - @@ -162,7 +209,6 @@ func (indexer *Indexer) Close() error { if err := indexer.receiver.Close(); err != nil { return err } - close(indexer.rollback) close(indexer.rollbackRerun) return nil @@ -201,8 +247,8 @@ func (indexer *Indexer) checkQueue(ctx context.Context) bool { return false } -func (indexer *Indexer) getNewBlocks(ctx context.Context) error { - head, err := indexer.receiver.Head(ctx) +func (indexer *Indexer) getNewBlocks(ctx context.Context, commonReceiver *receiver.Receiver) error { + head, err := commonReceiver.Head(ctx) if err != nil { return err } @@ -242,7 +288,7 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error { if indexer.checkQueue(ctx) { return nil } - indexer.receiver.AddTask(height) + commonReceiver.AddTask(height) } } @@ -267,7 +313,12 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error { } func (indexer *Indexer) sync(ctx context.Context) { - if err := indexer.getNewBlocks(ctx); err != nil { + commonReceiver, ok := indexer.receiver.(*receiver.Receiver) + if !ok { + log.Panic().Msg("incorrect receiver type") + return + } + if err := indexer.getNewBlocks(ctx, commonReceiver); err != nil { indexer.Log.Err(err).Msg("getNewBlocks") } @@ -281,11 +332,11 @@ func (indexer *Indexer) sync(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - if err := indexer.getNewBlocks(ctx); err != nil { + if err := indexer.getNewBlocks(ctx, commonReceiver); err != nil { indexer.Log.Err(err).Msg("getNewBlocks") } case <-indexer.rollbackRerun: - if err := indexer.getNewBlocks(ctx); err != nil { + if err := indexer.getNewBlocks(ctx, commonReceiver); err != nil { indexer.Log.Err(err).Msg("getNewBlocks") } } @@ -303,7 +354,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) { case result := <-indexer.receiver.Results(): indexer.queue[result.Block.Height] = result - if indexer.state.Height() == 0 && !zeroBlock { + if indexer.state.Height() == 0 && !zeroBlock && indexer.cfg.StartLevel == 0 { if data, ok := indexer.queue[0]; ok { if err := indexer.handleBlock(ctx, data); err != nil { indexer.Log.Err(err).Msg("handle block") @@ -381,7 +432,11 @@ func (indexer *Indexer) makeRollback(ctx context.Context, height uint64) error { delete(indexer.queue, key) } - indexer.receiver.Clear() + commonReceiver, ok := indexer.receiver.(*receiver.Receiver) + if !ok { + return errors.Errorf("incorrect receiver type") + } + commonReceiver.Clear() if err := indexer.Rollback(ctx, height-1); err != nil { return errors.Wrap(err, "rollback") @@ -467,3 +522,50 @@ func (indexer *Indexer) Rollback(ctx context.Context, height uint64) error { return indexer.rollbackManager.Rollback(ctx, indexer.Name(), height) } + +func (indexer *Indexer) listenStopSubsquid(ctx context.Context) { + input := indexer.MustInput(InputStopSubsquid) + + for { + select { + case <-ctx.Done(): + return + case headLevel, ok := <-input.Listen(): + if !ok { + indexer.Log.Warn().Msg("can't read message from input, it was drained and closed") + if err := indexer.Close(); err != nil { + return + } + } + + for indexer.state.Height() != headLevel { + time.Sleep(time.Millisecond * 10) + } + + indexer.cancelReceiver() + if err := indexer.receiver.Close(); err != nil { + return + } + if err := indexer.adapter.Close(); err != nil { + return + } + if err := indexer.statusChecker.Close(); err != nil { + return + } + + indexer.cfg.Datasource = "node" + rcvr, err := receiver.NewReceiver(indexer.cfg, indexer.datasource) + if err != nil { + indexer.Log.Error().Msg("can't create node receiver") + return + } + indexer.receiver = rcvr + indexer.statusChecker.SetReceiver(rcvr) + + indexer.Log.Info().Msgf("data source has been replaced with node, starting...") + indexer.receiver.Start(ctx) + indexer.statusChecker.Start(ctx) + indexer.G.GoCtx(ctx, indexer.sync) + } + } +} diff --git a/pkg/indexer/messages.go b/pkg/indexer/messages.go index d356e50..6e990ce 100644 --- a/pkg/indexer/messages.go +++ b/pkg/indexer/messages.go @@ -7,6 +7,7 @@ import ( // topics const ( OutputBlocks string = "blocks" + InputStopSubsquid ) // IndexerMessage - diff --git a/pkg/indexer/parser/parser.go b/pkg/indexer/parser/parser.go index 4779c8e..5020685 100644 --- a/pkg/indexer/parser/parser.go +++ b/pkg/indexer/parser/parser.go @@ -2,7 +2,6 @@ package parser import ( "context" - starknetData "github.com/dipdup-io/starknet-go-api/pkg/data" "github.com/dipdup-io/starknet-indexer/internal/storage" "github.com/dipdup-io/starknet-indexer/pkg/indexer/cache" @@ -40,7 +39,7 @@ func createParser( // Parse - func Parse( ctx context.Context, - receiver *receiver.Receiver, + receiver receiver.IReceiver, cache *cache.Cache, idGenerator *generator.IdGenerator, blocks storage.IBlock, diff --git a/pkg/indexer/parser/resolver/resolver.go b/pkg/indexer/parser/resolver/resolver.go index 0203851..78e6f42 100644 --- a/pkg/indexer/parser/resolver/resolver.go +++ b/pkg/indexer/parser/resolver/resolver.go @@ -19,7 +19,7 @@ type Resolver struct { blocks storage.IBlock addresses storage.IAddress proxies storage.IProxy - receiver *receiver.Receiver + receiver receiver.IReceiver cache *cache.Cache idGenerator *generator.IdGenerator blockContext *data.BlockContext @@ -27,7 +27,7 @@ type Resolver struct { // NewResolver - func NewResolver( - receiver *receiver.Receiver, + receiver receiver.IReceiver, cache *cache.Cache, idGenerator *generator.IdGenerator, blocks storage.IBlock, diff --git a/pkg/indexer/receiver/receiver.go b/pkg/indexer/receiver/receiver.go index 6dc05ee..624b3f0 100644 --- a/pkg/indexer/receiver/receiver.go +++ b/pkg/indexer/receiver/receiver.go @@ -31,7 +31,7 @@ func NewResult() Result { } } -func (r *Result) setBlock(block Block) { +func (r *Result) SetBlock(block Block) { r.mx.Lock() { r.Block = block @@ -39,7 +39,7 @@ func (r *Result) setBlock(block Block) { r.mx.Unlock() } -func (r *Result) setTraces(traces []starknet.Trace) { +func (r *Result) SetTraces(traces []starknet.Trace) { r.mx.Lock() { r.Traces = traces @@ -47,7 +47,7 @@ func (r *Result) setTraces(traces []starknet.Trace) { r.mx.Unlock() } -func (r *Result) setStateUpdates(stateUpdate starknetData.StateUpdate) { +func (r *Result) SetStateUpdates(stateUpdate starknetData.StateUpdate) { r.mx.Lock() { r.StateUpdate = stateUpdate @@ -55,6 +55,16 @@ func (r *Result) setStateUpdates(stateUpdate starknetData.StateUpdate) { r.mx.Unlock() } +type IReceiver interface { + Start(ctx context.Context) + Close() error + QueueSize() int + Head(ctx context.Context) (uint64, error) + GetClass(ctx context.Context, hash string) (starknetData.Class, error) + GetBlockStatus(ctx context.Context, height uint64) (storage.Status, error) + Results() <-chan Result +} + // Receiver - type Receiver struct { api API @@ -243,7 +253,7 @@ func (r *Receiver) getBlock(ctx context.Context, blockId starknetData.BlockID, r time.Sleep(time.Second) continue } - result.setBlock(response) + result.SetBlock(response) break } } @@ -272,7 +282,7 @@ func (r *Receiver) traceBlock(ctx context.Context, blockId starknetData.BlockID, time.Sleep(time.Second) continue } - result.setTraces(response) + result.SetTraces(response) break } } @@ -301,7 +311,7 @@ func (r *Receiver) receiveStateUpdate(ctx context.Context, blockId starknetData. time.Sleep(time.Second) continue } - result.setStateUpdates(response) + result.SetStateUpdates(response) break } } diff --git a/pkg/indexer/status_checker.go b/pkg/indexer/status_checker.go index 3a50908..7667add 100644 --- a/pkg/indexer/status_checker.go +++ b/pkg/indexer/status_checker.go @@ -2,11 +2,11 @@ package indexer import ( "context" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" "time" "github.com/dipdup-io/starknet-indexer/internal/storage" "github.com/dipdup-io/starknet-indexer/internal/storage/postgres" - "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" "github.com/dipdup-io/workerpool" sdk "github.com/dipdup-net/indexer-sdk/pkg/storage" "github.com/rs/zerolog" @@ -26,13 +26,13 @@ type statusChecker struct { invoke storage.IInvoke l1Handlers storage.IL1Handler transactable sdk.Transactable - receiver *receiver.Receiver + receiver receiver.IReceiver log zerolog.Logger g workerpool.Group } func newStatusChecker( - receiver *receiver.Receiver, + receiver receiver.IReceiver, blocks storage.IBlock, declares storage.IDeclare, deploys storage.IDeploy, @@ -56,6 +56,10 @@ func newStatusChecker( } } +func (checker *statusChecker) SetReceiver(receiver receiver.IReceiver) { + checker.receiver = receiver +} + // Start - func (checker *statusChecker) Start(ctx context.Context) { checker.g.GoCtx(ctx, checker.start) diff --git a/pkg/indexer/subsquid/adapter/adapter.go b/pkg/indexer/subsquid/adapter/adapter.go new file mode 100644 index 0000000..9513255 --- /dev/null +++ b/pkg/indexer/subsquid/adapter/adapter.go @@ -0,0 +1,48 @@ +package adapter + +import ( + "context" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" + "github.com/dipdup-net/indexer-sdk/pkg/modules" +) + +type Adapter struct { + modules.BaseModule + results chan receiver.Result + head uint64 +} + +var _ modules.Module = (*Adapter)(nil) + +const ( + BlocksInput = "blocks" + HeadInput = "head" + BlocksOutput = "parsed_blocks" + StopOutput = "stop" + HeadAchieved = "head_achieved" +) + +func New(resultsChannel chan receiver.Result) *Adapter { + m := &Adapter{ + BaseModule: modules.New("sqd adapter"), + results: resultsChannel, + } + m.CreateInputWithCapacity(BlocksInput, 128) + m.CreateInputWithCapacity(HeadInput, 1) + m.CreateOutput(BlocksOutput) + m.CreateOutput(StopOutput) + m.CreateOutput(HeadAchieved) + + return m +} + +func (a *Adapter) Start(ctx context.Context) { + a.Log.Info().Msg("starting...") + a.G.GoCtx(ctx, a.listen) +} + +func (a *Adapter) Close() error { + a.Log.Info().Msg("closing...") + a.G.Wait() + return nil +} diff --git a/pkg/indexer/subsquid/adapter/convert.go b/pkg/indexer/subsquid/adapter/convert.go new file mode 100644 index 0000000..2a5b1c3 --- /dev/null +++ b/pkg/indexer/subsquid/adapter/convert.go @@ -0,0 +1,79 @@ +package adapter + +import ( + "context" + "fmt" + "github.com/dipdup-io/starknet-go-api/pkg/data" + "github.com/dipdup-io/starknet-go-api/pkg/encoding" + "github.com/dipdup-io/starknet-indexer/internal/storage" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" + "time" +) + +func (a *Adapter) convert(_ context.Context, block *api.SqdBlockResponse) (receiver.Result, error) { + result := receiver.NewResult() + b := receiver.Block{ + Height: block.Header.Number, + Status: storage.NewStatus(block.Header.Status), + Hash: data.Felt(block.Header.Hash).Bytes(), + ParentHash: data.Felt(block.Header.ParentHash).Bytes(), + NewRoot: encoding.MustDecodeHex(block.Header.NewRoot), + Time: time.Unix(block.Header.Timestamp, 0).UTC(), + SequencerAddress: encoding.MustDecodeHex(block.Header.SequencerAddress), + Transactions: ConvertTransactions(block), + Version: &block.Header.StarknetVersion, + Receipts: nil, + } + result.SetBlock(b) + + traces, err := ConvertTraces(block) + if err != nil { + return result, err + } + result.SetTraces(traces) + + stateUpdates, err := ConvertStateUpdates(block) + if err != nil { + return result, err + } + result.SetStateUpdates(stateUpdates) + + return result, nil +} + +func uint64ToFelt(value *uint64) data.Felt { + if value == nil { + return "" + } + return data.Felt(fmt.Sprintf("0x%x", *value)) +} + +func stringToFelt(value *string) data.Felt { + if value == nil { + return "" + } + return data.Felt(*value) +} + +func parseStringSlice(value *[]string) []string { + if value == nil { + return []string{} + } + return *value +} + +func parseString(value *string) string { + if value == nil { + return "" + } + return *value +} + +func stringSliceToFeltSlice(income []string) []data.Felt { + result := make([]data.Felt, len(income)) + for i := range income { + result[i] = data.Felt(income[i]) + } + return result +} diff --git a/pkg/indexer/subsquid/adapter/listen.go b/pkg/indexer/subsquid/adapter/listen.go new file mode 100644 index 0000000..d2d0323 --- /dev/null +++ b/pkg/indexer/subsquid/adapter/listen.go @@ -0,0 +1,60 @@ +package adapter + +import ( + "context" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" +) + +func (a *Adapter) listen(ctx context.Context) { + a.Log.Info().Msg("module started") + + blocksInput := a.MustInput(BlocksInput) + headInput := a.MustInput(HeadInput) + + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-blocksInput.Listen(): + if !ok { + a.Log.Warn().Msg("can't read message from input, it was drained and closed") + a.MustOutput(StopOutput).Push(struct{}{}) + return + } + + block, ok := msg.(*api.SqdBlockResponse) + + if !ok { + a.Log.Warn().Msgf("invalid message type: %T", msg) + continue + } + + results, err := a.convert(ctx, block) + if err != nil { + a.Log.Err(err). + Uint64("height", block.Header.Number). + Msg("convert error") + a.MustOutput(StopOutput).Push(struct{}{}) + continue + } + a.results <- results + if results.Block.Height == a.head { + a.MustOutput(HeadAchieved).Push(a.head) + return + } + + case msg, ok := <-headInput.Listen(): + if !ok { + a.Log.Warn().Msg("can't read message from input, it was drained and closed") + a.MustOutput(StopOutput).Push(struct{}{}) + return + } + head, ok := msg.(uint64) + if !ok { + a.Log.Warn().Msgf("invalid message type: %T", msg) + continue + } + a.head = head + } + } +} diff --git a/pkg/indexer/subsquid/adapter/state_update.go b/pkg/indexer/subsquid/adapter/state_update.go new file mode 100644 index 0000000..e137f9f --- /dev/null +++ b/pkg/indexer/subsquid/adapter/state_update.go @@ -0,0 +1,68 @@ +package adapter + +import ( + "github.com/dipdup-io/starknet-go-api/pkg/data" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" +) + +func ConvertStateUpdates(block *api.SqdBlockResponse) (data.StateUpdate, error) { + storageDiffs := make(map[data.Felt][]data.KeyValue) + for i := range block.StorageDiffs { + address := data.Felt(block.StorageDiffs[i].Address) + storageDiffs[address] = append(storageDiffs[address], data.KeyValue{ + Key: data.Felt(block.StorageDiffs[i].Key), + Value: data.Felt(block.StorageDiffs[i].Value), + }) + } + + declaredClasses := make([]data.DeclaredClass, 0) + for i := range block.StateUpdates[0].DeclaredClasses { + declaredClass := block.StateUpdates[0].DeclaredClasses[i] + declaredClasses = append(declaredClasses, data.DeclaredClass{ + ClassHash: data.Felt(declaredClass.ClassHash), + CompiledClassHash: data.Felt(declaredClass.CompiledClassHash), + }) + } + + replacedClasses := make([]data.ReplacedClass, 0) + for i := range block.StateUpdates[0].ReplacedClasses { + replacedClass := block.StateUpdates[0].ReplacedClasses[i] + replacedClasses = append(replacedClasses, data.ReplacedClass{ + Address: data.Felt(replacedClass.ContractAddress), + ClassHash: data.Felt(replacedClass.ClassHash), + }) + } + + oldDeclaredContracts := stringSliceToFeltSlice(block.StateUpdates[0].DeprecatedDeclaredClasses) + + deployedContracts := make([]data.DeployedContract, 0) + for i := range block.StateUpdates[0].DeployedContracts { + deployedContract := block.StateUpdates[0].DeployedContracts[i] + deployedContracts = append(deployedContracts, data.DeployedContract{ + Address: data.Felt(deployedContract.Address), + ClassHash: data.Felt(deployedContract.ClassHash), + }) + } + + nonces := make(map[data.Felt]data.Felt) + for i := range block.StateUpdates[0].Nonces { + nonce := block.StateUpdates[0].Nonces[i] + nonces[data.Felt(nonce.ContractAddress)] = data.Felt(nonce.Nonce) + } + + stateUpdate := data.StateUpdate{ + BlockHash: data.Felt(block.Header.Hash), + NewRoot: data.Felt(block.StateUpdates[0].NewRoot), + OldRoot: data.Felt(block.StateUpdates[0].OldRoot), + StateDiff: data.StateDiff{ + StorageDiffs: storageDiffs, + DeclaredClasses: declaredClasses, + ReplacedClasses: replacedClasses, + OldDeclaredContracts: oldDeclaredContracts, + DeployedContracts: deployedContracts, + Nonces: nonces, + }, + } + + return stateUpdate, nil +} diff --git a/pkg/indexer/subsquid/adapter/traces.go b/pkg/indexer/subsquid/adapter/traces.go new file mode 100644 index 0000000..b8b32ce --- /dev/null +++ b/pkg/indexer/subsquid/adapter/traces.go @@ -0,0 +1,237 @@ +package adapter + +import ( + "github.com/dipdup-io/starknet-go-api/pkg/data" + starknet "github.com/dipdup-io/starknet-go-api/pkg/sequencer" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" + "golang.org/x/exp/slices" + "sort" +) + +const ( + Execute = "execute" + Constructor = "constructor" + Validate = "validate" + FeeTransfer = "fee_transfer" +) + +func ConvertTraces(block *api.SqdBlockResponse) ([]starknet.Trace, error) { + traces := make([]starknet.Trace, 0) + + for i := range block.Transactions { + tx := block.Transactions[i] + resultTrace := starknet.Trace{ + RevertedError: "", + ValidateInvocation: nil, + FunctionInvocation: nil, + FeeTransferInvocation: nil, + Signature: nil, + TransactionHash: data.Felt(tx.TransactionHash), + } + txTraces := getTxTraces(block.Traces, tx.TransactionIndex) + if len(txTraces) == 0 { + traces = append(traces, resultTrace) + continue + } + txEvents := getTxEvents(block.Events, tx.TransactionIndex) + txMessages := getTxMessages(block.Messages, tx.TransactionIndex) + + trace := buildTraceTree(txTraces, txEvents, txMessages) + trace.TransactionHash = data.Felt(tx.TransactionHash) + traces = append(traces, trace) + } + + return traces, nil +} + +func getTxTraces(traces []api.TraceResponse, txIndex uint) []api.TraceResponse { + var result []api.TraceResponse + for i := range traces { + trace := traces[i] + if trace.TransactionIndex == txIndex { + result = append(result, trace) + } + } + return result +} + +func getTxEvents(events []api.Event, txIndex uint) []api.Event { + var result []api.Event + for i := range events { + event := events[i] + if event.TransactionIndex == txIndex { + result = append(result, event) + } + } + return result +} + +func getTxMessages(messages []api.Message, txIndex uint) []api.Message { + var result []api.Message + for i := range messages { + message := messages[i] + if message.TransactionIndex == txIndex { + result = append(result, message) + } + } + return result +} + +func buildTraceTree(flatInvocations []api.TraceResponse, events []api.Event, messages []api.Message) starknet.Trace { + resultTrace := starknet.Trace{ + RevertedError: "", + ValidateInvocation: nil, + FunctionInvocation: nil, + FeeTransferInvocation: nil, + Signature: nil, + TransactionHash: "", + } + mapAddressInvocationType := make(map[int]string) + sort.Slice(flatInvocations, func(i, j int) bool { + return compareTraceAddresses(flatInvocations[i].TraceAddress, flatInvocations[j].TraceAddress) + }) + + for invokationIndex := range flatInvocations { + invocation := flatInvocations[invokationIndex] + calldata := stringSliceToFeltSlice(invocation.Calldata) + result := stringSliceToFeltSlice(invocation.Result) + sqdEvents := filterEventsByAddress(events, invocation.TraceAddress) + adaptedEvents := make([]data.Event, len(sqdEvents)) + + for i := range sqdEvents { + event := sqdEvents[i] + keys := stringSliceToFeltSlice(event.Keys) + eventData := stringSliceToFeltSlice(event.Data) + + eventOrder := uint64(event.EvenIndex) + switch invocation.InvocationType { + case Execute, FeeTransfer: + default: + eventOrder = 0 + } + + adaptedEvents[i] = data.Event{ + Order: eventOrder, + FromAddress: "", + Keys: keys, + Data: eventData, + } + } + + sqdMessages := filterMessagesByAddress(messages, invocation.TraceAddress) + adaptedMessages := make([]data.Message, len(sqdMessages)) + for i := range sqdMessages { + message := sqdMessages[i] + payload := stringSliceToFeltSlice(message.Payload) + adaptedMessages[i] = data.Message{ + Order: uint64(message.Order), + FromAddress: parseString(message.FromAddress), + ToAddress: message.ToAddress, + Selector: "", + Payload: payload, + Nonce: "", + } + } + + currentInvocation := starknet.Invocation{ + CallerAddress: data.Felt(invocation.CallerAddress), + ContractAddress: data.Felt(invocation.ContractAddress), + Calldata: calldata, + CallType: parseString(invocation.CallType), + ClassHash: stringToFelt(invocation.ClassHash), + Selector: stringToFelt(invocation.EntryPointSelector), + EntrypointType: parseString(invocation.EntryPointType), + Result: result, + ExecutionResources: starknet.ExecutionResources{}, + InternalCalls: make([]starknet.Invocation, 0), + Events: adaptedEvents, + Messages: adaptedMessages, + } + + level := len(invocation.TraceAddress) + if level == 1 { + mapAddressInvocationType[invocation.TraceAddress[0]] = invocation.InvocationType + switch invocation.InvocationType { + case FeeTransfer: + resultTrace.FeeTransferInvocation = ¤tInvocation + case Validate: + resultTrace.ValidateInvocation = ¤tInvocation + case Execute, Constructor: + if invocation.RevertReason != nil { + resultTrace.RevertedError = parseString(invocation.RevertReason) + } else { + resultTrace.FunctionInvocation = ¤tInvocation + } + } + + continue + } + + parentIndex := invocation.TraceAddress[:level-1] + parent := findParentByOrder(&resultTrace, parentIndex, mapAddressInvocationType) + if parent != nil { + parent.InternalCalls = append(parent.InternalCalls, currentInvocation) + } + } + + return resultTrace +} + +func compareTraceAddresses(a, b []int) bool { + for i := 0; i < len(a) && i < len(b); i++ { + if a[i] != b[i] { + return a[i] < b[i] + } + } + return len(a) < len(b) +} + +func findParentByOrder(trace *starknet.Trace, traceAddress []int, mapAddressInvocationType map[int]string) *starknet.Invocation { + var rootIndex int + if len(traceAddress) == 1 { + rootIndex = traceAddress[0] + } else { + rootIndex = traceAddress[:1][0] + } + + var rootInvocation *starknet.Invocation + switch invocationType := mapAddressInvocationType[rootIndex]; invocationType { + case "fee_transfer": + rootInvocation = trace.FeeTransferInvocation + case "validate": + rootInvocation = trace.ValidateInvocation + case "execute", "constructor": + rootInvocation = trace.FunctionInvocation + } + + current := rootInvocation + for i := 1; i < len(traceAddress); i++ { + if current == nil || len(current.InternalCalls) == 0 { + return nil + } + current = ¤t.InternalCalls[len(current.InternalCalls)-1] + } + return current +} + +func filterEventsByAddress(events []api.Event, targetAddress []int) []api.Event { + var result []api.Event + + for _, event := range events { + if slices.Equal(event.TraceAddress, targetAddress) { + result = append(result, event) + } + } + return result +} + +func filterMessagesByAddress(messages []api.Message, targetAddress []int) []api.Message { + var result []api.Message + + for _, message := range messages { + if slices.Equal(message.TraceAddress, targetAddress) { + result = append(result, message) + } + } + return result +} diff --git a/pkg/indexer/subsquid/adapter/txs.go b/pkg/indexer/subsquid/adapter/txs.go new file mode 100644 index 0000000..e3ba1d2 --- /dev/null +++ b/pkg/indexer/subsquid/adapter/txs.go @@ -0,0 +1,84 @@ +package adapter + +import ( + "github.com/dipdup-io/starknet-go-api/pkg/data" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" +) + +func ConvertTransactions(block *api.SqdBlockResponse) []receiver.Transaction { + txs := block.Transactions + resultTxs := make([]receiver.Transaction, len(txs)) + for i := range txs { + tx := txs[i] + var body any + switch tx.Type { + case data.TransactionTypeInvoke: + body = &data.Invoke{ + MaxFee: stringToFelt(tx.MaxFee), + Nonce: uint64ToFelt(tx.Nonce), + ContractAddress: stringToFelt(tx.ContractAddress), + EntrypointSelector: stringToFelt(tx.EntryPointSelector), + SenderAddress: stringToFelt(tx.SenderAddress), + Signature: parseStringSlice(tx.Signature), + Calldata: parseStringSlice(tx.Calldata), + } + case data.TransactionTypeDeclare: + body = &data.Declare{ + MaxFee: stringToFelt(tx.MaxFee), + Nonce: uint64ToFelt(tx.Nonce), + SenderAddress: stringToFelt(tx.SenderAddress), + ContractAddress: stringToFelt(tx.ContractAddress), + Signature: parseStringSlice(tx.Signature), + ClassHash: stringToFelt(tx.ClassHash), + CompiledClassHash: stringToFelt(tx.CompiledClassHash), + } + case data.TransactionTypeDeploy: + body = &data.Deploy{ + ContractAddressSalt: parseString(tx.ContractAddressSalt), + ConstructorCalldata: parseStringSlice(tx.ConstructorCalldata), + ClassHash: stringToFelt(tx.ClassHash), + ContractAddress: getDeployContractAddress(block, tx.TransactionIndex), + } + case data.TransactionTypeDeployAccount: + body = &data.DeployAccount{ + MaxFee: stringToFelt(tx.MaxFee), + Nonce: uint64ToFelt(tx.Nonce), + ContractAddress: stringToFelt(tx.ContractAddress), + ContractAddressSalt: parseString(tx.ContractAddressSalt), + ClassHash: stringToFelt(tx.ClassHash), + ConstructorCalldata: parseStringSlice(tx.ConstructorCalldata), + Signature: parseStringSlice(tx.Signature), + } + case data.TransactionTypeL1Handler: + body = &data.L1Handler{ + Nonce: uint64ToFelt(tx.Nonce), + ContractAddress: stringToFelt(tx.ContractAddress), + EntrypointSelector: stringToFelt(tx.EntryPointSelector), + Calldata: parseStringSlice(tx.Calldata), + } + default: + return nil + } + + resultTxs[i] = receiver.Transaction{ + Type: tx.Type, + Version: data.Felt(tx.Version), + Hash: data.Felt(tx.TransactionHash), + ActualFee: data.Felt(tx.ActualFee.Amount), + Body: body, + } + } + + return resultTxs +} + +func getDeployContractAddress(block *api.SqdBlockResponse, txIndex uint) data.Felt { + for i := range block.Traces { + trace := block.Traces[i] + if trace.TransactionIndex == txIndex && trace.TraceType == data.TransactionTypeDeploy { + return data.Felt(trace.ContractAddress) + } + } + return "" +} diff --git a/pkg/indexer/subsquid/receiver/api/api.go b/pkg/indexer/subsquid/receiver/api/api.go new file mode 100644 index 0000000..f905cca --- /dev/null +++ b/pkg/indexer/subsquid/receiver/api/api.go @@ -0,0 +1,99 @@ +package api + +import ( + "context" + "fmt" + "github.com/dipdup-net/go-lib/config" + fastshot "github.com/opus-domini/fast-shot" + "github.com/opus-domini/fast-shot/constant/mime" + "strconv" +) + +type Subsquid struct { + httpClient fastshot.ClientHttpMethods +} + +func NewSubsquid(cfg config.DataSource) *Subsquid { + var httpClient = fastshot.NewClient(cfg.URL). + Build() + + return &Subsquid{ + httpClient: httpClient, + } +} + +func (s *Subsquid) GetWorkerUrl(_ context.Context, startLevel uint64) (string, error) { + path := fmt.Sprintf("/%d/worker", startLevel) + response, err := s.httpClient. + GET(path). + Send() + + if err != nil { + return "", err + } + + return response.Body().AsString() +} + +func (s *Subsquid) GetBlocks(ctx context.Context, from, to uint64, workerUrl string) ([]*SqdBlockResponse, error) { + var workerClient = fastshot.NewClient(workerUrl). + Build() + + response, err := workerClient.POST(""). + Context().Set(ctx). + Header().AddContentType(mime.JSON). + Body().AsJSON(NewRequest(from, to)). + Send() + + if err != nil { + return nil, err + } + + var result []*SqdBlockResponse + err = response.Body().AsJSON(&result) + if err != nil { + return nil, err + } + + return result, err +} + +func (s *Subsquid) GetBlankBlocks(ctx context.Context, startLevel uint64, workerUrl string) ([]*SqdBlockResponse, error) { + var workerClient = fastshot.NewClient(workerUrl). + Build() + + response, err := workerClient.POST(""). + Context().Set(ctx). + Header().AddContentType(mime.JSON). + Body().AsJSON(NewSimpleRequest(startLevel)). + Send() + + if err != nil { + return nil, err + } + + var result []*SqdBlockResponse + err = response.Body().AsJSON(&result) + if err != nil { + return nil, err + } + + return result, err +} + +func (s *Subsquid) GetHead(_ context.Context) (uint64, error) { + response, err := s.httpClient. + GET("/height"). + Send() + + if err != nil { + return 0, err + } + + stringResponse, err := response.Body().AsString() + if err != nil { + return 0, err + } + + return strconv.ParseUint(stringResponse, 10, 64) +} diff --git a/pkg/indexer/subsquid/receiver/api/request.go b/pkg/indexer/subsquid/receiver/api/request.go new file mode 100644 index 0000000..bfc23fb --- /dev/null +++ b/pkg/indexer/subsquid/receiver/api/request.go @@ -0,0 +1,194 @@ +package api + +type Request struct { + Type string `json:"type"` + FromBlock uint64 `json:"fromBlock"` + ToBlock uint64 `json:"toBlock,omitempty"` + IncludeAllBlocks bool `json:"includeAllBlocks"` + Fields Fields `json:"fields,omitempty"` + StateUpdates []map[string]any `json:"stateUpdates,omitempty"` + StorageDiffs []map[string]any `json:"storageDiffs,omitempty"` + Traces []Trace `json:"traces,omitempty"` + Messages []map[string]any `json:"messages,omitempty"` + Transactions []TransactionWithTrace `json:"transactions,omitempty"` +} + +type Fields struct { + Block BlockField `json:"block"` + StateUpdate StateUpdateField `json:"stateUpdate"` + StorageDiff StorageDiffField `json:"storageDiff"` + Trace TraceField `json:"trace"` + Transaction TransactionField `json:"transaction"` + Event EventField `json:"event"` + Message MessageField `json:"message"` +} + +type BlockField struct { + Timestamp bool `json:"timestamp"` + ParentHash bool `json:"parentHash,omitempty"` + Status bool `json:"status,omitempty"` + NewRoot bool `json:"newRoot,omitempty"` + SequencerAddress bool `json:"sequencerAddress,omitempty"` + StarknetVersion bool `json:"starknetVersion,omitempty"` +} + +type StateUpdateField struct { + NewRoot bool `json:"newRoot"` + OldRoot bool `json:"oldRoot"` + DeprecatedDeclaredClasses bool `json:"deprecatedDeclaredClasses"` + DeclaredClasses bool `json:"declaredClasses"` + DeployedContracts bool `json:"deployedContracts"` + ReplacedClasses bool `json:"replacedClasses"` + Nonces bool `json:"nonces"` +} + +type StorageDiffField struct { + Value bool `json:"value"` +} + +type TraceField struct { + TraceType bool `json:"traceType"` + InvocationType bool `json:"invocationType"` + CallerAddress bool `json:"callerAddress"` + ContractAddress bool `json:"contractAddress"` + CallType bool `json:"callType"` + ClassHash bool `json:"classHash"` + EntryPointSelector bool `json:"entryPointSelector"` + EntryPointType bool `json:"entryPointType"` + RevertReason bool `json:"revertReason"` + Calldata bool `json:"calldata"` + Result bool `json:"result"` +} + +type TransactionField struct { + TransactionHash bool `json:"transactionHash"` + ContractAddress bool `json:"contractAddress"` + EntryPointSelector bool `json:"entryPointSelector"` + Calldata bool `json:"calldata"` + MaxFee bool `json:"maxFee"` + Type bool `json:"type"` + SenderAddress bool `json:"senderAddress"` + Version bool `json:"version"` + Signature bool `json:"signature"` + Nonce bool `json:"nonce"` + ClassHash bool `json:"classHash"` + CompiledClassHash bool `json:"compiledClassHash"` + ContractAddressSalt bool `json:"contractAddressSalt"` + ConstructorCalldata bool `json:"constructorCalldata"` + ActualFee bool `json:"actualFee"` +} + +type EventField struct { + Keys bool `json:"keys"` + Data bool `json:"data"` + TraceAddress bool `json:"traceAddress"` +} + +type MessageField struct { + FromAddress bool `json:"fromAddress"` + ToAddress bool `json:"toAddress"` + Payload bool `json:"payload"` +} + +type Trace struct { + Events bool `json:"events"` +} + +type TransactionWithTrace struct { + Traces bool `json:"traces"` + Events bool `json:"events"` +} + +func NewRequest(fromLevel uint64, toLevel uint64) *Request { + return &Request{ + Type: "starknet", + FromBlock: fromLevel, + ToBlock: toLevel, + IncludeAllBlocks: true, + Fields: Fields{ + Block: BlockField{ + ParentHash: true, + Status: true, + NewRoot: true, + Timestamp: true, + SequencerAddress: true, + StarknetVersion: true, + }, + StateUpdate: StateUpdateField{ + NewRoot: true, + OldRoot: true, + DeprecatedDeclaredClasses: true, + DeclaredClasses: true, + DeployedContracts: true, + ReplacedClasses: true, + Nonces: true, + }, + StorageDiff: StorageDiffField{ + Value: true, + }, + Trace: TraceField{ + TraceType: true, + InvocationType: true, + CallerAddress: true, + ContractAddress: true, + CallType: true, + ClassHash: true, + EntryPointSelector: true, + EntryPointType: true, + RevertReason: true, + Calldata: true, + Result: true, + }, + Transaction: TransactionField{ + TransactionHash: true, + ContractAddress: true, + EntryPointSelector: true, + Calldata: true, + MaxFee: true, + Type: true, + SenderAddress: true, + Version: true, + Signature: true, + Nonce: true, + ClassHash: true, + CompiledClassHash: true, + ContractAddressSalt: true, + ConstructorCalldata: true, + ActualFee: true, + }, + Event: EventField{ + Keys: true, + Data: true, + TraceAddress: true, + }, + Message: MessageField{ + FromAddress: true, + ToAddress: true, + Payload: true, + }, + }, + StateUpdates: []map[string]any{ + {}, + }, + StorageDiffs: []map[string]any{ + {}, + }, + Traces: []Trace{ + {Events: true}, + }, + Messages: []map[string]any{ + {}, + }, + Transactions: []TransactionWithTrace{ + {Traces: true, Events: true}, + }, + } +} + +func NewSimpleRequest(level uint64) *Request { + return &Request{ + Type: "starknet", + FromBlock: level, + IncludeAllBlocks: true, + } +} diff --git a/pkg/indexer/subsquid/receiver/api/response.go b/pkg/indexer/subsquid/receiver/api/response.go new file mode 100644 index 0000000..91f4817 --- /dev/null +++ b/pkg/indexer/subsquid/receiver/api/response.go @@ -0,0 +1,115 @@ +package api + +type SqdBlockResponse struct { + Header BlockHeader `json:"header"` + Transactions []Transaction `json:"transactions,omitempty"` + Traces []TraceResponse `json:"traces,omitempty"` + Events []Event `json:"events,omitempty"` + Messages []Message `json:"messages,omitempty"` + StateUpdates []StateUpdate `json:"state_updates,omitempty"` + StorageDiffs []StorageDiff `json:"storage_diffs,omitempty"` +} + +type BlockHeader struct { + Number uint64 `example:"321" json:"number"` + Hash string `example:"0x44529f2c44d9113e0ba4e53cb6e84f425ec186cda27545827b5a72d5540bfdc" json:"hash"` + ParentHash string `example:"0x44529f2c44d9113e0ba4e53cb6e84f425ec186cda27545827b5a72d5540bfdc" json:"parentHash"` + Status string `example:"ACCEPTED_ON_L1" json:"status"` + NewRoot string `example:"0x44529f2c44d9113e0ba4e53cb6e84f425ec186cda27545827b5a72d5540bfdc" json:"newRoot"` + Timestamp int64 `example:"1641950335" json:"timestamp"` + SequencerAddress string `example:"0x44529f2c44d9113e0ba4e53cb6e84f425ec186cda27545827b5a72d5540bfdc" json:"sequencerAddress"` + StarknetVersion string `json:"starknetVersion"` +} + +type Transaction struct { + TransactionIndex uint `example:"0" json:"transactionIndex"` + TransactionHash string `example:"0x794fae89c8c4b8f5f77a4996948d2547740f90e54bb4a5cc6119a7c70eca42c" json:"transactionHash"` + ContractAddress *string `example:"0x1cee8364383aea317eefc181dbd8732f1504fd4511aed58f32c369dd546da0d" json:"contractAddress"` + EntryPointSelector *string `example:"0x317eb442b72a9fae758d4fb26830ed0d9f31c8e7da4dbff4e8c59ea6a158e7f" json:"entryPointSelector"` + Calldata *[]string `json:"calldata"` + MaxFee *string `example:"0x0" json:"maxFee"` + Type string `example:"INVOKE" json:"type"` + SenderAddress *string `json:"senderAddress"` + Version string `example:"0x0" json:"version"` + Signature *[]string `json:"signature"` + Nonce *uint64 `json:"nonce"` + ClassHash *string `json:"classHash"` + CompiledClassHash *string `json:"compiledClassHash"` + ContractAddressSalt *string `json:"contractAddressSalt"` + ConstructorCalldata *[]string `json:"constructorCalldata"` + ActualFee ActualFee `json:"actualFee"` +} + +type ActualFee struct { + Amount string `json:"amount"` + Unit string `json:"unit"` +} + +type TraceResponse struct { + TransactionIndex uint `json:"transactionIndex"` + TraceAddress []int `json:"traceAddress"` + TraceType string `json:"traceType"` + InvocationType string `json:"invocationType"` + CallerAddress string `json:"callerAddress"` + ContractAddress string `json:"contractAddress"` + CallType *string `json:"callType"` + ClassHash *string `json:"classHash"` + EntryPointSelector *string `json:"entryPointSelector"` + EntryPointType *string `json:"entryPointType"` + RevertReason *string `json:"revertReason"` + Calldata []string `json:"calldata"` + Result []string `json:"result"` +} + +type Event struct { + TransactionIndex uint `json:"transactionIndex"` + EvenIndex uint `json:"eventIndex"` + Keys []string `json:"keys"` + Data []string `json:"data"` + TraceAddress []int `json:"traceAddress"` +} + +type Message struct { + TransactionIndex uint `json:"transactionIndex"` + TraceAddress []int `json:"traceAddress"` + Order uint `json:"order"` + FromAddress *string `json:"fromAddress"` + ToAddress string `json:"toAddress"` + Payload []string `json:"payload"` +} + +type StateUpdate struct { + NewRoot string `json:"newRoot"` + OldRoot string `json:"oldRoot"` + DeprecatedDeclaredClasses []string `json:"deprecatedDeclaredClasses"` + DeclaredClasses []DeclaredClass `json:"declaredClasses"` + DeployedContracts []DeployedContract `json:"deployedContracts"` + ReplacedClasses []ReplacedClass `json:"replacedClasses"` + Nonces []Nonce `json:"nonces"` +} + +type DeclaredClass struct { + ClassHash string `json:"class_hash"` + CompiledClassHash string `json:"compiled_class_hash"` +} + +type Nonce struct { + ContractAddress string `json:"contract_address"` + Nonce string `json:"nonce"` +} + +type ReplacedClass struct { + ContractAddress string `json:"contract_address"` + ClassHash string `json:"class_hash"` +} + +type DeployedContract struct { + Address string `json:"address"` + ClassHash string `json:"class_hash"` +} + +type StorageDiff struct { + Address string `json:"address"` + Key string `json:"key"` + Value string `json:"value"` +} diff --git a/pkg/indexer/subsquid/receiver/receiver.go b/pkg/indexer/subsquid/receiver/receiver.go new file mode 100644 index 0000000..68bc02e --- /dev/null +++ b/pkg/indexer/subsquid/receiver/receiver.go @@ -0,0 +1,268 @@ +package receiver + +import ( + "context" + starknetData "github.com/dipdup-io/starknet-go-api/pkg/data" + "github.com/dipdup-io/starknet-indexer/internal/storage" + rcvr "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" + "github.com/dipdup-io/workerpool" + "github.com/dipdup-net/indexer-sdk/pkg/modules" + "github.com/pkg/errors" + "sync" + "time" + + "github.com/dipdup-io/starknet-indexer/pkg/indexer/config" + ddConfig "github.com/dipdup-net/go-lib/config" +) + +type BlocksToWorker struct { + From uint64 + To uint64 + WorkerURL string +} + +type GetIndexerHeight func() uint64 + +const ( + BlocksOutput = "blocks" + HeadOutput = "head_level" + StopOutput = "stop" +) + +type Receiver struct { + modules.BaseModule + api *api.Subsquid + nodeApi rcvr.API + startLevel uint64 + level uint64 + threadsCount int + blocks chan *api.SqdBlockResponse + getIndexerHeight GetIndexerHeight + pool *workerpool.Pool[BlocksToWorker] + processing map[uint64]struct{} + processingMx *sync.Mutex + result chan rcvr.Result + timeout time.Duration + mx *sync.RWMutex +} + +// New - +func New(cfg config.Config, + ds map[string]ddConfig.DataSource, + startLevel uint64, + threadsCount int, + getIndexerHeight GetIndexerHeight, +) (*Receiver, error) { + dsCfg, ok := ds[cfg.Datasource] + if !ok { + return nil, errors.Errorf("unknown datasource name: %s", cfg.Datasource) + } + + nodeCfg, ok := ds["node"] + if !ok { + return nil, errors.Errorf("can't access node datasource: %s", cfg.Datasource) + } + + receiver := &Receiver{ + BaseModule: modules.New("sqd receiver"), + startLevel: startLevel, + getIndexerHeight: getIndexerHeight, + threadsCount: threadsCount, + api: api.NewSubsquid(dsCfg), + nodeApi: rcvr.NewNode(nodeCfg), + blocks: make(chan *api.SqdBlockResponse, cfg.ThreadsCount*10), + result: make(chan rcvr.Result, cfg.ThreadsCount*2), + processing: make(map[uint64]struct{}), + processingMx: new(sync.Mutex), + timeout: time.Duration(cfg.Timeout) * time.Second, + mx: new(sync.RWMutex), + } + + if receiver.timeout == 0 { + receiver.timeout = 10 * time.Second + } + + receiver.CreateOutput(BlocksOutput) + receiver.CreateOutput(HeadOutput) + receiver.CreateOutput(StopOutput) + + receiver.pool = workerpool.NewPool(receiver.worker, threadsCount) + return receiver, nil +} + +// Start - +func (r *Receiver) Start(ctx context.Context) { + r.Log.Info().Msg("starting subsquid receiver...") + level := r.getIndexerHeight() + if r.startLevel > level { + level = r.startLevel + } + + r.setLevel(level) + + r.pool.Start(ctx) + r.G.GoCtx(ctx, r.sync) + r.G.GoCtx(ctx, r.sequencer) +} + +// Close - +func (r *Receiver) Close() error { + r.Log.Info().Msg("closing...") + r.G.Wait() + + if err := r.pool.Close(); err != nil { + return err + } + + close(r.result) + return nil +} + +func (r *Receiver) checkQueue(ctx context.Context) bool { + for r.pool.QueueSize() >= r.threadsCount { + select { + case <-ctx.Done(): + return true + default: + time.Sleep(time.Millisecond * 10) + } + } + + return false +} + +// QueueSize - +func (r *Receiver) QueueSize() int { + return r.pool.QueueSize() +} + +// AddTask - +func (r *Receiver) AddTask(blocksRange BlocksToWorker) { + r.processingMx.Lock() + defer r.processingMx.Unlock() + + if _, ok := r.processing[blocksRange.From]; ok { + return + } + + r.pool.AddTask(blocksRange) + r.processing[blocksRange.From] = struct{}{} +} + +// Results - +func (r *Receiver) Results() <-chan rcvr.Result { + return r.result +} + +// GetResults - +func (r *Receiver) GetResults() chan rcvr.Result { + return r.result +} + +// GetClass - +func (r *Receiver) GetClass(ctx context.Context, hash string) (starknetData.Class, error) { + requestCtx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + + return r.nodeApi.GetClass(requestCtx, hash) +} + +// Head - +func (r *Receiver) Head(ctx context.Context) (uint64, error) { + requestCtx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + + return r.api.GetHead(requestCtx) +} + +// GetBlockStatus - +func (r *Receiver) GetBlockStatus(ctx context.Context, height uint64) (storage.Status, error) { + requestCtx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + + return r.nodeApi.GetBlockStatus(requestCtx, height) +} + +func (r *Receiver) GetSqdWorkerRanges(ctx context.Context, fromLevel, height uint64) ([]BlocksToWorker, error) { + r.Log.Info(). + Uint64("head", height). + Msg("retrieving subsquid workers...") + + result := make([]BlocksToWorker, 0) + currentLevel := fromLevel + + for { + workerUrl, err := r.api.GetWorkerUrl(ctx, currentLevel) + if err != nil { + return nil, err + } + + blankBlocks, err := r.api.GetBlankBlocks(ctx, currentLevel, workerUrl) + if err != nil { + return nil, err + } + + lastBlock := blankBlocks[len(blankBlocks)-1] + + workerSegment := BlocksToWorker{ + From: blankBlocks[0].Header.Number, + To: lastBlock.Header.Number, + WorkerURL: workerUrl, + } + if workerSegment.To > height { + workerSegment.To = height + } + + result = append(result, workerSegment) + + if lastBlock.Header.Number >= height { + break + } + + currentLevel = lastBlock.Header.Number + 1 + + r.Log.Info(). + Uint64("from level", workerSegment.From). + Uint64("to level", workerSegment.To). + Msg("retrieved worker for blocks") + } + + return result, nil +} + +func (r *Receiver) SplitWorkerRanger(workerRanges []BlocksToWorker) []BlocksToWorker { + var result []BlocksToWorker + batchSize := uint64(200) + + for _, worker := range workerRanges { + for start := worker.From; start <= worker.To; start += batchSize { + end := start + batchSize - 1 + if end > worker.To { + end = worker.To + } + + result = append(result, BlocksToWorker{ + From: start, + To: end, + WorkerURL: worker.WorkerURL, + }) + } + } + + return result +} + +func (r *Receiver) Level() uint64 { + r.mx.RLock() + defer r.mx.RUnlock() + + return r.level +} + +func (r *Receiver) setLevel(level uint64) { + r.mx.Lock() + defer r.mx.Unlock() + + r.level = level +} diff --git a/pkg/indexer/subsquid/receiver/sequencer.go b/pkg/indexer/subsquid/receiver/sequencer.go new file mode 100644 index 0000000..538a402 --- /dev/null +++ b/pkg/indexer/subsquid/receiver/sequencer.go @@ -0,0 +1,38 @@ +package receiver + +import ( + "context" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" +) + +func (r *Receiver) sequencer(ctx context.Context) { + orderedBlocks := map[uint64]*api.SqdBlockResponse{} + currentBlock := r.Level() + if currentBlock != 0 { + currentBlock += 1 + } + + for { + select { + case <-ctx.Done(): + return + case block, ok := <-r.blocks: + if !ok { + r.Log.Warn().Msg("can't read message from input, it was drained and closed") + r.MustOutput(StopOutput).Push(struct{}{}) + return + } + orderedBlocks[block.Header.Number] = block + + b, ok := orderedBlocks[currentBlock] + for ok { + r.MustOutput(BlocksOutput).Push(b) + r.setLevel(currentBlock) + delete(orderedBlocks, currentBlock) + currentBlock += 1 + + b, ok = orderedBlocks[currentBlock] + } + } + } +} diff --git a/pkg/indexer/subsquid/receiver/sync.go b/pkg/indexer/subsquid/receiver/sync.go new file mode 100644 index 0000000..957b432 --- /dev/null +++ b/pkg/indexer/subsquid/receiver/sync.go @@ -0,0 +1,51 @@ +package receiver + +import ( + "context" + "github.com/rs/zerolog/log" +) + +func (r *Receiver) sync(ctx context.Context) { + head, err := r.Head(ctx) + if err != nil { + return + } + r.MustOutput(HeadOutput).Push(head) + + if head < r.getIndexerHeight() { + log.Warn(). + Uint64("indexer_height", r.getIndexerHeight()). + Uint64("node_height", head). + Msg("rollback detected by block height") + } + + r.Log.Info(). + Uint64("indexer_block", r.getIndexerHeight()). + Uint64("node_block", head). + Msg("syncing...") + + startLevel := r.startLevel + if startLevel < r.getIndexerHeight() { + startLevel = r.getIndexerHeight() + if r.getIndexerHeight() > 0 { + startLevel += 1 + } + } + + blocksToWorker, err := r.GetSqdWorkerRanges(ctx, startLevel, head) + if err != nil { + return + } + + for _, blockRange := range r.SplitWorkerRanger(blocksToWorker) { + select { + case <-ctx.Done(): + return + default: + if r.checkQueue(ctx) { + return + } + r.AddTask(blockRange) + } + } +} diff --git a/pkg/indexer/subsquid/receiver/worker.go b/pkg/indexer/subsquid/receiver/worker.go new file mode 100644 index 0000000..f294203 --- /dev/null +++ b/pkg/indexer/subsquid/receiver/worker.go @@ -0,0 +1,40 @@ +package receiver + +import ( + "context" + "github.com/rs/zerolog/log" +) + +func (r *Receiver) worker(ctx context.Context, blockRange BlocksToWorker) { + from := blockRange.From + var allBlocksDownloaded bool + + for !allBlocksDownloaded { + select { + case <-ctx.Done(): + return + default: + blocks, err := r.api.GetBlocks(ctx, from, blockRange.To, blockRange.WorkerURL) + if err != nil { + log.Err(err). + Uint64("fromLevel", from). + Uint64("toLevel", blockRange.To). + Str("worker url", blockRange.WorkerURL). + Msg("loading blocks error") + return + } + + lastBlock := blocks[len(blocks)-1] + + for _, block := range blocks { + r.blocks <- block + } + + if lastBlock.Header.Number == blockRange.To { + allBlocksDownloaded = true + } else { + from = lastBlock.Header.Number + 1 + } + } + } +}