Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions backend/src/api/routes/assets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
import { HealthService } from "../../services/health.service.js";
import { LiquidityService } from "../../services/liquidity.service.js";
import { PriceService } from "../../services/price.service.js";
import { assetTagService } from "../../services/assetTag.service.js";
import { authMiddleware } from "../middleware/auth.js";

function getAuditActorType(source: "api-key" | "bootstrap" | undefined): "user" | "api_key" | "system" {
if (source === "api-key") return "api_key";
if (source === "bootstrap") return "system";
return "user";
}

export async function assetsRoutes(server: FastifyInstance) {
const healthService = new HealthService();
Expand Down Expand Up @@ -207,4 +215,168 @@ export async function assetsRoutes(server: FastifyInstance) {
return price;
},
);

// List all asset tags
server.get(
"/tags",
async (_request: FastifyRequest, _reply: FastifyReply) => {
const tags = await assetTagService.getAllTags();
return { tags };
}
);

// Get tag details
server.get<{ Params: { id: string } }>(
"/tags/:id",
async (request: FastifyRequest<{ Params: { id: string } }>, reply: FastifyReply) => {
const { id } = request.params;
const tag = await assetTagService.getTagById(id);
if (!tag) {
return reply.status(404).send({ error: "Tag not found" });
}
return tag;
}
);

// Create an asset tag
server.post<{ Body: { name: string; color?: string | null } }>(
"/tags",
{
preHandler: authMiddleware({ requiredScopes: ["assets:write"] }),
},
async (request: FastifyRequest<{ Body: { name: string; color?: string | null } }>, reply: FastifyReply) => {
try {
const { name, color } = request.body;
const performedBy = request.apiKeyAuth?.name || "system";
const actorType = getAuditActorType(request.apiKeyAuth?.source);
const tag = await assetTagService.createTag(name, color || null, performedBy, actorType);
return reply.status(201).send(tag);
} catch (error) {
const message = error instanceof Error ? error.message : "Failed to create tag";
return reply.status(400).send({ error: message });
}
}
);

// Update an asset tag
server.put<{ Params: { id: string }; Body: { name?: string; color?: string | null } }>(
"/tags/:id",
{
preHandler: authMiddleware({ requiredScopes: ["assets:write"] }),
},
async (request: FastifyRequest<{ Params: { id: string }; Body: { name?: string; color?: string | null } }>, reply: FastifyReply) => {
try {
const { id } = request.params;
const performedBy = request.apiKeyAuth?.name || "system";
const actorType = getAuditActorType(request.apiKeyAuth?.source);
const tag = await assetTagService.updateTag(id, request.body, performedBy, actorType);
return tag;
} catch (error) {
const message = error instanceof Error ? error.message : "Failed to update tag";
return reply.status(400).send({ error: message });
}
}
);

// Delete an asset tag
server.delete<{ Params: { id: string } }>(
"/tags/:id",
{
preHandler: authMiddleware({ requiredScopes: ["assets:write"] }),
},
async (request: FastifyRequest<{ Params: { id: string } }>, reply: FastifyReply) => {
try {
const { id } = request.params;
const performedBy = request.apiKeyAuth?.name || "system";
const actorType = getAuditActorType(request.apiKeyAuth?.source);
await assetTagService.deleteTag(id, performedBy, actorType);
return reply.status(200).send({ success: true, message: "Tag deleted successfully" });
} catch (error) {
const message = error instanceof Error ? error.message : "Failed to delete tag";
return reply.status(400).send({ error: message });
}
}
);

// Bulk assign tags to assets
server.post<{ Body: { assetSymbols: string[]; tagNames: string[] } }>(
"/tags/bulk-assign",
{
preHandler: authMiddleware({ requiredScopes: ["assets:write"] }),
},
async (request: FastifyRequest<{ Body: { assetSymbols: string[]; tagNames: string[] } }>, reply: FastifyReply) => {
try {
const { assetSymbols, tagNames } = request.body;
const performedBy = request.apiKeyAuth?.name || "system";
const actorType = getAuditActorType(request.apiKeyAuth?.source);
const result = await assetTagService.bulkAssignTags(assetSymbols, tagNames, performedBy, actorType);
return reply.status(200).send({ success: true, ...result });
} catch (error) {
const message = error instanceof Error ? error.message : "Failed to bulk assign tags";
return reply.status(400).send({ error: message });
}
}
);

