Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
933979f
Tagalign
k-karuna Dec 11, 2024
45bf456
Fetching data from Subsquid
k-karuna Dec 12, 2024
0958621
Working receiver
k-karuna Dec 16, 2024
31f7300
subsquid receiver allocated to separate module
k-karuna Dec 17, 2024
5582148
Fixed config datasource options
k-karuna Dec 17, 2024
03a4417
Sequencer
k-karuna Dec 17, 2024
b9395a2
Added splitting block ranged for smaller chunks to download; Finalize…
k-karuna Dec 19, 2024
6aa2dff
Using base module logger in receiver
k-karuna Dec 19, 2024
46937ab
Added Adapter module
k-karuna Dec 19, 2024
29a948c
Added transactions converting
k-karuna Dec 23, 2024
9e1b932
Convert traces almost done
k-karuna Jan 28, 2025
22fd394
Block traces done without events; Tested and compared with data from …
k-karuna Jan 29, 2025
430c175
Added events and messages; Traces done except correct ValidateInvocat…
k-karuna Jan 31, 2025
be68718
Traces done with all invocation types
k-karuna Feb 11, 2025
8ade228
State updates & storage diffs
k-karuna Feb 13, 2025
3467206
Renamed state_update file
k-karuna Feb 14, 2025
81a381e
Added IReceiver interface. Working indexing
k-karuna Feb 17, 2025
9d052ed
Rollback code sequence
k-karuna Feb 17, 2025
68f85f3
Loops optimizations
k-karuna Feb 17, 2025
db8502a
Merge branch 'master' into feat/subsquid-datasource
k-karuna Feb 21, 2025
d9f782c
Merge vs master
k-karuna Feb 21, 2025
b99dba5
Refactor: transferred commonReceiver type check from getNewBlocks to …
k-karuna Feb 21, 2025
e0261e4
Refactor: added stringSliceToFeltSlice helper func
k-karuna Feb 21, 2025
50ebd58
Added context setting to fastshot client
k-karuna Feb 21, 2025
280d414
Small refactor
k-karuna Feb 21, 2025
f27616c
Decreased SQD receiver threads count to 2; Fixed converting decimal t…
k-karuna Feb 25, 2025
7b2e27a
Added swapping receivers feature, after subsquid reached his head
k-karuna Feb 27, 2025
d6d729b
Added comment
k-karuna Feb 27, 2025
5641eed
Linter
k-karuna Feb 27, 2025
ef3e5ca
Merge branch 'master' into feat/subsquid-datasource
k-karuna Feb 27, 2025
8bd3d06
Added ActualFee field other necessary fields. Compared data on first …
k-karuna Mar 13, 2025
15de986
Working datasource replacing with node, after head achieving
k-karuna Mar 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ POSTGRES_DB=starknet
POSTGRES_PASSWORD=<TYPE_SOMETHING_STRONG> # REQUIRED
STARKNET_NODE_URL=<URL_HERE> # REQUIRED if INDEXER_DATASOURCE=node
NODE_APIKEY=<API_KEY_FROM_NODE_PROVIDER> # REQUIRED if your node provider has api key. It's api key.
NODE_HEADER_APIKEY=<HEADER_NAME> # REQUIRED if your node provider has api key. It's header name.
NODE_HEADER_APIKEY=<HEADER_NAME> # REQUIRED if your node provider has api key. It's header name.
STARKNET_SUBSQUID_URL=<URL_HERE> # REQUIRED if INDEXER_DATASOURCE=subsquid
3 changes: 3 additions & 0 deletions build/dipdup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ datasources:
fallback:
url: ${STARKNET_FALLBACK_NODE_URL}
rps: ${STARKNET_FALLBACK_NODE_RPS:-1}
subsquid:
url: ${STARKNET_SUBSQUID_URL}
rps: ${STARKNET_SUBSQUID_RPS:-5}

database:
kind: postgres
Expand Down
2 changes: 1 addition & 1 deletion cmd/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
config.Config `yaml:",inline"`
LogLevel string `yaml:"log_level" validate:"omitempty,oneof=debug trace info warn error fatal panic"`
Indexer indexerConfig.Config `yaml:"indexer"`
GRPC *sdkGrpc.ServerConfig `yaml:"grpc" validate:"required"`
GRPC *sdkGrpc.ServerConfig `yaml:"grpc" validate:"required"`
}

