Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions stash/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"@typescript-eslint/eslint-plugin": "^8.29.1",
"@typescript-eslint/parser": "^8.29.1",
"@vitest/coverage-v8": "^3.1.1",
"benny": "^3.7.1",
"concurrently": "7.4.0",
"eslint": "^9.24.0",
"eslint-config-prettier": "^10.1.1",
Expand Down
5 changes: 4 additions & 1 deletion stash/src/processing/PoolRatesProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ export const initService = async () => {
await processRates()
}

const processRates = async () => {
export const processRates = async (): Promise<Map<number, number>> => {
const head = (await chainStore.getLatest()).timestamp
const assets = await chainStore.getAssets()
const latestProcessed = new Map<number, number>()
logger.info(`PoolRatesService: have assets: ${assets}`)
for (const asset of assets) {
logger.info(`PoolRatesService: processing asset ${asset}`)
await processAsset(asset, head)
latestProcessed.set(asset.id, head)
}
return latestProcessed
}

const processAsset = async (asset: chainStore.Asset, head: number) => {
Expand Down
20 changes: 13 additions & 7 deletions stash/src/processing/PriceProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ export const initService = async (base: number = BASE_TOKEN) => {
await processPrices(base)
}

const processPrices = async (base: number) => {
export const processPrices = async (
base: number,
): Promise<Map<number, number>> => {
// we will price in all assets until head, if an asset can't be priced it is safe to use head as latest
const head = (await chainStore.getLatest()).timestamp
const assets = await getAssets(base)
const pricedIn = [base]
const latestProcessed = new Map<number, number>()

logger.debug(`PriceService: have assets: ${assets}`)
for (const asset of assets) {
// both assets have prices already, we need to traverse twice with base as both
if (pricedIn.includes(asset.pool[0]) && pricedIn.includes(asset.pool[1])) {
logger.debug(
`PriceService: assets ${asset} has both base prices, traverse twice`,
Expand All @@ -36,6 +39,7 @@ const processPrices = async (base: number) => {
// reset the latest a process again with other id as base
await priceStore.saveLatest(asset.id, latest)
await processAsset(asset.pool[1], asset, head)
latestProcessed.set(asset.id, head)
continue
}
// get the unpriced asset id
Expand All @@ -44,17 +48,19 @@ const processPrices = async (base: number) => {
: pricedIn.includes(asset.pool[1])
? asset.pool[0]
: -1
if (id < 0) {

if (id >= 0) {
await processAsset(id, asset, head)
latestProcessed.set(asset.id, head)
pricedIn.push(id)
} else {
logger.debug(
`PriceService: assets ${asset} has no base prices, save latest`,
)
await priceStore.saveLatest(asset.id, head)
continue
}

await processAsset(id, asset, head)
pricedIn.push(id)
}
return latestProcessed
}

const processAsset = async (id: number, asset: Asset, head: number) => {
Expand Down
10 changes: 10 additions & 0 deletions stash/src/repository/ChainRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ export const getPools = async (
})
}

export const removeUnusedKeys = async (poolTimestamps: Map<number, number>) => {
const trx = redis.client.multi()

for (const [poolId, timestamp] of poolTimestamps) {
trx.zremrangebyscore(keyPool(poolId), '-inf', `(${timestamp - 300000}`)
}

await trx.exec()
}

export const getAssets = async (): Promise<Asset[]> => {
const assets = await redis.client.get(KEY_ASSETS)
return _.isNull(assets)
Expand Down
85 changes: 84 additions & 1 deletion stash/src/scraper/BlockScraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,68 @@ import { CodecOrArray, toHuman } from '../util/Chain.js'
import logger from '../util/Logger.js'
import { Metrics } from '../util/Metrics.js'

class AsyncBlockBuffer {
private queue: Block[] = []
private readonly bufferSize: number
private readonly api: ApiPromise
private fillInProgress = false
private currentBlockNumber: number

constructor(api: ApiPromise, startBlock: number, bufferSize = 5) {
this.api = api
this.currentBlockNumber = startBlock
this.bufferSize = bufferSize
}

async initialize() {
await this.fillBuffer()
}

async getNextBlock(): Promise<Block | null> {
if (this.queue.length === 0) {
return null
}

const block = this.queue.shift()!
this.fillBuffer()
return block
}

private async fillBuffer() {
if (this.fillInProgress || this.queue.length >= this.bufferSize) {
return
}

this.fillInProgress = true
try {
const blocksToFetch = this.bufferSize - this.queue.length
const fetchPromises: Promise<Block>[] = []

for (let i = 0; i < blocksToFetch; i++) {
const blockNumber = this.currentBlockNumber + i
fetchPromises.push(getBlockByNumber(this.api, blockNumber))
}

const blocks = await Promise.all(fetchPromises)
this.queue.push(...blocks)
this.currentBlockNumber += blocks.length
logger.info(`current block number in buffer: ${this.currentBlockNumber}`)
} catch (error) {
logger.error('Error filling block buffer:', error)
} finally {
this.fillInProgress = false
}
}

isEmpty(): boolean {
return this.queue.length === 0
}

size(): number {
return this.queue.length
}
}

export const BLOCK_TIME = 6000

export interface Block {
Expand Down Expand Up @@ -41,6 +103,8 @@ export const withBlocks = async (
const metrics = new Metrics('BlockScraper')
let last = 0
let current = from
const buffer = new AsyncBlockBuffer(api, from, 5)

const unsub = await api.rpc.chain.subscribeFinalizedHeads(async (head) => {
const headBlock = await getBlockByHash(
api,
Expand All @@ -51,15 +115,34 @@ export const withBlocks = async (
logger.debug(`BlockScraper: new head ${last}`)
})

await buffer.initialize()

// infinite
while (last >= 0) {
if (current >= last) {
await new Promise((f) => setTimeout(f, BLOCK_TIME))
continue
}

store.setBatchMode(current, last)
const block = await getBlockByNumber(api, current)

let stepStart = Date.now()
const block = await buffer.getNextBlock()
if (!block) {
await new Promise((f) => setTimeout(f, BLOCK_TIME))
continue
}
let timings = Date.now() - stepStart
logger.debug(`Fetch block time : ${timings}`)


stepStart = Date.now()
await fn(block)
let processingTime = Date.now() - stepStart

if (block.number % 100 === 0) {
logger.warn(`Block #${block.number} processed in ${processingTime}ms`)
}

metrics.tick()
current++
Expand Down
11 changes: 7 additions & 4 deletions stash/src/scraper/L1LogScraper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ApiPromise } from '@polkadot/api'
import BN from "bn.js";
import { setTimeout } from 'timers/promises'
import { createPublicClient, http, type PublicClientConfig } from 'viem'

Expand Down Expand Up @@ -32,11 +33,12 @@ export const watchDepositAcceptedIntoQueue = async (

while (keepProcessing) {
try {
const toBlock = await publicClient.getBlockNumber()
const latestBlock = await publicClient.getBlockNumber()
let fromBlock = await getLastProcessedBlock(chainName, 'deposit')
if (fromBlock === 0n) {
fromBlock = toBlock
fromBlock = latestBlock
}
const toBlock = fromBlock + 100n < latestBlock ? fromBlock + 100n : latestBlock;
logger.info({
message: `Deposit: chainName: ${chainName}, fromBlock: ${fromBlock}, toBlock: ${toBlock}`,
})
Expand Down Expand Up @@ -104,11 +106,12 @@ export const watchWithdrawalClosed = async (

while (keepProcessing) {
try {
const toBlock = await publicClient.getBlockNumber()
const latestBlock = await publicClient.getBlockNumber()
let fromBlock = await getLastProcessedBlock(chainName, 'withdrawal')
if (fromBlock === 0n) {
fromBlock = toBlock
fromBlock = latestBlock
}
const toBlock = fromBlock + 100n < latestBlock ? fromBlock + 100n : latestBlock;
logger.info({
message: ` Withdrawal: chainName: ${chainName}, fromBlock: ${fromBlock}, toBlock: ${toBlock}`,
})
Expand Down
22 changes: 12 additions & 10 deletions stash/src/scraper/PoolsScraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,22 @@ const getPools = async (block: Block): Promise<PoolEntry[]> => {

const liquidityTokenIds = (
await block.api.query.xyk.liquidityAssets.entries()
).map(([, tokenId]) => {
return tokenId.toString()
).map(([key, tokenId]) => {
let first = key.args[0][0].toString()
let second = key.args[0][1].toString()
return [`${first}:${second}`, tokenId] as [string, BN]
})

const filteredPools = pools.map(async ([storageKey, poolAssetsAmount]) => {
let mapping = new Map(liquidityTokenIds)
const filteredPools = pools.map(([storageKey, poolAssetsAmount]) => {
const amounts = poolAssetsAmount.toHuman() as [string, string]
const liquidityAssetsInPool = storageKey.args[0].toHuman() as [
string,
string,
]
const liquidityPoolId = await block.api.query.xyk.liquidityAssets([
liquidityAssetsInPool[0].toString(),
liquidityAssetsInPool[1].toString(),
])
const liquidityPoolId = mapping.get(
`${liquidityAssetsInPool[0].toString()}:${liquidityAssetsInPool[1].toString()}`,
)
const humanLiquidityPoolId = toHuman(liquidityPoolId)
const numberLiquidityPoolId = Number(humanLiquidityPoolId)
const entry: PoolEntry = {
Expand All @@ -60,12 +62,12 @@ const getPools = async (block: Block): Promise<PoolEntry[]> => {
return entry
})

const poolEntries = (await Promise.all(filteredPools)).filter(
const poolEntries = filteredPools.filter(
(p) => p.amounts[0].gt(ZERO) && p.amounts[1].gt(ZERO),
)

for (const key of liquidityTokenIds) {
await checkHasAsset(block, key)
for (const [, tokenId] of liquidityTokenIds) {
await checkHasAsset(block, tokenId.toString())
}

return poolEntries
Expand Down
7 changes: 3 additions & 4 deletions stash/src/scraper/SwapScraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export const processDataForVolumeHistory = async (
event: Event,
) => {
logger.info('Entered processDataForVolumeHistory')
const totalAmountIn = String((event.data as any).totalAmountIn)
const totalAmountIn = String((event.data as any).totalAmountIn ? (event.data as any).totalAmountIn : (event.data as any).swaps[0].amountIn)
let isFirstSwap = true
// Implementation for processing data for volume history
for (const swap of (event.data as any).swaps) {
Expand Down Expand Up @@ -271,7 +271,7 @@ export const processDataForTVLHistory = async (
event: Event,
) => {
logger.info('Entered processDataForTVLHistory')
const totalAmountIn = String((event.data as any).totalAmountIn)
const totalAmountIn = String((event.data as any).totalAmountIn ? (event.data as any).totalAmountIn : (event.data as any).swaps[0].amountIn)
let isFirstSwap = true
// Implementation for processing data for TVL history
for (const swap of (event.data as any).swaps) {
Expand Down Expand Up @@ -440,8 +440,7 @@ export async function calculateVolume(
): Promise<number> {
try {
const price = await getTokenPrice(tokenId)
console.log('price of the token', price)
if (price == null) {
if (price === null || price === undefined) {
throw new Error(`Token price for token id ${tokenId} is null`)
}
const currentPrice = price.toString()
Expand Down
2 changes: 0 additions & 2 deletions stash/src/scraper/WithdrawalScraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export async function extractExtrinsicHashAndAnAddressFromBlock(
let extrinsic = extinsics[phaseApplyExtrinsic]
extrinsicHash = extrinsic.hash.toString()
address = extrinsic.signer.toString()
console.log('Extrinsic Hash:', extrinsicHash, 'Address:', address)
return { extrinsicHash, address }
} catch (error) {
logger.error('Error extracting extrinsic hash and address:', error)
Expand Down Expand Up @@ -124,7 +123,6 @@ export const startTracingWithdrawal = async (
)
const redisKey = `withdrawal:${extrinsicHash}`
const keyExists = await redis.client.exists(redisKey)
console.log('Key Exists:', keyExists)

const withdrawalData = {
requestId: Number(String(eventData.requestId.id).replace(/,/g, '')),
Expand Down
26 changes: 19 additions & 7 deletions stash/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import 'dotenv/config'

import app from './app.js'
import * as poolRatesService from './processing/PoolRatesProcessorService.js'
import * as priceService from './processing/PriceProcessorService.js'
import * as chainRepository from './repository/ChainRepository.js'
import { BLOCK_TIME } from './scraper/BlockScraper.js'
import * as networkService from './service/NetworkService.js'
import * as blockService from './service/SyncBlockService.js'
import * as syncTransactionsService from './service/SyncTransactionsService.js'
import * as tokenPriceService from './service/TokenPriceService.js'
import * as tokenService from './service/TokenService.js'
import * as xcmService from './service/XcmNetworkService.js'
import * as poolRatesService from './processing/PoolRatesProcessorService.js'
import * as priceService from './processing/PriceProcessorService.js'
import * as store from './repository/ChainRepository.js'
const BASE_TOKEN_ID = 1;
import logger from './util/Logger.js'

// Express Server boot
const server = app.listen(app.get('port'), async () => {
logger.info(
Expand All @@ -28,21 +30,31 @@ const server = app.listen(app.get('port'), async () => {

logger.info('DB initialized')

await tokenPriceService.refreshTokenPrice()

blockService.initService()

syncTransactionsService.initService()

const run = 1
while (run) {
while (true) {
await new Promise((f) => setTimeout(f, BLOCK_TIME * 10))
await poolRatesService.initService()
await priceService.initService()
let processedByPoolRates = await poolRatesService.processRates();
let processedByPriceService = await priceService.processPrices(BASE_TOKEN_ID);
const mergedProcessed = new Map([...processedByPoolRates])
for (const [key, value] of processedByPriceService) {
if (!mergedProcessed.has(key) || value < mergedProcessed.get(key)) {
mergedProcessed.set(key, value)
}
}
await store.removeUnusedKeys(mergedProcessed)
}

})

let isRefreshing = false

const runPeriodically = async () => {
logger.warn("Refreshing prices")
if (isRefreshing) {
return
}
Expand Down
Loading
Loading