// Get tags for an asset symbol
server.get<{ Params: { symbol: string } }>(
"/:symbol/tags",
async (request: FastifyRequest<{ Params: { symbol: string } }>, reply: FastifyReply) => {
try {
const { symbol } = request.params;
const tags = await assetTagService.getTagsForAsset(symbol);
return { symbol, tags };
} catch (error) {
const message = error instanceof Error ? error.message : "Failed to get tags for asset";
return reply.status(400).send({ error: message });
}
}
);

// Assign tags to an asset symbol
server.post<{ Params: { symbol: string }; Body: { tags: string[] } }>(
"/:symbol/tags",
{
preHandler: authMiddleware({ requiredScopes: ["assets:write"] }),
},
async (request: FastifyRequest<{ Params: { symbol: string }; Body: { tags: string[] } }>, reply: FastifyReply) => {
try {
const { symbol } = request.params;
const { tags } = request.body;
const performedBy = request.apiKeyAuth?.name || "system";
const actorType = getAuditActorType(request.apiKeyAuth?.source);

for (const tagName of tags) {
await assetTagService.assignTagToAsset(symbol, tagName, performedBy, actorType);
}

const currentTags = await assetTagService.getTagsForAsset(symbol);
return reply.status(200).send({ success: true, tags: currentTags });
} catch (error) {
const message = error instanceof Error ? error.message : "Failed to assign tags";
return reply.status(400).send({ error: message });
}
}
);

// Unassign a tag from an asset symbol
server.delete<{ Params: { symbol: string; tagName: string } }>(
"/:symbol/tags/:tagName",
{
preHandler: authMiddleware({ requiredScopes: ["assets:write"] }),
},
async (request: FastifyRequest<{ Params: { symbol: string; tagName: string } }>, reply: FastifyReply) => {
try {
const { symbol, tagName } = request.params;
const performedBy = request.apiKeyAuth?.name || "system";
const actorType = getAuditActorType(request.apiKeyAuth?.source);

await assetTagService.unassignTagFromAsset(symbol, tagName, performedBy, actorType);
return reply.status(200).send({ success: true, message: `Tag "${tagName}" unassigned from asset "${symbol}"` });
} catch (error) {
const message = error instanceof Error ? error.message : "Failed to unassign tag";
return reply.status(400).send({ error: message });
}
}
);
}
40 changes: 40 additions & 0 deletions backend/src/database/migrations/027_asset_tag_service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { Knex } from "knex";

export async function up(knex: Knex): Promise<void> {
// Create tags table
await knex.schema.createTable("tags", (table) => {
table.string("id").primary();
table.string("name").notNullable().unique();
table.string("color").nullable();
table.timestamp("created_at").notNullable().defaultTo(knex.fn.now());
table.timestamp("updated_at").notNullable().defaultTo(knex.fn.now());

table.index(["name"]);
});

// Create asset_tags association table (Join Table)
await knex.schema.createTable("asset_tags", (table) => {
table
.uuid("asset_id")
.notNullable()
.references("id")
.inTable("assets")
.onDelete("CASCADE");
table
.string("tag_id")
.notNullable()
.references("id")
.inTable("tags")
.onDelete("CASCADE");
table.timestamp("created_at").notNullable().defaultTo(knex.fn.now());

table.primary(["asset_id", "tag_id"]);
table.index(["asset_id"]);
table.index(["tag_id"]);
});
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.dropTableIfExists("asset_tags");
await knex.schema.dropTableIfExists("tags");
}
124 changes: 124 additions & 0 deletions backend/src/database/models/tag.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { getDatabase } from "../connection.js";

export interface Tag {
id: string;
name: string;
color: string | null;
created_at: Date;
updated_at: Date;
}

export interface AssetTag {
asset_id: string;
tag_id: string;
created_at: Date;
}

