Skip to content

Commit 43f2eb1

Browse files
fix: polish export-events review feedback
1 parent 8ed6a89 commit 43f2eb1

1 file changed

Lines changed: 90 additions & 63 deletions

File tree

src/scripts/export-events.ts

Lines changed: 90 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -3,92 +3,119 @@ import dotenv from 'dotenv'
33
dotenv.config()
44

55
import fs from 'fs'
6-
import knex from 'knex'
76
import path from 'path'
87
import { pipeline } from 'stream/promises'
98
import { Transform } from 'stream'
109

11-
const getDbConfig = () => ({
12-
client: 'pg',
13-
connection: process.env.DB_URI || {
14-
host: process.env.DB_HOST ?? 'localhost',
15-
port: Number(process.env.DB_PORT ?? 5432),
16-
user: process.env.DB_USER ?? 'postgres',
17-
password: process.env.DB_PASSWORD ?? 'postgres',
18-
database: process.env.DB_NAME ?? 'nostream',
19-
},
20-
})
10+
import { getMasterDbClient } from '../database/client'
11+
12+
type EventRow = {
13+
event_id: Buffer
14+
event_pubkey: Buffer
15+
event_kind: number
16+
event_created_at: number
17+
event_content: string
18+
event_tags: unknown[] | null
19+
event_signature: Buffer
20+
}
2121

2222
async function exportEvents(): Promise<void> {
2323
const filename = process.argv[2] || 'events.jsonl'
2424
const outputPath = path.resolve(filename)
25-
const db = knex(getDbConfig())
25+
const db = getMasterDbClient()
26+
const abortController = new AbortController()
27+
let interruptedBySignal: NodeJS.Signals | undefined
28+
29+
const onSignal = (signal: NodeJS.Signals) => {
30+
if (abortController.signal.aborted) {
31+
return
32+
}
33+
34+
interruptedBySignal = signal
35+
process.exitCode = 130
36+
console.log(`${signal} received. Stopping export...`)
37+
abortController.abort()
38+
}
39+
40+
process
41+
.on('SIGINT', onSignal)
42+
.on('SIGTERM', onSignal)
2643

2744
try {
28-
const [{ count }] = await db('events')
45+
const firstEvent = await db('events')
46+
.select('event_id')
2947
.whereNull('deleted_at')
30-
.count('* as count')
31-
const total = Number(count)
48+
.first()
49+
50+
if (abortController.signal.aborted) {
51+
return
52+
}
3253

33-
if (total === 0) {
54+
if (!firstEvent) {
3455
console.log('No events to export.')
3556
return
3657
}
3758

38-
console.log(`Exporting ${total} events to ${outputPath}`)
59+
console.log(`Exporting events to ${outputPath}`)
3960

4061
const output = fs.createWriteStream(outputPath)
4162
let exported = 0
4263

43-
const trx = await db.transaction(null, { isolationLevel: 'repeatable read' })
44-
try {
45-
await trx.raw('SET TRANSACTION READ ONLY')
46-
47-
const dbStream = trx('events')
48-
.select(
49-
'event_id',
50-
'event_pubkey',
51-
'event_kind',
52-
'event_created_at',
53-
'event_content',
54-
'event_tags',
55-
'event_signature',
56-
)
57-
.whereNull('deleted_at')
58-
.orderBy('event_created_at', 'asc')
59-
.stream()
60-
61-
const toJsonLine = new Transform({
62-
objectMode: true,
63-
transform(row: any, _encoding, callback) {
64-
const event = {
65-
id: row.event_id.toString('hex'),
66-
pubkey: row.event_pubkey.toString('hex'),
67-
created_at: row.event_created_at,
68-
kind: row.event_kind,
69-
tags: row.event_tags || [],
70-
content: row.event_content,
71-
sig: row.event_signature.toString('hex'),
72-
}
73-
74-
exported++
75-
if (exported % 10000 === 0) {
76-
console.log(`Exported ${exported}/${total} events...`)
77-
}
78-
79-
callback(null, JSON.stringify(event) + '\n')
80-
},
81-
})
82-
83-
await pipeline(dbStream, toJsonLine, output)
84-
await trx.commit()
85-
} catch (err) {
86-
await trx.rollback()
87-
throw err
88-
}
64+
const dbStream = db('events')
65+
.select(
66+
'event_id',
67+
'event_pubkey',
68+
'event_kind',
69+
'event_created_at',
70+
'event_content',
71+
'event_tags',
72+
'event_signature',
73+
)
74+
.whereNull('deleted_at')
75+
.orderBy('event_created_at', 'asc')
76+
.orderBy('event_id', 'asc')
77+
.stream()
78+
79+
const toJsonLine = new Transform({
80+
objectMode: true,
81+
transform(row: EventRow, _encoding, callback) {
82+
const event = {
83+
id: row.event_id.toString('hex'),
84+
pubkey: row.event_pubkey.toString('hex'),
85+
created_at: row.event_created_at,
86+
kind: row.event_kind,
87+
tags: Array.isArray(row.event_tags) ? row.event_tags : [],
88+
content: row.event_content,
89+
sig: row.event_signature.toString('hex'),
90+
}
91+
92+
exported++
93+
if (exported % 10000 === 0) {
94+
console.log(`Exported ${exported} events...`)
95+
}
96+
97+
callback(null, JSON.stringify(event) + '\n')
98+
},
99+
})
100+
101+
await pipeline(dbStream, toJsonLine, output, {
102+
signal: abortController.signal,
103+
})
89104

90105
console.log(`Export complete: ${exported} events written to ${outputPath}`)
106+
} catch (error) {
107+
if (abortController.signal.aborted) {
108+
console.log(`Export interrupted by ${interruptedBySignal ?? 'signal'}.`)
109+
process.exitCode = 130
110+
return
111+
}
112+
113+
throw error
91114
} finally {
115+
process
116+
.off('SIGINT', onSignal)
117+
.off('SIGTERM', onSignal)
118+
92119
await db.destroy()
93120
}
94121
}

0 commit comments

Comments
 (0)