diff --git a/database/index.ts b/database/index.ts index cc4d12b..c8d70ea 100644 --- a/database/index.ts +++ b/database/index.ts @@ -9,6 +9,7 @@ import { MigrationRegisteredEvent, MigrationStatus, } from "../interfaces"; +import { DatabaseError } from "../errors"; let db: PostgresJsDatabase | null = null; @@ -41,28 +42,33 @@ export async function initializeDatabaseConnection(): Promise< return db; } -export async function addFractalityTokenMigrations( + +export async function filterAndaddNewFractalityTokenMigrations( migrations: MigrationRegisteredEvent[] ): Promise<{ newMigrations: MigrationRegisteredEvent[]; existingTxs: { txHash: string }[]; }> { if (!db) { - throw new Error("Database not initialized"); + throw new DatabaseError("Database not initialized"); } // Get all transaction hashes from the incoming migrations const incomingTxHashes = migrations.map((m) => m.transactionHash); - - // Check which transactions already exist in the database - const existingTxs = await db - .select({ txHash: schema.fractalityTokenMigrations.transactionHash }) - .from(schema.fractalityTokenMigrations) - .where( - inArray( - schema.fractalityTokenMigrations.transactionHash, - incomingTxHashes - ) - ); + let existingTxs: { txHash: string }[] = []; + try { + // Check which transactions already exist in the database + existingTxs = await db + .select({ txHash: schema.fractalityTokenMigrations.transactionHash }) + .from(schema.fractalityTokenMigrations) + .where( + inArray( + schema.fractalityTokenMigrations.transactionHash, + incomingTxHashes + ) + ); + } catch (error) { + throw new DatabaseError("Error fetching existing migrations from database: " + error); + } // Filter out migrations that already exist const newMigrations = migrations.filter( @@ -72,185 +78,190 @@ export async function addFractalityTokenMigrations( // If there are new migrations, insert them if (newMigrations.length > 0) { - await db.insert(schema.fractalityTokenMigrations).values( - newMigrations.map((migration) => ({ - transactionHash: migration.transactionHash, - migrationContractAddress: migration.migrationContractAddress, - eventName: migration.eventName, - caller: migration.caller, - migrationAddress: migration.migrationAddress, - amount: migration.amount.toString(), - foundAt: new Date().toISOString(), - status: MigrationStatus.FOUND_ON_ARBITRUM, - migratedAt: null, - migratedAmount: null, - })) - ); + try { + await db.insert(schema.fractalityTokenMigrations).values( + newMigrations.map((migration) => ({ + transactionHash: migration.transactionHash, + migrationContractAddress: migration.migrationContractAddress, + eventName: migration.eventName, + caller: migration.caller, + migrationAddress: migration.migrationAddress, + amount: migration.amount.toString(), + foundAt: new Date().toISOString(), + status: MigrationStatus.FOUND_ON_ARBITRUM, + migratedAt: null, + migratedAmount: null, + })) + ); + } catch (error) { + throw new DatabaseError("Error inserting new migrations batch into database: " + error); + } } - - return { newMigrations: newMigrations, existingTxs: existingTxs }; // Optionally return number of new insertions + return { newMigrations: newMigrations, existingTxs: existingTxs }; } -export async function dbCleanup(): Promise { - try { - console.log("Starting database cleanup..."); + export async function dbCleanup(): Promise { + try { + console.log("Starting database cleanup..."); - if (db) { - // Get the underlying postgres connection from drizzle - const client = (db as any).session?.config?.connection; - if (client) { - await client.end(); - console.log("Database connection closed"); + if (db) { + // Get the underlying postgres connection from drizzle + const client = (db as any).session?.config?.connection; + if (client) { + await client.end(); + console.log("Database connection closed"); + } + db = null; } - db = null; - } - console.log("Database cleanup completed"); - } catch (error) { - console.error("Error during database cleanup:", error); - throw error; + console.log("Database cleanup completed"); + } catch (error) { + console.error("Error during database cleanup:", error); + throw error; + } } -} -export async function getUnmigratedFractalityTokenMigrations(): Promise< - TokenMigration[] -> { - if (!db) { - throw new Error("Database not initialized"); - } - try { - const migrations = await db - .select() - .from(schema.fractalityTokenMigrations) - .where( - not( - eq( - schema.fractalityTokenMigrations.status, - MigrationStatus.SENT_TO_HL + export async function getUnmigratedFractalityTokenMigrations(): Promise< + TokenMigration[] + > { + if (!db) { + throw new Error("Database not initialized"); + } + try { + const migrations = await db + .select() + .from(schema.fractalityTokenMigrations) + .where( + not( + eq( + schema.fractalityTokenMigrations.status, + MigrationStatus.SENT_TO_HL + ) ) - ) - ); - return migrations; - } catch (error) { - console.error( - "Error fetching unmigrated fractality token migrations:", - error - ); - throw error; - } -} - -export async function getAllFractalityTokenMigrations(): Promise< - TokenMigration[] -> { - if (!db) { - throw new Error("Database not initialized"); - } - - try { - return await db.select().from(schema.fractalityTokenMigrations); - } catch (error) { - console.error("Error fetching fractality token migrations:", error); - throw error; + ); + return migrations; + } catch (error) { + throw new DatabaseError("Error fetching unmigrated fractality token migrations: " + error); + } } -} -export async function getFractalityTokenMigrationsByAddress( - migrationAddress: string -): Promise { - if (!db) { - throw new Error("Database not initialized"); - } - try { - const migrations = await db - .select() - .from(schema.fractalityTokenMigrations) - .where( - eq(schema.fractalityTokenMigrations.migrationAddress, migrationAddress) - ); + export async function getAllFractalityTokenMigrations(): Promise< + TokenMigration[] + > { + if (!db) { + throw new Error("Database not initialized"); + } - return migrations; - } catch (error) { - console.error("Error fetching migrations by address:", error); - throw error; + try { + return await db.select().from(schema.fractalityTokenMigrations); + } catch (error) { + console.error("Error fetching fractality token migrations:", error); + throw error; + } } -} -export async function getFractalityTokenMigrationsByMigrationContractAddress( - migrationContractAddress: string -): Promise { - if (!db) { - throw new Error("Database not initialized"); - } - try { - const migrations = await db - .select() - .from(schema.fractalityTokenMigrations) - .where( - eq( - schema.fractalityTokenMigrations.migrationContractAddress, - migrationContractAddress - ) - ); + export async function getFractalityTokenMigrationsByAddress( + migrationAddress: string + ): Promise { + if (!db) { + throw new Error("Database not initialized"); + } + try { + const migrations = await db + .select() + .from(schema.fractalityTokenMigrations) + .where( + eq(schema.fractalityTokenMigrations.migrationAddress, migrationAddress) + ); - return migrations; - } catch (error) { - console.error( - "Error fetching migrations by migration contract address:", - error - ); - throw error; + return migrations; + } catch (error) { + console.error("Error fetching migrations by address:", error); + throw error; + } } -} -export async function setHLMigrationStatus( - migrationHash: string, - status: MigrationStatus -) { - try { + export async function getFractalityTokenMigrationsByMigrationContractAddress( + migrationContractAddress: string + ): Promise { if (!db) { throw new Error("Database not initialized"); } - await db - .update(schema.fractalityTokenMigrations) - .set({ status: status }) - .where( - eq(schema.fractalityTokenMigrations.transactionHash, migrationHash) + try { + const migrations = await db + .select() + .from(schema.fractalityTokenMigrations) + .where( + eq( + schema.fractalityTokenMigrations.migrationContractAddress, + migrationContractAddress + ) + ); + + return migrations; + } catch (error) { + console.error( + "Error fetching migrations by migration contract address:", + error ); - } catch (error) { - console.error(`ERROR setting HL migration status. Error: ${error}`); - throw error; + throw error; + } } -} -export async function finalizeHlMigrations(migrations: HLMigration[]) { - if (!db) { - throw new Error("Database not initialized"); - } - try { - for (const migration of migrations) { - console.log( - `Setting HL migration for ${migration.originalTransactionHash}` - ); + export async function setHLMigrationStatus( + migrationHash: string, + status: MigrationStatus + ) { + try { + if (!db) { + throw new Error("Database not initialized"); + } await db .update(schema.fractalityTokenMigrations) - .set({ - status: MigrationStatus.SENT_TO_HL, - migratedAt: new Date().toISOString(), - migratedAmount: migration.hlTokenAmount, - }) + .set({ status: status }) .where( - eq( - schema.fractalityTokenMigrations.transactionHash, - migration.originalTransactionHash - ) + eq(schema.fractalityTokenMigrations.transactionHash, migrationHash) ); + } catch (error) { + console.error(`ERROR setting HL migration status. Error: ${error}`); + throw error; } - } catch (error) { - console.error(`FATAL ERROR updating HL migrations. Error: ${error}`); - throw error; } -} -export type TokenMigration = - typeof schema.fractalityTokenMigrations.$inferSelect; + export async function finalizeHlMigrations(migrations: HLMigration[]) { + const successes = []; + const failures = []; + if (!db) { + throw new Error("Database not initialized"); + } + + for (const migration of migrations) { + try { + console.log( + `Setting HL migration for ${migration.originalTransactionHash}` + ); + await db + .update(schema.fractalityTokenMigrations) + .set({ + status: MigrationStatus.SENT_TO_HL, + migratedAt: new Date().toISOString(), + migratedAmount: migration.hlTokenAmount, + }) + .where( + eq( + schema.fractalityTokenMigrations.transactionHash, + migration.originalTransactionHash + ) + ); + successes.push(migration); + } catch (error) { + failures.push(migration); + //THhis needs to shut down the service, as the next run could cause a double send. + console.error(`ERROR updating HL ${migration} migrations. Error: ${error}`); + } + } + return { successes, failures }; + } + + export type TokenMigration = + typeof schema.fractalityTokenMigrations.$inferSelect; diff --git a/errors.ts b/errors.ts new file mode 100644 index 0000000..7207fa4 --- /dev/null +++ b/errors.ts @@ -0,0 +1,54 @@ +import { HLMigration } from "./interfaces"; + +export class FatalFinalizationError extends Error { + failedMigrations: HLMigration[]; + constructor(message: string, failedMigrations: HLMigration[]) { + super(message); + this.name = "FatalError"; + this.failedMigrations = failedMigrations; + } + } + +export class RedisError extends Error { + constructor(message: string) { + super(message); + this.name = "RedisError"; + } + } + +export class DecimalConversionError extends Error { + constructor(message: string) { + super(message); + this.name = "DecimalConversionError"; + } + } + + + export class HyperliquidError extends Error { + constructor(message: string) { + super(message); + this.name = "HyperliquidError"; + } + } + + export class BlockchainConnectionError extends Error { + constructor(message: string) { + super(message); + this.name = "BlockchainConnectionError"; + } + } + + export class DatabaseError extends Error { + constructor(message: string) { + super(message); + this.name = "DatabaseError"; + } + } + + export class MigrationPrepError extends Error { + constructor(message: string) { + super(message); + this.name = "MigrationPrepError"; + } + } + \ No newline at end of file diff --git a/libs/BlockchainConnectionProvider.ts b/libs/BlockchainConnectionProvider.ts index 8ede50f..a20aa68 100644 --- a/libs/BlockchainConnectionProvider.ts +++ b/libs/BlockchainConnectionProvider.ts @@ -13,6 +13,7 @@ import { env } from "../env"; import abi from "../contracts/FractalityTokenMigration.sol.json"; import { MigrationRegisteredEvent } from "../interfaces"; import { MySqlBigInt64BuilderInitial } from "drizzle-orm/mysql-core"; +import { BlockchainConnectionError } from "../errors"; interface BlockchainConnectionProviderOptions { providerUrl: string; y2kTokenMigrationAddress: Address; @@ -80,7 +81,12 @@ export class BlockchainConnectionProvider { }; public async getCurrentBlockNumber(): Promise { - return this._viemClient.getBlockNumber(); + try { + return this._viemClient.getBlockNumber(); + } catch (error) { + throw new BlockchainConnectionError("Error getting current block number: " + error); + } + } public async getArbitrumTokenDecimals(): Promise { @@ -127,6 +133,7 @@ export class BlockchainConnectionProvider { fromBlock: bigint, toBlock: bigint ): Promise { +try{ //This is the current block number if (fromBlock >= toBlock) { throw new Error("from block must be before toBlock"); @@ -161,5 +168,8 @@ export class BlockchainConnectionProvider { }); }); return decodedLogs; + } catch (error) { + throw new BlockchainConnectionError("Error scanning migrations: " + error); + } } } diff --git a/libs/DecimalConversion.ts b/libs/DecimalConversion.ts index 17b37c4..6673cf6 100644 --- a/libs/DecimalConversion.ts +++ b/libs/DecimalConversion.ts @@ -1,9 +1,15 @@ import { ethers } from "ethers"; +import { DecimalConversionError } from "../errors"; export class DecimalConversion { hlTokenDecimals: bigint; arbitrumTokenDecimals: bigint; constructor(hlTokenDecimals: bigint, arbitrumTokenDecimals: bigint) { + if (hlTokenDecimals > arbitrumTokenDecimals) { + throw new DecimalConversionError( + `HL token decimals (${hlTokenDecimals}) must be strictly smaller or equal to Arbitrum token decimals (${arbitrumTokenDecimals}).` + ); + } this.hlTokenDecimals = hlTokenDecimals; this.arbitrumTokenDecimals = arbitrumTokenDecimals; } diff --git a/libs/HyperliquidManager.ts b/libs/HyperliquidManager.ts index fad5691..4db65fc 100644 --- a/libs/HyperliquidManager.ts +++ b/libs/HyperliquidManager.ts @@ -1,6 +1,7 @@ const { Hyperliquid } = require("hyperliquid"); import { setHLMigrationStatus } from "../database"; import { env } from "../env"; +import { HyperliquidError } from "../errors"; import { HLMigration, MigrationStatus } from "../interfaces"; import { DecimalConversion } from "./DecimalConversion"; @@ -33,11 +34,15 @@ export class HyperliquidManager { } async getUserTokenBalances(userAddress: string) { - const balances = await this.hlSdk.info.spot.getSpotClearinghouseState( - userAddress, - false - ); - return balances; + try { + const balances = await this.hlSdk.info.spot.getSpotClearinghouseState( + userAddress, + false + ); + return balances; + } catch (error) { + throw new HyperliquidError("Error getting user token balances: " + error); + } } getTokenDecimals() { @@ -45,7 +50,11 @@ export class HyperliquidManager { } async getTokenInfo(tokenAddress: string) { - return this.hlSdk.info.spot.getTokenDetails(tokenAddress); + try { + return this.hlSdk.info.spot.getTokenDetails(tokenAddress); + } catch (error) { + throw new HyperliquidError("Error getting token info: " + error); + } } //NOTE: the amount here is NOT in wei. It is in decimal representation. @@ -60,7 +69,7 @@ export class HyperliquidManager { console.log("Transfer successful"); } else { console.log("Transfer failed", result.response); - throw new Error("Transfer failed"); + throw new HyperliquidError("Transfer failed: " + result.response); } } //4000 migrations will take aroud 1000$ USDC! diff --git a/libs/PreviousBlockManager.ts b/libs/PreviousBlockManager.ts index 00adcb9..d17379a 100644 --- a/libs/PreviousBlockManager.ts +++ b/libs/PreviousBlockManager.ts @@ -1,41 +1,33 @@ import IORedis from "ioredis"; import { env } from "../env"; +import { RedisOperations } from "../redisOperations/redisOperations"; export class PreviousBlockManager { - private redisConnection: IORedis | null = null; + private redisOperations: RedisOperations; private safetyCushionNumberOfBlocks: bigint; private getCurrentBlock: () => Promise; constructor( - redisConnection: IORedis, + redisOperations: RedisOperations, safetyCushionNumberOfBlocks: bigint, getCurrentBlock: () => Promise ) { - this.redisConnection = redisConnection; + this.redisOperations = redisOperations; this.safetyCushionNumberOfBlocks = safetyCushionNumberOfBlocks; this.getCurrentBlock = getCurrentBlock; } //Returns the block number of the last scan, or the current block number minus the number of blocks back that corresponds to the scan period, as a start. async getFromBlockForScan() { - if (!this.redisConnection) { - throw new Error("Redis connection is not initialized"); - } - const blockNumber = await this.redisConnection.get("lastScanBlockNumber"); + const blockNumber = await this.redisOperations.getLastScanBlockNumber(); if (!blockNumber) { return BigInt(env.BLOCK_START_NUMBER); } else { - return BigInt(blockNumber) - this.safetyCushionNumberOfBlocks; //Add safety cushion to the block number, so we don't miss any blocks, repeated events can be ignored + return blockNumber - this.safetyCushionNumberOfBlocks; //Add safety cushion to the block number, so we don't miss any blocks, repeated events can be ignored } } async setFromBlockForScanToCurrentBlock(): Promise { - if (!this.redisConnection) { - throw new Error("Redis connection is not initialized"); - } const currentBlockNumber = await this.getCurrentBlock(); - await this.redisConnection.set( - "lastScanBlockNumber", - currentBlockNumber.toString() - ); + await this.redisOperations.setLastScanBlockNumber(currentBlockNumber); return currentBlockNumber; } } diff --git a/migrationService.ts b/migrationService.ts index c9f360d..4fe5711 100644 --- a/migrationService.ts +++ b/migrationService.ts @@ -1,11 +1,10 @@ import { PrivateKeyManager } from "./libs/PrivateKeyManager"; -import IORedis from "ioredis"; const privateKeyManager = new PrivateKeyManager(); import { env } from "./env"; import { BlockchainConnectionProvider } from "./libs/BlockchainConnectionProvider"; import { Address } from "viem"; import { - addFractalityTokenMigrations, + filterAndaddNewFractalityTokenMigrations, getUnmigratedFractalityTokenMigrations, finalizeHlMigrations, setHLMigrationStatus, @@ -20,12 +19,15 @@ import { initializeDatabaseConnection } from "./database"; import cron from "node-cron"; import { PreviousBlockManager } from "./libs/PreviousBlockManager"; import { HyperliquidManager } from "./libs/HyperliquidManager"; +import { FatalFinalizationError, MigrationPrepError, RedisError } from "./errors"; +import { RedisOperations } from "./redisOperations/redisOperations"; export async function main(runWithCron: boolean) { await privateKeyManager.init(); await initializeDatabaseConnection(); - const redisConnection = await initRedisConnection(); + const redisOperations = new RedisOperations(); + await redisOperations.initialize(); const hlManager = new HyperliquidManager( true, @@ -44,27 +46,50 @@ export async function main(runWithCron: boolean) { ); const blockManager = new PreviousBlockManager( - redisConnection, + redisOperations, BigInt(env.SAFETY_CUSHION_NUMBER_OF_BLOCKS), () => blockchainConnectionProvider.getCurrentBlockNumber() ); if (runWithCron) { console.log("starting cron job for migrations, running every 5 minutes"); - cron.schedule("* * * * *", async () => { - await coreMigrationService( - blockManager, - blockchainConnectionProvider, - hlManager - ); + const scheduledTask = cron.schedule("* * * * *", async () => { + try { + await coreMigrationService( + blockManager, + blockchainConnectionProvider, + hlManager + ); + } catch (error) { + if (error instanceof FatalFinalizationError) { + scheduledTask.stop(); + } else { + console.log("Error in core migration service, this run will be skipped", error); + } + } }); } else { - await coreMigrationService( - blockManager, - blockchainConnectionProvider, - hlManager - ); + try { + if (await redisOperations.shouldRunAccordingToStopRunningFlag()) { + await coreMigrationService( + blockManager, + blockchainConnectionProvider, + hlManager + ); + } else { + console.log("stopRunning flag is set, not running core migration service"); + return; + } + } catch (error) { + if (error instanceof FatalFinalizationError) { + await redisOperations.setStopRunningFlag(); + } else { + console.log("Error in core migration service, this run will be skipped", error); + } + throw error; + } } + } export async function coreMigrationService( @@ -72,12 +97,16 @@ export async function coreMigrationService( blockchainConnectionProvider: BlockchainConnectionProvider, hlManager: HyperliquidManager ) { + //This, if fails will do a scan from the start block, not a big dea. const fromBlock = await blockManager.getFromBlockForScan(); + + //This gets the current block and sets it in redis. If fails, will bubble up and this run will be skipped. const toBlock = await blockManager.setFromBlockForScanToCurrentBlock(); console.log( `looking for migrations from block ${fromBlock} to block ${toBlock}` ); + //Gets logs from the blockchain. If fails, will bubble up and this run will be skipped. const y2kMigrations = await blockchainConnectionProvider.scanMigrations( env.Y2K_TOKEN_MIGRATION_ADDRESS as Address, fromBlock, @@ -88,81 +117,87 @@ export async function coreMigrationService( fromBlock, toBlock ); + //This is atomic, if it fails, nothing was written and we can try again next time. await addMigrationsToDatabase([...y2kMigrations, ...frctRMigrations]); //get migrations that still have not been sent to hyperliquid + //This includes the ones we added above... as well as those that were not migrated for some reason. + //If this fails, we will skip this run and try again next time. const unmigratedMigrations: TokenMigration[] = await getUnmigratedFractalityTokenMigrations(); //calcualate the amount of tokens to send to hyperliquid + //If all the migrations are not able to be prepped, we will skip this run and try again next time. + //However, I don't see this failing as it's just doing some math. const hlMigrations = await prepForHLMigration( hlManager, unmigratedMigrations ); console.log("hlMigrations", hlMigrations); + //This fails gracefully, the ones we could not send are in the faulures array. const { successes, failures } = await hlManager.sendHLMigrations( hlMigrations ); console.log("successes", successes); console.log("failures", failures); - try { - await finalizeHlMigrations(successes); - } catch (error) { - console.error("FATAL ERROR: Error finalizing HL migrations", error); + let finalizationMaxRetries = 3; + let migrationsToFinalize = successes; + for (const attemptNumber of Array(finalizationMaxRetries).keys()) { + const finalizationResults = await finalizeHlMigrations(migrationsToFinalize); + if (finalizationResults.failures.length === 0) { + break; + } + migrationsToFinalize = finalizationResults.failures; + if (attemptNumber === finalizationMaxRetries - 1) { + throw new FatalFinalizationError( + "FATAL ERROR: Error finalizing HL migrations. The following migration need to manually be marked as sent to HL", + finalizationResults.failures + ); + } } - - console.log("done"); } + +//Maybe I can add some in success and failure buckets. async function prepForHLMigration( hlManager: HyperliquidManager, unmigratedMigrations: TokenMigration[] ): Promise { - const hlMigrations: HLMigration[] = []; - for (const unmigratedMigration of unmigratedMigrations) { - if (unmigratedMigration.amount && unmigratedMigration.migrationAddress) { - const arbitrumAmount = BigInt(unmigratedMigration.amount); - const hlAmount = - hlManager.decimalConversion!.convertToHlToken(arbitrumAmount); - hlMigrations.push({ - originalTransactionHash: unmigratedMigration.transactionHash, - hlTokenAmount: hlAmount, - sendToAddress: unmigratedMigration.migrationAddress, - }); - } else { - console.error( - `migration with hash ${unmigratedMigration.transactionHash} has no amount or no migration address` - ); + try { + const hlMigrations: HLMigration[] = []; + for (const unmigratedMigration of unmigratedMigrations) { + if (unmigratedMigration.amount && unmigratedMigration.migrationAddress) { + const arbitrumAmount = BigInt(unmigratedMigration.amount); + const hlAmount = + hlManager.decimalConversion!.convertToHlToken(arbitrumAmount); + hlMigrations.push({ + originalTransactionHash: unmigratedMigration.transactionHash, + hlTokenAmount: hlAmount, + sendToAddress: unmigratedMigration.migrationAddress, + }); + } else { + console.error( + `migration with hash ${unmigratedMigration.transactionHash} has no amount or no migration address` + ); + } } + return hlMigrations; + } catch (error) { + throw new MigrationPrepError("Error preparing for HL migration: " + error); } - return hlMigrations; } -async function initRedisConnection() { - return new IORedis(env.REDIS_CONNECTION_STRING, { - maxRetriesPerRequest: null, - enableReadyCheck: false, - tls: env.REDIS_USE_TLS - ? { - rejectUnauthorized: false, - } - : undefined, - }); -} + async function addMigrationsToDatabase(migrations: MigrationRegisteredEvent[]) { - try { - const result = await addFractalityTokenMigrations(migrations); //TODO: make this batch - console.log( - `Inserted ${result.newMigrations.length} new migrations and found ${result.existingTxs.length} existing migrations` - ); - console.info( - `existing migrations that already exist in the database`, - result.existingTxs - ); - } catch (e) { - console.error("Error adding migration to database", e); - } + const result = await filterAndaddNewFractalityTokenMigrations(migrations); //TODO: make this batch + console.log( + `Inserted ${result.newMigrations.length} new migrations and found ${result.existingTxs.length} existing migrations` + ); + console.info( + `existing migrations that already exist in the database`, + result.existingTxs + ); } diff --git a/redisOperations/redisOperations.ts b/redisOperations/redisOperations.ts new file mode 100644 index 0000000..061ffb2 --- /dev/null +++ b/redisOperations/redisOperations.ts @@ -0,0 +1,71 @@ +import { env } from "../env"; +import IORedis from "ioredis"; +import { RedisError } from "../errors"; + +export class RedisOperations { + private redisConnection: IORedis | null = null; + + + async initialize() { + this.redisConnection = await this.initRedisConnection(); + } + + + async initRedisConnection() { + return new IORedis(env.REDIS_CONNECTION_STRING, { + maxRetriesPerRequest: null, + enableReadyCheck: false, + tls: env.REDIS_USE_TLS + ? { + rejectUnauthorized: false, + } + : undefined, + }); + } + + async shouldRunAccordingToStopRunningFlag(): Promise { + if (!this.redisConnection) { + return false; + } + let stopRunningFlag = null; + try { + stopRunningFlag = await this.redisConnection.get("stopRunning"); + } catch (error) { + return false; + } + if (stopRunningFlag==="true") { + return false; + } + return true; + } + + + async getLastScanBlockNumber(): Promise { + if (!this.redisConnection) { + return null; + } + const lastScanBlockNumber = await this.redisConnection.get("lastScanBlockNumber"); + if(!lastScanBlockNumber) { + return null; + } + return BigInt(lastScanBlockNumber); + } + + async setLastScanBlockNumber(blockNumber: bigint) { + if (!this.redisConnection) { + throw new RedisError("Redis connection is not initialized"); + } + try { + await this.redisConnection.set("lastScanBlockNumber", blockNumber.toString()); + } catch (error) { + throw new RedisError("Error setting lastScanBlockNumber in redis"); + } + } + async setStopRunningFlag() { + if (!this.redisConnection) { + return; + } + await this.redisConnection.set("stopRunning", "true"); + } + +} \ No newline at end of file