export class TagModel {
private db = getDatabase();

async findAll(): Promise<Tag[]> {
return this.db("tags").select("*").orderBy("name", "asc");
}

async findById(id: string): Promise<Tag | undefined> {
return this.db("tags").where("id", id).first();
}

async findByName(name: string): Promise<Tag | undefined> {
return this.db("tags").where("name", name).first();
}

async create(data: Omit<Tag, "created_at" | "updated_at">): Promise<Tag> {
const [tag] = await this.db("tags")
.insert({
...data,
created_at: new Date(),
updated_at: new Date(),
})
.returning("*");
return tag;
}

async update(id: string, data: Partial<Omit<Tag, "id" | "created_at" | "updated_at">>): Promise<Tag | undefined> {
const [tag] = await this.db("tags")
.where("id", id)
.update({
...data,
updated_at: new Date(),
})
.returning("*");
return tag;
}

async delete(id: string): Promise<number> {
return this.db("tags").where("id", id).delete();
}

// Association methods

async assign(assetId: string, tagId: string): Promise<void> {
await this.db("asset_tags")
.insert({
asset_id: assetId,
tag_id: tagId,
created_at: new Date(),
})
.onConflict(["asset_id", "tag_id"])
.ignore();
}

async unassign(assetId: string, tagId: string): Promise<number> {
return this.db("asset_tags")
.where({ asset_id: assetId, tag_id: tagId })
.delete();
}

async getTagsForAsset(assetId: string): Promise<Tag[]> {
return this.db("tags")
.join("asset_tags", "tags.id", "asset_tags.tag_id")
.where("asset_tags.asset_id", assetId)
.select("tags.*")
.orderBy("tags.name", "asc");
}

async getTagsForAssets(assetIds: string[]): Promise<{ asset_id: string; tag: Tag }[]> {
if (!assetIds.length) return [];
const rows = await this.db("tags")
.join("asset_tags", "tags.id", "asset_tags.tag_id")
.whereIn("asset_tags.asset_id", assetIds)
.select("asset_tags.asset_id", "tags.*");

return rows.map((row: any) => ({
asset_id: row.asset_id,
tag: {
id: row.id,
name: row.name,
color: row.color,
created_at: row.created_at,
updated_at: row.updated_at,
},
}));
}

async getAssetIdsForTag(tagId: string): Promise<string[]> {
const rows = await this.db("asset_tags")
.where("tag_id", tagId)
.select("asset_id");
return rows.map((r: any) => r.asset_id);
}

async getAssetIdsForTags(tagIds: string[]): Promise<string[]> {
if (!tagIds.length) return [];
const rows = await this.db("asset_tags")
.whereIn("tag_id", tagIds)
.select("asset_id")
.groupBy("asset_id")
.havingRaw("count(distinct tag_id) = ?", [tagIds.length]);
return rows.map((r: any) => r.asset_id);
}

async clearTagsForAsset(assetId: string): Promise<number> {
return this.db("asset_tags").where("asset_id", assetId).delete();
}
}
5 changes: 5 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
import { initJobSystem } from "./workers/index.js";
import { JobQueue } from "./workers/queue.js";
import { initWebhookWorker, stopWebhookWorker } from "./workers/webhookDelivery.worker.js";
import { initNotificationQueueWorker, stopNotificationQueueWorker } from "./workers/notificationQueue.worker.js";
import { getSupplyVerificationQueue } from "./jobs/supplyVerification.job.js";
import { swaggerOptions, swaggerUiOptions } from "./config/openapi.js";
import { registerCorrelationMiddleware } from "./api/middleware/correlation.middleware.js";
Expand Down Expand Up @@ -210,6 +211,9 @@ async function start() {
// Initialize webhook delivery worker
await initWebhookWorker();

// Initialize notification queue worker
await initNotificationQueueWorker();

// Start outbox dispatcher (after all other systems are ready)
await startOutboxSystem();
server.log.info("Outbox dispatcher started");
Expand All @@ -224,6 +228,7 @@ async function start() {

// Stop outbox system first
await stopOutboxSystem();
await stopNotificationQueueWorker();
logger.info("Outbox system stopped");

await wsServer.shutdown();
Expand Down
Loading
Loading