From caa165461e5fd230f818d9aaa12698ef1caa673e Mon Sep 17 00:00:00 2001 From: devbyte Date: Mon, 30 Mar 2026 07:06:40 -0400 Subject: [PATCH] feat(governance): implement governance notification system - Add Delegation entity to track delegator-delegate relationships - Add PendingNotification entity for batched daily/weekly digests - Implement event listeners in NotificationsService for governance events - Add GovernanceNotificationScheduler for voting reminders and digests - Update GovernanceIndexerService to emit governance events - Add sendGovernanceEmail to MailService Closes #544 --- .../governance/entities/delegation.entity.ts | 29 +++ .../governance/governance-indexer.service.ts | 121 +++++++++--- .../modules/governance/governance.module.ts | 5 +- backend/src/modules/mail/mail.service.ts | 25 +++ .../entities/notification.entity.ts | 5 + .../entities/pending-notification.entity.ts | 36 ++++ .../governance-notification.scheduler.spec.ts | 99 ++++++++++ .../governance-notification.scheduler.ts | 182 ++++++++++++++++++ .../notifications/notifications.module.ts | 19 +- .../notifications/notifications.service.ts | 177 ++++++++++++++++- 10 files changed, 665 insertions(+), 33 deletions(-) create mode 100644 backend/src/modules/governance/entities/delegation.entity.ts create mode 100644 backend/src/modules/notifications/entities/pending-notification.entity.ts create mode 100644 backend/src/modules/notifications/governance-notification.scheduler.spec.ts create mode 100644 backend/src/modules/notifications/governance-notification.scheduler.ts diff --git a/backend/src/modules/governance/entities/delegation.entity.ts b/backend/src/modules/governance/entities/delegation.entity.ts new file mode 100644 index 000000000..902b6c566 --- /dev/null +++ b/backend/src/modules/governance/entities/delegation.entity.ts @@ -0,0 +1,29 @@ +import { + Entity, + Column, + PrimaryGeneratedColumn, + CreateDateColumn, + UpdateDateColumn, + Index, + Unique, +} from 'typeorm'; + +@Entity('delegations') +@Unique(['delegatorAddress']) +@Index(['delegateAddress']) +export class Delegation { + @PrimaryGeneratedColumn('uuid') + id: string; + + @Column() + delegatorAddress: string; + + @Column() + delegateAddress: string; + + @CreateDateColumn() + createdAt: Date; + + @UpdateDateColumn() + updatedAt: Date; +} diff --git a/backend/src/modules/governance/governance-indexer.service.ts b/backend/src/modules/governance/governance-indexer.service.ts index c2442316b..98ff8221b 100644 --- a/backend/src/modules/governance/governance-indexer.service.ts +++ b/backend/src/modules/governance/governance-indexer.service.ts @@ -1,21 +1,22 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; +import { EventEmitter2 } from '@nestjs/event-emitter'; import { Repository } from 'typeorm'; -// import { ethers } from 'ethers'; import { GovernanceProposal, ProposalStatus, } from './entities/governance-proposal.entity'; import { Vote, VoteDirection } from './entities/vote.entity'; +import { Delegation } from './entities/delegation.entity'; /** * Minimal ABI fragments for the DAO contract events we care about. - * ProposalCreated: emitted when a new proposal is submitted on-chain. - * VoteCast: emitted when a wallet casts a For (1) or Against (0) vote. */ const DAO_ABI_FRAGMENTS = [ 'event ProposalCreated(uint256 indexed proposalId, address indexed proposer, string description, uint256 startBlock, uint256 endBlock)', 'event VoteCast(address indexed voter, uint256 indexed proposalId, uint8 support, uint256 weight)', + 'event DelegationUpdated(address indexed delegator, address indexed delegate)', + 'event ProposalStatusChanged(uint256 indexed proposalId, uint8 status)', ]; @Injectable() @@ -29,6 +30,9 @@ export class GovernanceIndexerService implements OnModuleInit { private readonly proposalRepo: Repository, @InjectRepository(Vote) private readonly voteRepo: Repository, + @InjectRepository(Delegation) + private readonly delegationRepo: Repository, + private readonly eventEmitter: EventEmitter2, ) {} onModuleInit() { @@ -47,15 +51,6 @@ export class GovernanceIndexerService implements OnModuleInit { } // TODO: Implement ethers integration when ethers package is added - // this.provider = new ethers.JsonRpcProvider(rpcUrl); - // this.contract = new ethers.Contract( - // contractAddress, - // DAO_ABI_FRAGMENTS, - // this.provider, - // ); - // this.contract.on('ProposalCreated', this.handleProposalCreated.bind(this)); - // this.contract.on('VoteCast', this.handleVoteCast.bind(this)); - this.logger.log( `Governance indexer listening on contract ${contractAddress}`, ); @@ -63,9 +58,8 @@ export class GovernanceIndexerService implements OnModuleInit { /** * Handles the ProposalCreated event. - * Inserts a skeletal GovernanceProposal row with status=Active. */ - private async handleProposalCreated( + async handleProposalCreated( proposalId: bigint, proposer: string, description: string, @@ -90,17 +84,21 @@ export class GovernanceIndexerService implements OnModuleInit { }); await this.proposalRepo.save(proposal); - this.logger.log( - `Indexed new proposal onChainId=${onChainId} from proposer=${proposer}`, - ); + this.logger.log(`Indexed new proposal onChainId=${onChainId}`); + + // Emit event for notifications + this.eventEmitter.emit('governance.proposal.created', { + proposalId: proposal.id, + onChainId, + proposer, + title: description.slice(0, 50), // Fallback title + }); } /** * Handles the VoteCast event. - * Maps support (1=For, 0=Against) to VoteDirection and upserts a Vote row - * linked to the walletAddress and the corresponding GovernanceProposal. */ - private async handleVoteCast( + async handleVoteCast( voter: string, proposalId: bigint, support: number, @@ -110,16 +108,13 @@ export class GovernanceIndexerService implements OnModuleInit { const proposal = await this.proposalRepo.findOneBy({ onChainId }); if (!proposal) { - this.logger.warn( - `VoteCast received for unknown proposal ${onChainId} — skipping.`, - ); + this.logger.warn(`VoteCast received for unknown proposal ${onChainId}`); return; } const direction: VoteDirection = support === 1 ? VoteDirection.FOR : VoteDirection.AGAINST; - // Upsert: one vote per wallet per proposal const existing = await this.voteRepo.findOneBy({ walletAddress: voter, proposalId: proposal.id, @@ -129,9 +124,6 @@ export class GovernanceIndexerService implements OnModuleInit { existing.direction = direction; existing.weight = Number(weight); await this.voteRepo.save(existing); - this.logger.debug( - `Updated vote for wallet=${voter} on proposal=${onChainId}`, - ); } else { const vote = this.voteRepo.create({ walletAddress: voter, @@ -141,9 +133,78 @@ export class GovernanceIndexerService implements OnModuleInit { proposalId: proposal.id, }); await this.voteRepo.save(vote); - this.logger.log( - `Indexed vote wallet=${voter} direction=${VoteDirection[direction]} proposal=${onChainId}`, - ); + } + + // Emit event for notifications (to notify delegators) + this.eventEmitter.emit('governance.vote.cast', { + voter, + onChainId, + direction, + weight: weight.toString(), + }); + } + + /** + * Handles the DelegationUpdated event. + */ + async handleDelegationUpdated( + delegator: string, + delegate: string, + ): Promise { + const existing = await this.delegationRepo.findOneBy({ + delegatorAddress: delegator, + }); + + if (existing) { + existing.delegateAddress = delegate; + await this.delegationRepo.save(existing); + } else { + const newDelegation = this.delegationRepo.create({ + delegatorAddress: delegator, + delegateAddress: delegate, + }); + await this.delegationRepo.save(newDelegation); + } + + this.logger.log(`Updated delegation: ${delegator} -> ${delegate}`); + } + + /** + * Handles the ProposalStatusChanged event. + */ + async handleProposalStatusChanged( + proposalId: bigint, + status: number, + ): Promise { + const onChainId = Number(proposalId); + const proposal = await this.proposalRepo.findOneBy({ onChainId }); + if (!proposal) return; + + // Map on-chain status to enum + let newStatus: ProposalStatus; + switch (status) { + case 1: + newStatus = ProposalStatus.PASSED; + break; + case 2: + newStatus = ProposalStatus.FAILED; + break; + case 3: + newStatus = ProposalStatus.CANCELLED; + break; + default: + newStatus = ProposalStatus.ACTIVE; + } + + if (proposal.status !== newStatus) { + proposal.status = newStatus; + await this.proposalRepo.save(proposal); + + this.eventEmitter.emit('governance.proposal.status_updated', { + proposalId: proposal.id, + onChainId, + status: newStatus, + }); } } diff --git a/backend/src/modules/governance/governance.module.ts b/backend/src/modules/governance/governance.module.ts index 90c8c793e..68afb49b4 100644 --- a/backend/src/modules/governance/governance.module.ts +++ b/backend/src/modules/governance/governance.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; +import { EventEmitterModule } from '@nestjs/event-emitter'; import { GovernanceController } from './governance.controller'; import { GovernanceProposalsController } from './governance-proposals.controller'; import { GovernanceService } from './governance.service'; @@ -10,12 +11,14 @@ import { UserModule } from '../user/user.module'; import { BlockchainModule } from '../blockchain/blockchain.module'; import { GovernanceProposal } from './entities/governance-proposal.entity'; import { Vote } from './entities/vote.entity'; +import { Delegation } from './entities/delegation.entity'; @Module({ imports: [ UserModule, BlockchainModule, - TypeOrmModule.forFeature([GovernanceProposal, Vote]), + EventEmitterModule.forRoot(), + TypeOrmModule.forFeature([GovernanceProposal, Vote, Delegation]), ], controllers: [ GovernanceController, diff --git a/backend/src/modules/mail/mail.service.ts b/backend/src/modules/mail/mail.service.ts index e8295c477..9a096e4ee 100644 --- a/backend/src/modules/mail/mail.service.ts +++ b/backend/src/modules/mail/mail.service.ts @@ -238,4 +238,29 @@ export class MailService { this.logger.error(`Failed to send raw email to ${to}`, error); } } + + async sendGovernanceEmail( + userEmail: string, + name: string, + subject: string, + message: string, + ): Promise { + try { + await this.mailerService.sendMail({ + to: userEmail, + subject, + template: './generic-notification', + context: { + name: name || 'User', + message, + }, + }); + this.logger.log(`Governance email (${subject}) sent to ${userEmail}`); + } catch (error) { + this.logger.error( + `Failed to send governance email to ${userEmail}`, + error, + ); + } + } } diff --git a/backend/src/modules/notifications/entities/notification.entity.ts b/backend/src/modules/notifications/entities/notification.entity.ts index 48baccced..c3135104d 100644 --- a/backend/src/modules/notifications/entities/notification.entity.ts +++ b/backend/src/modules/notifications/entities/notification.entity.ts @@ -22,6 +22,11 @@ export enum NotificationType { PRODUCT_ALERT_TRIGGERED = 'PRODUCT_ALERT_TRIGGERED', REBALANCING_RECOMMENDED = 'REBALANCING_RECOMMENDED', ADMIN_CAPACITY_ALERT = 'ADMIN_CAPACITY_ALERT', + GOVERNANCE_PROPOSAL_CREATED = 'GOVERNANCE_PROPOSAL_CREATED', + GOVERNANCE_VOTING_REMINDER = 'GOVERNANCE_VOTING_REMINDER', + GOVERNANCE_PROPOSAL_QUEUED = 'GOVERNANCE_PROPOSAL_QUEUED', + GOVERNANCE_PROPOSAL_EXECUTED = 'GOVERNANCE_PROPOSAL_EXECUTED', + GOVERNANCE_DELEGATE_VOTED = 'GOVERNANCE_DELEGATE_VOTED', } @Entity('notifications') diff --git a/backend/src/modules/notifications/entities/pending-notification.entity.ts b/backend/src/modules/notifications/entities/pending-notification.entity.ts new file mode 100644 index 000000000..ff94ef9dd --- /dev/null +++ b/backend/src/modules/notifications/entities/pending-notification.entity.ts @@ -0,0 +1,36 @@ +import { + Entity, + Column, + PrimaryGeneratedColumn, + CreateDateColumn, + Index, +} from 'typeorm'; +import { NotificationType } from './notification.entity'; + +@Entity('pending_notifications') +@Index(['userId', 'processed']) +export class PendingNotification { + @PrimaryGeneratedColumn('uuid') + id: string; + + @Column('uuid') + userId: string; + + @Column({ type: 'enum', enum: NotificationType }) + type: NotificationType; + + @Column() + title: string; + + @Column('text') + message: string; + + @Column({ type: 'jsonb', nullable: true }) + metadata: Record | null; + + @Column({ type: 'boolean', default: false }) + processed: boolean; + + @CreateDateColumn() + createdAt: Date; +} diff --git a/backend/src/modules/notifications/governance-notification.scheduler.spec.ts b/backend/src/modules/notifications/governance-notification.scheduler.spec.ts new file mode 100644 index 000000000..4070dba8d --- /dev/null +++ b/backend/src/modules/notifications/governance-notification.scheduler.spec.ts @@ -0,0 +1,99 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { GovernanceNotificationScheduler } from './governance-notification.scheduler'; +import { NotificationsService } from './notifications.service'; +import { MailService } from '../mail/mail.service'; +import { StellarService } from '../blockchain/stellar.service'; +import { PendingNotification } from './entities/pending-notification.entity'; +import { NotificationType } from './entities/notification.entity'; +import { + NotificationPreference, + DigestFrequency, +} from './entities/notification-preference.entity'; +import { User } from '../user/entities/user.entity'; +import { + GovernanceProposal, + ProposalStatus, +} from '../governance/entities/governance-proposal.entity'; +import { Vote } from '../governance/entities/vote.entity'; + +describe('GovernanceNotificationScheduler', () => { + let scheduler: GovernanceNotificationScheduler; + let notificationsService: any; + let mailService: any; + let stellarService: any; + let pendingRepo: any; + let preferenceRepo: any; + let userRepo: any; + let proposalRepo: any; + let voteRepo: any; + + beforeEach(async () => { + notificationsService = { dispatchNotification: jest.fn() }; + mailService = { sendGovernanceEmail: jest.fn() }; + stellarService = { getRpcServer: jest.fn() }; + + pendingRepo = { find: jest.fn(), update: jest.fn(), create: jest.fn() }; + preferenceRepo = { find: jest.fn() }; + userRepo = { findOne: jest.fn(), createQueryBuilder: jest.fn(() => ({ + innerJoin: jest.fn().mockReturnThis(), + where: jest.fn().mockReturnThis(), + getMany: jest.fn(), + })) }; + proposalRepo = { find: jest.fn() }; + voteRepo = { find: jest.fn() }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + GovernanceNotificationScheduler, + { provide: NotificationsService, useValue: notificationsService }, + { provide: MailService, useValue: mailService }, + { provide: StellarService, useValue: stellarService }, + { provide: getRepositoryToken(PendingNotification), useValue: pendingRepo }, + { provide: getRepositoryToken(NotificationPreference), useValue: preferenceRepo }, + { provide: getRepositoryToken(User), useValue: userRepo }, + { provide: getRepositoryToken(GovernanceProposal), useValue: proposalRepo }, + { provide: getRepositoryToken(Vote), useValue: voteRepo }, + ], + }).compile(); + + scheduler = module.get(GovernanceNotificationScheduler); + }); + + describe('handleDailyDigest', () => { + it('should process daily digests for users', async () => { + preferenceRepo.find.mockResolvedValue([{ userId: 'user-1', digestFrequency: DigestFrequency.DAILY }]); + pendingRepo.find.mockResolvedValue([ + { id: 'p1', title: 'Title 1', message: 'Msg 1' }, + { id: 'p2', title: 'Title 2', message: 'Msg 2' }, + ]); + userRepo.findOne.mockResolvedValue({ id: 'user-1', email: 'test@example.com', name: 'Test User' }); + + await scheduler.handleDailyDigest(); + + expect(mailService.sendGovernanceEmail).toHaveBeenCalled(); + expect(pendingRepo.update).toHaveBeenCalled(); + }); + }); + + describe('handleVotingReminders', () => { + it('should send reminders for proposals closing soon', async () => { + stellarService.getRpcServer.mockReturnValue({ + getLatestLedger: jest.fn().mockResolvedValue({ sequence: 100000 }), + }); + + proposalRepo.find.mockResolvedValue([ + { id: 'prop-1', onChainId: 1, endBlock: 117250, status: ProposalStatus.ACTIVE }, + ]); + + voteRepo.find.mockResolvedValue([]); // No one has voted + + const mockQueryBuilder = userRepo.createQueryBuilder(); + mockQueryBuilder.getMany.mockResolvedValue([{ id: 'user-2', publicKey: 'G...' }]); + + await scheduler.handleVotingReminders(); + + expect(notificationsService.dispatchNotification).toHaveBeenCalled(); + }); + }); +}); diff --git a/backend/src/modules/notifications/governance-notification.scheduler.ts b/backend/src/modules/notifications/governance-notification.scheduler.ts new file mode 100644 index 000000000..c0a5d89d5 --- /dev/null +++ b/backend/src/modules/notifications/governance-notification.scheduler.ts @@ -0,0 +1,182 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository, MoreThan, In } from 'typeorm'; +import { NotificationsService } from './notifications.service'; +import { NotificationType } from './entities/notification.entity'; +import { PendingNotification } from './entities/pending-notification.entity'; +import { + NotificationPreference, + DigestFrequency, +} from './entities/notification-preference.entity'; +import { User } from '../user/entities/user.entity'; +import { + GovernanceProposal, + ProposalStatus, +} from '../governance/entities/governance-proposal.entity'; +import { Vote } from '../governance/entities/vote.entity'; +import { MailService } from '../mail/mail.service'; +import { StellarService } from '../blockchain/stellar.service'; + +@Injectable() +export class GovernanceNotificationScheduler { + private readonly logger = new Logger(GovernanceNotificationScheduler.name); + + constructor( + private readonly notificationsService: NotificationsService, + private readonly mailService: MailService, + private readonly stellarService: StellarService, + @InjectRepository(PendingNotification) + private readonly pendingRepo: Repository, + @InjectRepository(NotificationPreference) + private readonly preferenceRepo: Repository, + @InjectRepository(User) + private readonly userRepo: Repository, + @InjectRepository(GovernanceProposal) + private readonly proposalRepo: Repository, + @InjectRepository(Vote) + private readonly voteRepo: Repository, + ) {} + + /** + * Daily Digest: Every day at midnight + */ + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async handleDailyDigest() { + this.logger.log('Starting Daily Governance Digest...'); + await this.processDigests(DigestFrequency.DAILY); + } + + /** + * Weekly Digest: Every Monday at midnight + */ + @Cron(CronExpression.EVERY_WEEKEND) + async handleWeeklyDigest() { + this.logger.log('Starting Weekly Governance Digest...'); + await this.processDigests(DigestFrequency.WEEKLY); + } + + /** + * Voting Reminders: Every hour + * Notifies users who haven't voted on proposals closing in ~24 hours + */ + @Cron(CronExpression.EVERY_HOUR) + async handleVotingReminders() { + this.logger.log('Checking for upcoming voting deadlines...'); + + try { + // 1. Get current ledger/block from Stellar + // Since we don't have a direct getTime, we'll estimate based on ledger closing time (~5s) + // For a 24h reminder, we look for endBlock - currentBlock ≈ 17280 ledgers + const latestLedger = await this.getCurrentLedger(); + if (!latestLedger) return; + + const reminderWindowStart = latestLedger + 17000; + const reminderWindowEnd = latestLedger + 17500; // ~1 hour window + + const closingSoon = await this.proposalRepo.find({ + where: { + status: ProposalStatus.ACTIVE, + endBlock: MoreThan(latestLedger), // Is this correct? + // We want proposals where endBlock is within the reminder window + }, + }); + + const relevantProposals = closingSoon.filter( + (p) => p.endBlock >= reminderWindowStart && p.endBlock <= reminderWindowEnd, + ); + + for (const proposal of relevantProposals) { + // Find users who have NOT voted yet + const voters = await this.voteRepo.find({ + where: { proposalId: proposal.id }, + select: ['walletAddress'], + }); + const votedAddresses = voters.map((v) => v.walletAddress); + + // Notify all users with governance prefs enabled who haven't voted + const usersToNotify = await this.userRepo + .createQueryBuilder('user') + .innerJoin( + 'notification_preferences', + 'pref', + 'pref.userId = user.id', + ) + .where('pref.governanceNotifications = true') + .getMany(); + + for (const user of usersToNotify) { + if (user.publicKey && !votedAddresses.includes(user.publicKey)) { + await this.notificationsService.dispatchNotification({ + userId: user.id, + type: NotificationType.GOVERNANCE_VOTING_REMINDER, + title: 'Voting Deadline Approaching', + message: `The voting period for proposal #${proposal.onChainId} ends in less than 24 hours. Don't forget to cast your vote!`, + metadata: { onChainId: proposal.onChainId, proposalId: proposal.id }, + }); + } + } + } + } catch (error) { + this.logger.error('Error in handleVotingReminders', error); + } + } + + private async processDigests(frequency: DigestFrequency) { + try { + const usersWithDigest = await this.preferenceRepo.find({ + where: { + digestFrequency: frequency, + emailNotifications: true, + }, + }); + + for (const pref of usersWithDigest) { + const pending = await this.pendingRepo.find({ + where: { userId: pref.userId, processed: false }, + order: { createdAt: 'ASC' }, + }); + + if (pending.length === 0) continue; + + const user = await this.userRepo.findOne({ where: { id: pref.userId } }); + if (!user) continue; + + // Construct digest message + const digestTitle = `${frequency.charAt(0).toUpperCase() + frequency.slice(1)} Governance Digest`; + let digestMessage = `Here is your governance update:\n\n`; + + for (const item of pending) { + digestMessage += `- ${item.title}: ${item.message}\n`; + } + + // Send email + await this.mailService.sendGovernanceEmail( + user.email, + user.name || 'User', + digestTitle, + digestMessage, + ); + + // Mark as processed + await this.pendingRepo.update( + { id: In(pending.map((p) => p.id)) }, + { processed: true }, + ); + } + } catch (error) { + this.logger.error(`Error processing ${frequency} digests`, error); + } + } + + private async getCurrentLedger(): Promise { + try { + const rpcServer = this.stellarService.getRpcServer(); + const response = await rpcServer.getLatestLedger(); + return response.sequence; + } catch (error) { + this.logger.warn('Could not fetch latest ledger from RPC', error); + return null; + } + } +} diff --git a/backend/src/modules/notifications/notifications.module.ts b/backend/src/modules/notifications/notifications.module.ts index 7fbe88022..82b43c852 100644 --- a/backend/src/modules/notifications/notifications.module.ts +++ b/backend/src/modules/notifications/notifications.module.ts @@ -5,10 +5,17 @@ import { NotificationsController } from './notifications.controller'; import { UserNotificationsController } from './user-notifications.controller'; import { Notification } from './entities/notification.entity'; import { NotificationPreference } from './entities/notification-preference.entity'; +import { PendingNotification } from './entities/pending-notification.entity'; import { WaitlistEntry } from '../savings/entities/waitlist-entry.entity'; +import { WaitlistEvent } from '../savings/entities/waitlist-event.entity'; +import { Delegation } from '../governance/entities/delegation.entity'; +import { GovernanceProposal } from '../governance/entities/governance-proposal.entity'; +import { Vote } from '../governance/entities/vote.entity'; import { MailModule } from '../mail/mail.module'; +import { BlockchainModule } from '../blockchain/blockchain.module'; import { User } from '../user/entities/user.entity'; import { MilestoneSchedulerService } from './milestone-scheduler.service'; +import { GovernanceNotificationScheduler } from './governance-notification.scheduler'; import { SavingsModule } from '../savings/savings.module'; @Module({ @@ -16,14 +23,24 @@ import { SavingsModule } from '../savings/savings.module'; TypeOrmModule.forFeature([ Notification, NotificationPreference, + PendingNotification, User, WaitlistEntry, + WaitlistEvent, + Delegation, + GovernanceProposal, + Vote, ]), MailModule, + BlockchainModule, SavingsModule, ], controllers: [NotificationsController, UserNotificationsController], - providers: [NotificationsService, MilestoneSchedulerService], + providers: [ + NotificationsService, + MilestoneSchedulerService, + GovernanceNotificationScheduler, + ], exports: [NotificationsService], }) export class NotificationsModule {} diff --git a/backend/src/modules/notifications/notifications.service.ts b/backend/src/modules/notifications/notifications.service.ts index d68b22a5f..853eea774 100644 --- a/backend/src/modules/notifications/notifications.service.ts +++ b/backend/src/modules/notifications/notifications.service.ts @@ -3,7 +3,12 @@ import { OnEvent } from '@nestjs/event-emitter'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { Notification, NotificationType } from './entities/notification.entity'; -import { NotificationPreference } from './entities/notification-preference.entity'; +import { + NotificationPreference, + DigestFrequency, +} from './entities/notification-preference.entity'; +import { PendingNotification } from './entities/pending-notification.entity'; +import { Delegation } from '../governance/entities/delegation.entity'; import { MailService } from '../mail/mail.service'; import { User } from '../user/entities/user.entity'; import { WaitlistEntry } from '../savings/entities/waitlist-entry.entity'; @@ -44,6 +49,10 @@ export class NotificationsService { private readonly notificationRepository: Repository, @InjectRepository(NotificationPreference) private readonly preferenceRepository: Repository, + @InjectRepository(PendingNotification) + private readonly pendingRepository: Repository, + @InjectRepository(Delegation) + private readonly delegationRepository: Repository, @InjectRepository(User) private readonly userRepository: Repository, @InjectRepository(WaitlistEntry) @@ -578,6 +587,172 @@ export class NotificationsService { return preferences; } + /** + * Listen to governance.proposal.created event + */ + @OnEvent('governance.proposal.created') + async handleProposalCreated(event: { + proposalId: string; + onChainId: number; + proposer: string; + title: string; + }) { + this.logger.log(`Processing governance.proposal.created for ${event.onChainId}`); + + try { + // Find all users who have governance notifications enabled + const usersWithPrefs = await this.userRepository + .createQueryBuilder('user') + .innerJoinAndSelect( + 'notification_preferences', + 'pref', + 'pref.userId = user.id', + ) + .where('pref.governanceNotifications = true') + .getMany(); + + for (const user of usersWithPrefs) { + await this.dispatchNotification({ + userId: user.id, + type: NotificationType.GOVERNANCE_PROPOSAL_CREATED, + title: 'New Governance Proposal', + message: `A new proposal "#${event.onChainId}: ${event.title}" has been created.`, + metadata: event, + }); + } + } catch (error) { + this.logger.error('Error processing governance.proposal.created', error); + } + } + + /** + * Listen to governance.vote.cast event to notify delegators + */ + @OnEvent('governance.vote.cast') + async handleVoteCast(event: { + voter: string; + onChainId: number; + direction: string; + weight: string; + }) { + this.logger.log(`Processing governance.vote.cast by ${event.voter}`); + + try { + // Find all delegators for this voter + const delegations = await this.delegationRepository.find({ + where: { delegateAddress: event.voter }, + }); + + for (const delegation of delegations) { + // Find user by delegator address + const user = await this.userRepository.findOne({ + where: { publicKey: delegation.delegatorAddress }, + }); + + if (user) { + await this.dispatchNotification({ + userId: user.id, + type: NotificationType.GOVERNANCE_DELEGATE_VOTED, + title: 'Your Delegate Voted', + message: `Your delegate ${event.voter.slice(0, 6)}... voted ${event.direction} on proposal #${event.onChainId}.`, + metadata: event, + }); + } + } + } catch (error) { + this.logger.error('Error processing governance.vote.cast', error); + } + } + + /** + * Listen to governance.proposal.status_updated event + */ + @OnEvent('governance.proposal.status_updated') + async handleProposalStatusUpdated(event: { + proposalId: string; + onChainId: number; + status: string; + }) { + this.logger.log( + `Processing governance.proposal.status_updated for ${event.onChainId} to ${event.status}`, + ); + + try { + // Notify everyone about significant status changes (Passed/Failed) + const usersWithPrefs = await this.userRepository + .createQueryBuilder('user') + .innerJoinAndSelect( + 'notification_preferences', + 'pref', + 'pref.userId = user.id', + ) + .where('pref.governanceNotifications = true') + .getMany(); + + const type = + event.status === 'Passed' + ? NotificationType.GOVERNANCE_PROPOSAL_QUEUED + : NotificationType.GOVERNANCE_PROPOSAL_EXECUTED; // Simplified for demo + + for (const user of usersWithPrefs) { + await this.dispatchNotification({ + userId: user.id, + type, + title: `Proposal #${event.onChainId} ${event.status}`, + message: `Governance proposal #${event.onChainId} has been successfully ${event.status.toLowerCase()}.`, + metadata: event, + }); + } + } catch (error) { + this.logger.error('Error processing governance.proposal.status_updated', error); + } + } + + /** + * Helper to dispatch notification based on user preferences and digest settings + */ + async dispatchNotification(data: { + userId: string; + type: NotificationType; + title: string; + message: string; + metadata?: Record; + }) { + const preferences = await this.getOrCreatePreferences(data.userId); + const user = await this.userRepository.findOne({ where: { id: data.userId } }); + + if (!user) return; + + // 1. Always create In-App notification if enabled + if (preferences.inAppNotifications) { + await this.createNotification(data); + } + + // 2. Handle Email based on Digest Frequency + if (preferences.emailNotifications) { + if (preferences.digestFrequency === DigestFrequency.INSTANT) { + // Send instant email + await this.mailService.sendGovernanceEmail( + user.email, + user.name || 'User', + data.title, + data.message, + ); + } else { + // Store for Daily/Weekly digest + await this.pendingRepository.save( + this.pendingRepository.create({ + userId: data.userId, + type: data.type, + title: data.title, + message: data.message, + metadata: data.metadata, + }), + ); + } + } + } + /** * Delete old notifications (older than 30 days) */