Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 176 additions & 165 deletions database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
MigrationRegisteredEvent,
MigrationStatus,
} from "../interfaces";
import { DatabaseError } from "../errors";

let db: PostgresJsDatabase<typeof schema> | null = null;

Expand Down Expand Up @@ -41,28 +42,33 @@ export async function initializeDatabaseConnection(): Promise<
return db;
}

export async function addFractalityTokenMigrations(

export async function filterAndaddNewFractalityTokenMigrations(
migrations: MigrationRegisteredEvent[]
): Promise<{
newMigrations: MigrationRegisteredEvent[];
existingTxs: { txHash: string }[];
}> {
if (!db) {
throw new Error("Database not initialized");
throw new DatabaseError("Database not initialized");
}
// Get all transaction hashes from the incoming migrations
const incomingTxHashes = migrations.map((m) => m.transactionHash);

// Check which transactions already exist in the database
const existingTxs = await db
.select({ txHash: schema.fractalityTokenMigrations.transactionHash })
.from(schema.fractalityTokenMigrations)
.where(
inArray(
schema.fractalityTokenMigrations.transactionHash,
incomingTxHashes
)
);
let existingTxs: { txHash: string }[] = [];
try {
// Check which transactions already exist in the database
existingTxs = await db
.select({ txHash: schema.fractalityTokenMigrations.transactionHash })
.from(schema.fractalityTokenMigrations)
.where(
inArray(
schema.fractalityTokenMigrations.transactionHash,
incomingTxHashes
)
);
} catch (error) {
throw new DatabaseError("Error fetching existing migrations from database: " + error);
}

// Filter out migrations that already exist
const newMigrations = migrations.filter(
Expand All @@ -72,185 +78,190 @@ export async function addFractalityTokenMigrations(

// If there are new migrations, insert them
if (newMigrations.length > 0) {
await db.insert(schema.fractalityTokenMigrations).values(
newMigrations.map((migration) => ({
transactionHash: migration.transactionHash,
migrationContractAddress: migration.migrationContractAddress,
eventName: migration.eventName,
caller: migration.caller,
migrationAddress: migration.migrationAddress,
amount: migration.amount.toString(),
foundAt: new Date().toISOString(),
status: MigrationStatus.FOUND_ON_ARBITRUM,
migratedAt: null,
migratedAmount: null,
}))
);
try {
await db.insert(schema.fractalityTokenMigrations).values(
newMigrations.map((migration) => ({
transactionHash: migration.transactionHash,
migrationContractAddress: migration.migrationContractAddress,
eventName: migration.eventName,
caller: migration.caller,
migrationAddress: migration.migrationAddress,
amount: migration.amount.toString(),
foundAt: new Date().toISOString(),
status: MigrationStatus.FOUND_ON_ARBITRUM,
migratedAt: null,
migratedAmount: null,
}))
);
} catch (error) {
throw new DatabaseError("Error inserting new migrations batch into database: " + error);
}
}

return { newMigrations: newMigrations, existingTxs: existingTxs }; // Optionally return number of new insertions
return { newMigrations: newMigrations, existingTxs: existingTxs };
}

export async function dbCleanup(): Promise<void> {
try {
console.log("Starting database cleanup...");
export async function dbCleanup(): Promise<void> {
try {
console.log("Starting database cleanup...");

if (db) {
// Get the underlying postgres connection from drizzle
const client = (db as any).session?.config?.connection;
if (client) {
await client.end();
console.log("Database connection closed");
if (db) {
// Get the underlying postgres connection from drizzle
const client = (db as any).session?.config?.connection;
if (client) {
await client.end();
console.log("Database connection closed");
}
db = null;
}
db = null;
}

console.log("Database cleanup completed");
} catch (error) {
console.error("Error during database cleanup:", error);
throw error;
console.log("Database cleanup completed");
} catch (error) {
console.error("Error during database cleanup:", error);
throw error;
}
}
}

export async function getUnmigratedFractalityTokenMigrations(): Promise<
TokenMigration[]
> {
if (!db) {
throw new Error("Database not initialized");
}
try {
const migrations = await db
.select()
.from(schema.fractalityTokenMigrations)
.where(
not(
eq(
schema.fractalityTokenMigrations.status,
MigrationStatus.SENT_TO_HL
export async function getUnmigratedFractalityTokenMigrations(): Promise<
TokenMigration[]
> {
if (!db) {
throw new Error("Database not initialized");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of erroring, maybe try to initialize the database again?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into it, and it could be done, but I don't think it's worth it. After all the db gets initialized as the 2nd thing in main(). These errors shouldn't happen, I added them more to appease Typescript. The connection to the DB certainly could be dropped, which would give an error, but the db object will never be null after the initialization.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@4nche if you agree, can you merge this so dylan can deploy it?

}
try {
const migrations = await db
.select()
.from(schema.fractalityTokenMigrations)
.where(
not(
eq(
schema.fractalityTokenMigrations.status,
MigrationStatus.SENT_TO_HL
)
)
)
);
return migrations;
} catch (error) {
console.error(
"Error fetching unmigrated fractality token migrations:",
error
);
throw error;
}
}

export async function getAllFractalityTokenMigrations(): Promise<
TokenMigration[]
> {
if (!db) {
throw new Error("Database not initialized");
}

try {
return await db.select().from(schema.fractalityTokenMigrations);
} catch (error) {
console.error("Error fetching fractality token migrations:", error);
throw error;
);
return migrations;
} catch (error) {
throw new DatabaseError("Error fetching unmigrated fractality token migrations: " + error);
}
}
}

export async function getFractalityTokenMigrationsByAddress(
migrationAddress: string
): Promise<TokenMigration[]> {
if (!db) {
throw new Error("Database not initialized");
}
try {
const migrations = await db
.select()
.from(schema.fractalityTokenMigrations)
.where(
eq(schema.fractalityTokenMigrations.migrationAddress, migrationAddress)
);
export async function getAllFractalityTokenMigrations(): Promise<
TokenMigration[]
> {
if (!db) {
throw new Error("Database not initialized");
}

return migrations;
} catch (error) {
console.error("Error fetching migrations by address:", error);
throw error;
try {
return await db.select().from(schema.fractalityTokenMigrations);
} catch (error) {
console.error("Error fetching fractality token migrations:", error);
throw error;
}
}
}

export async function getFractalityTokenMigrationsByMigrationContractAddress(
migrationContractAddress: string
): Promise<TokenMigration[]> {
if (!db) {
throw new Error("Database not initialized");
}
try {
const migrations = await db
.select()
.from(schema.fractalityTokenMigrations)
.where(
eq(
schema.fractalityTokenMigrations.migrationContractAddress,
migrationContractAddress
)
);
export async function getFractalityTokenMigrationsByAddress(
migrationAddress: string
): Promise<TokenMigration[]> {
if (!db) {
throw new Error("Database not initialized");
}
try {
const migrations = await db
.select()
.from(schema.fractalityTokenMigrations)
.where(
eq(schema.fractalityTokenMigrations.migrationAddress, migrationAddress)
);

return migrations;
} catch (error) {
console.error(
"Error fetching migrations by migration contract address:",
error
);
throw error;
return migrations;
} catch (error) {
console.error("Error fetching migrations by address:", error);
throw error;
}
}
}

export async function setHLMigrationStatus(
migrationHash: string,
status: MigrationStatus
) {
try {
export async function getFractalityTokenMigrationsByMigrationContractAddress(
migrationContractAddress: string
): Promise<TokenMigration[]> {
if (!db) {
throw new Error("Database not initialized");
}
await db
.update(schema.fractalityTokenMigrations)
.set({ status: status })
.where(
eq(schema.fractalityTokenMigrations.transactionHash, migrationHash)
try {
const migrations = await db
.select()
.from(schema.fractalityTokenMigrations)
.where(
eq(
schema.fractalityTokenMigrations.migrationContractAddress,
migrationContractAddress
)
);

return migrations;
} catch (error) {
console.error(
"Error fetching migrations by migration contract address:",
error
);
} catch (error) {
console.error(`ERROR setting HL migration status. Error: ${error}`);
throw error;
throw error;
}
}
}

export async function finalizeHlMigrations(migrations: HLMigration[]) {
if (!db) {
throw new Error("Database not initialized");
}
try {
for (const migration of migrations) {
console.log(
`Setting HL migration for ${migration.originalTransactionHash}`
);
export async function setHLMigrationStatus(
migrationHash: string,
status: MigrationStatus
) {
try {
if (!db) {
throw new Error("Database not initialized");
}
await db
.update(schema.fractalityTokenMigrations)
.set({
status: MigrationStatus.SENT_TO_HL,
migratedAt: new Date().toISOString(),
migratedAmount: migration.hlTokenAmount,
})
.set({ status: status })
.where(
eq(
schema.fractalityTokenMigrations.transactionHash,
migration.originalTransactionHash
)
eq(schema.fractalityTokenMigrations.transactionHash, migrationHash)
);
} catch (error) {
console.error(`ERROR setting HL migration status. Error: ${error}`);
throw error;
}
} catch (error) {
console.error(`FATAL ERROR updating HL migrations. Error: ${error}`);
throw error;
}
}

export type TokenMigration =
typeof schema.fractalityTokenMigrations.$inferSelect;
export async function finalizeHlMigrations(migrations: HLMigration[]) {
const successes = [];
const failures = [];
if (!db) {
throw new Error("Database not initialized");
}

for (const migration of migrations) {
try {
console.log(
`Setting HL migration for ${migration.originalTransactionHash}`
);
await db
.update(schema.fractalityTokenMigrations)
.set({
status: MigrationStatus.SENT_TO_HL,
migratedAt: new Date().toISOString(),
migratedAmount: migration.hlTokenAmount,
})
.where(
eq(
schema.fractalityTokenMigrations.transactionHash,
migration.originalTransactionHash
)
);
successes.push(migration);
} catch (error) {
failures.push(migration);
//THhis needs to shut down the service, as the next run could cause a double send.
console.error(`ERROR updating HL ${migration} migrations. Error: ${error}`);
}
}
return { successes, failures };
}

export type TokenMigration =
typeof schema.fractalityTokenMigrations.$inferSelect;
Loading