// Substitute -
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
hasura:
image: hasura/graphql-engine:v2.28.0
ports:
- 127.0.0.1:8080:8080
- "127.0.0.1:8080:8080"
restart: always
environment:
- HASURA_GRAPHQL_DATABASE_URL=postgres://${POSTGRES_USER:-dipdup}:${POSTGRES_PASSWORD:-changeme}@${HASURA_POSTGRES_HOST:-db}:${POSTGRES_PORT:-5432}/${POSTGRES_DB:-starknet}
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ go 1.23.5
require (
github.com/dipdup-io/starknet-go-api v0.0.0-20250221100402-18cfac749c10
github.com/dipdup-io/workerpool v0.0.4
github.com/dipdup-net/go-lib v0.3.3
github.com/dipdup-net/indexer-sdk v0.0.4
github.com/dipdup-net/go-lib v0.3.6
github.com/dipdup-net/indexer-sdk v0.0.5
github.com/go-testfixtures/testfixtures/v3 v3.9.0
github.com/goccy/go-json v0.10.2
github.com/karlseguin/ccache/v2 v2.0.8
github.com/lib/pq v1.10.9
github.com/opus-domini/fast-shot v1.1.4
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.30.0
github.com/shopspring/decimal v1.3.1
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/uptrace/bun v1.1.14
go.uber.org/mock v0.2.0
google.golang.org/grpc v1.58.3
Expand Down Expand Up @@ -110,7 +111,7 @@ require (
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
Expand Down
20 changes: 12 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ github.com/dipdup-io/starknet-go-api v0.0.0-20250221100402-18cfac749c10 h1:5QbKq
github.com/dipdup-io/starknet-go-api v0.0.0-20250221100402-18cfac749c10/go.mod h1:AY8SrbR86x/TYipue99eZZBN0VJpfoMovp451bcHF3M=
github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s=
github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/dipdup-net/go-lib v0.3.3 h1:vTUI+sT4L+x+eiMf712Cg8EtlqUCMiN6M3vcNaPlCw8=
github.com/dipdup-net/go-lib v0.3.3/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4=
github.com/dipdup-net/indexer-sdk v0.0.4 h1:mhTW3f4U6oc05UjxSiffOV+HIi4vQkDgOq1MbJXia8U=
github.com/dipdup-net/indexer-sdk v0.0.4/go.mod h1:n1oBIm5MPY1WxLS9tQfTWr+Ytrwv6ThCZF7TASsJslg=
github.com/dipdup-net/go-lib v0.3.6 h1:ctas0AYDgN8gfKLvrIgRsMHqvH6wwmKJaNoEWA8YtrM=
github.com/dipdup-net/go-lib v0.3.6/go.mod h1:UibJawy7ILP3jD+vG3ebNiRjv05+G6Wuv2HmGqaadTA=
github.com/dipdup-net/indexer-sdk v0.0.5 h1:BcI+R+hRO7uDMVrp3S1WzSSNPdC+t6+nrmFUxua8aYs=
github.com/dipdup-net/indexer-sdk v0.0.5/go.mod h1:v4j99EnJs2T6CheLYCcD095WHAb7RR56JJNEAWJv0DA=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/docker v25.0.6+incompatible h1:5cPwbwriIcsua2REJe8HqQV+6WlWc1byg2QSXzBxBGg=
Expand Down Expand Up @@ -233,6 +233,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0-rc4 h1:oOxKUJWnFC4YGHCCMNql1x4YaDfYBTS5Y4x/Cgeo1E0=
github.com/opencontainers/image-spec v1.1.0-rc4/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8=
github.com/opus-domini/fast-shot v1.1.4 h1:xWTO/4JEILjZM/rP6mwiWe/jZyE9+L1G9sC4BsoynAk=
github.com/opus-domini/fast-shot v1.1.4/go.mod h1:BOr2JXHQJhOnYsxyCvFbgBP3BuYCjgh2YfzWKweEL0A=
github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s=
github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
Expand Down Expand Up @@ -270,15 +272,17 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/testcontainers/testcontainers-go v0.22.0 h1:hOK4NzNu82VZcKEB1aP9LO1xYssVFMvlfeuDW9JMmV0=
github.com/testcontainers/testcontainers-go v0.22.0/go.mod h1:k0YiPa26xJCRUbUkYqy5rY6NGvSbVCeUBXCvucscBR4=
github.com/testcontainers/testcontainers-go/modules/postgres v0.22.0 h1:OHVaqu9MRGMSlro9AD5UCfj8XiHwQdhB9thE4vINq+E=
Expand Down Expand Up @@ -341,8 +345,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
12 changes: 6 additions & 6 deletions pkg/indexer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package config

// Config - configuration structure for indexer
type Config struct {
Name string `yaml:"name" validate:"omitempty"`
StartLevel uint64 `yaml:"start_level" validate:"omitempty"`
ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"`
Timeout uint64 `yaml:"timeout" validate:"omitempty"`
Name string `yaml:"name" validate:"omitempty"`
StartLevel uint64 `yaml:"start_level" validate:"omitempty"`
ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"`
Timeout uint64 `yaml:"timeout" validate:"omitempty"`
ClassInterfacesDir string `yaml:"class_interfaces_dir" validate:"required,dir"`
BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"`
Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node"`
BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"`
Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node subsquid"`
}
142 changes: 122 additions & 20 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package indexer
import (
"bytes"
"context"
sqdAdapter "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/adapter"
sqdRcvr "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -32,8 +34,9 @@ const (
type Indexer struct {
modules.BaseModule

cfg config.Config
queue map[uint64]receiver.Result
cfg config.Config
datasource map[string]ddConfig.DataSource
queue map[uint64]receiver.Result

address models.IAddress
blocks models.IBlock
Expand All @@ -52,15 +55,17 @@ type Indexer struct {

state *state
idGenerator *generator.IdGenerator
receiver *receiver.Receiver
receiver receiver.IReceiver
adapter *sqdAdapter.Adapter
statusChecker *statusChecker
rollbackManager models.Rollback

rollback chan struct{}
rollbackRerun chan struct{}
rollbackWait *sync.WaitGroup

txWriteMutex *sync.Mutex
txWriteMutex *sync.Mutex
cancelReceiver context.CancelFunc
}

// New - creates new indexer entity
Expand All @@ -72,6 +77,7 @@ func New(
indexer := &Indexer{
BaseModule: modules.New("indexer"),
cfg: cfg,
datasource: datasource,
queue: make(map[uint64]receiver.Result),
stateRepo: storage.State,
address: storage.Address,
Expand All @@ -93,11 +99,43 @@ func New(
txWriteMutex: new(sync.Mutex),
rollbackWait: new(sync.WaitGroup),
}
rcvr, err := receiver.NewReceiver(cfg, datasource)
if err != nil {
return nil, err

switch cfg.Datasource {
case "subsquid":
sqdReceiver, err := sqdRcvr.New(
cfg,
datasource,
cfg.StartLevel,
2,
func() uint64 {
return indexer.state.Height()
},
)
if err != nil {
return nil, err
}

indexer.receiver = sqdReceiver
indexer.adapter = sqdAdapter.New(sqdReceiver.GetResults())

if err = indexer.adapter.AttachTo(sqdReceiver, sqdRcvr.BlocksOutput, sqdAdapter.BlocksInput); err != nil {
return nil, errors.Wrap(err, "while attaching adapter to receiver")
}
if err = indexer.adapter.AttachTo(sqdReceiver, sqdRcvr.HeadOutput, sqdAdapter.HeadInput); err != nil {
return nil, errors.Wrap(err, "while attaching adapter to receiver")
}
indexer.CreateInputWithCapacity(InputStopSubsquid, 1)
if err = indexer.AttachTo(indexer.adapter, sqdAdapter.HeadAchieved, InputStopSubsquid); err != nil {
return nil, errors.Wrap(err, "while attaching indexer to subsquid done channel")
}

default:
rcvr, err := receiver.NewReceiver(cfg, datasource)
if err != nil {
return nil, err
}
indexer.receiver = rcvr
}
indexer.receiver = rcvr

indexer.CreateOutput(OutputBlocks)

Expand Down Expand Up @@ -129,17 +167,26 @@ func New(
// Start -
func (indexer *Indexer) Start(ctx context.Context) {
indexer.Log.Info().Msg("starting indexer...")
receiverCtx, cancel := context.WithCancel(ctx)
indexer.cancelReceiver = cancel

if err := indexer.init(ctx); err != nil {
indexer.Log.Err(err).Msg("state initializing error")
return
}

indexer.receiver.Start(ctx)
indexer.receiver.Start(receiverCtx)
indexer.statusChecker.Start(receiverCtx)

indexer.statusChecker.Start(ctx)
switch indexer.cfg.Datasource {
case "subsquid":
indexer.adapter.Start(receiverCtx)
indexer.G.GoCtx(ctx, indexer.listenStopSubsquid)
default:
indexer.G.GoCtx(ctx, indexer.sync)
}

indexer.G.GoCtx(ctx, indexer.saveBlocks)
indexer.G.GoCtx(ctx, indexer.sync)
}

// Name -
Expand All @@ -162,7 +209,6 @@ func (indexer *Indexer) Close() error {
if err := indexer.receiver.Close(); err != nil {
return err
}

close(indexer.rollback)
close(indexer.rollbackRerun)
return nil
Expand Down Expand Up @@ -201,8 +247,8 @@ func (indexer *Indexer) checkQueue(ctx context.Context) bool {
return false
}

func (indexer *Indexer) getNewBlocks(ctx context.Context) error {
head, err := indexer.receiver.Head(ctx)
func (indexer *Indexer) getNewBlocks(ctx context.Context, commonReceiver *receiver.Receiver) error {
head, err := commonReceiver.Head(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -242,7 +288,7 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error {
if indexer.checkQueue(ctx) {
return nil
}
indexer.receiver.AddTask(height)
commonReceiver.AddTask(height)
}
}

Expand All @@ -267,7 +313,12 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error {
}

func (indexer *Indexer) sync(ctx context.Context) {
if err := indexer.getNewBlocks(ctx); err != nil {
commonReceiver, ok := indexer.receiver.(*receiver.Receiver)
if !ok {
log.Panic().Msg("incorrect receiver type")
return
}
if err := indexer.getNewBlocks(ctx, commonReceiver); err != nil {
indexer.Log.Err(err).Msg("getNewBlocks")
}

Expand All @@ -281,11 +332,11 @@ func (indexer *Indexer) sync(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
if err := indexer.getNewBlocks(ctx); err != nil {
if err := indexer.getNewBlocks(ctx, commonReceiver); err != nil {
indexer.Log.Err(err).Msg("getNewBlocks")
}
case <-indexer.rollbackRerun:
if err := indexer.getNewBlocks(ctx); err != nil {
if err := indexer.getNewBlocks(ctx, commonReceiver); err != nil {
indexer.Log.Err(err).Msg("getNewBlocks")
}
}
Expand All @@ -303,7 +354,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) {
case result := <-indexer.receiver.Results():
indexer.queue[result.Block.Height] = result

if indexer.state.Height() == 0 && !zeroBlock {
if indexer.state.Height() == 0 && !zeroBlock && indexer.cfg.StartLevel == 0 {
if data, ok := indexer.queue[0]; ok {
if err := indexer.handleBlock(ctx, data); err != nil {
indexer.Log.Err(err).Msg("handle block")
Expand Down Expand Up @@ -381,7 +432,11 @@ func (indexer *Indexer) makeRollback(ctx context.Context, height uint64) error {
delete(indexer.queue, key)
}

indexer.receiver.Clear()
commonReceiver, ok := indexer.receiver.(*receiver.Receiver)
if !ok {
return errors.Errorf("incorrect receiver type")
}
commonReceiver.Clear()

if err := indexer.Rollback(ctx, height-1); err != nil {
return errors.Wrap(err, "rollback")
Expand Down Expand Up @@ -467,3 +522,50 @@ func (indexer *Indexer) Rollback(ctx context.Context, height uint64) error {

return indexer.rollbackManager.Rollback(ctx, indexer.Name(), height)
}

func (indexer *Indexer) listenStopSubsquid(ctx context.Context) {
input := indexer.MustInput(InputStopSubsquid)

for {
select {
case <-ctx.Done():
return
case headLevel, ok := <-input.Listen():
if !ok {
indexer.Log.Warn().Msg("can't read message from input, it was drained and closed")
if err := indexer.Close(); err != nil {
return
}
}

for indexer.state.Height() != headLevel {
time.Sleep(time.Millisecond * 10)
}

indexer.cancelReceiver()
if err := indexer.receiver.Close(); err != nil {
return
}
if err := indexer.adapter.Close(); err != nil {
return
}
if err := indexer.statusChecker.Close(); err != nil {
return
}

indexer.cfg.Datasource = "node"
rcvr, err := receiver.NewReceiver(indexer.cfg, indexer.datasource)
if err != nil {
indexer.Log.Error().Msg("can't create node receiver")
return
}
indexer.receiver = rcvr
indexer.statusChecker.SetReceiver(rcvr)

indexer.Log.Info().Msgf("data source has been replaced with node, starting...")
indexer.receiver.Start(ctx)
indexer.statusChecker.Start(ctx)
indexer.G.GoCtx(ctx, indexer.sync)
}
}
}
Loading
Loading