Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 184 additions & 142 deletions apps/engine/src/__generated__/openapi.d.ts

Large diffs are not rendered by default.

2,155 changes: 1,146 additions & 1,009 deletions apps/engine/src/__generated__/openapi.json

Large diffs are not rendered by default.

224 changes: 224 additions & 0 deletions apps/engine/src/api/app.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,230 @@ describe('POST /source_discover', () => {
})
})

// ---------------------------------------------------------------------------
// JSON body mode (Content-Type: application/json)
// ---------------------------------------------------------------------------

const syncParamsObj = JSON.parse(syncParams)

describe('JSON body mode', () => {
it('POST /pipeline_check accepts pipeline in JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_check', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ pipeline: syncParamsObj }),
})
expect(res.status).toBe(200)
expect(res.headers.get('Content-Type')).toBe('application/x-ndjson')
const events = await readNdjson<Record<string, unknown>>(res)
const statuses = events.filter((e) => e.type === 'connection_status')
expect(statuses).toHaveLength(2)
})

it('POST /pipeline_setup accepts pipeline in JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_setup', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ pipeline: syncParamsObj }),
})
expect(res.status).toBe(200)
expect(res.headers.get('Content-Type')).toBe('application/x-ndjson')
})

it('POST /pipeline_teardown accepts pipeline in JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_teardown', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ pipeline: syncParamsObj }),
})
expect(res.status).toBe(200)
expect(res.headers.get('Content-Type')).toBe('application/x-ndjson')
})

it('POST /source_discover accepts source in JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/source_discover', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
source: { type: 'test', test: { streams: { customers: {} } } },
}),
})
expect(res.status).toBe(200)
const events = await readNdjson<Record<string, unknown>>(res)
expect(events.some((e) => e.type === 'catalog')).toBe(true)
})

it('POST /pipeline_read accepts pipeline + state + body array in JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_read', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
pipeline: syncParamsObj,
body: [
{
type: 'record',
record: {
stream: 'customers',
data: { id: 'cus_1', name: 'Alice' },
emitted_at: new Date().toISOString(),
},
},
{
type: 'source_state',
source_state: { stream: 'customers', data: { status: 'complete' } },
},
],
}),
})
expect(res.status).toBe(200)
const events = await readNdjson<Message>(res)
expect(events).toHaveLength(3)
expect(events[0]!.type).toBe('record')
expect(events[1]!.type).toBe('source_state')
expect(events[2]).toMatchObject({ type: 'eof', eof: { reason: 'complete' } })
})

it('POST /pipeline_read accepts pipeline in JSON body without input', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_read', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ pipeline: syncParamsObj }),
})
expect(res.status).toBe(200)
const events = await readNdjson<Message>(res)
expect(events.some((e) => e.type === 'eof')).toBe(true)
})

it('POST /pipeline_write accepts pipeline + body array in JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_write', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
pipeline: syncParamsObj,
body: [
{
type: 'record',
record: {
stream: 'customers',
data: { id: 'cus_1' },
emitted_at: '2024-01-01T00:00:00.000Z',
},
},
{
type: 'source_state',
source_state: { stream: 'customers', data: { cursor: 'cus_1' } },
},
],
}),
})
expect(res.status).toBe(200)
const events = await readNdjson<Record<string, unknown>>(res)
expect(events.some((e) => e.type === 'source_state')).toBe(true)
})

it('POST /pipeline_sync accepts pipeline + state + body array in JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_sync', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
pipeline: syncParamsObj,
body: [
{
type: 'record',
record: {
stream: 'customers',
data: { id: 'cus_1', name: 'Alice' },
emitted_at: new Date().toISOString(),
},
},
{
type: 'source_state',
source_state: { stream: 'customers', data: { status: 'complete' } },
},
],
}),
})
expect(res.status).toBe(200)
const events = await readNdjson<Record<string, unknown>>(res)
expect(events.some((e) => e.type === 'source_state')).toBe(true)
expect(events.some((e) => e.type === 'eof')).toBe(true)
})

it('POST /pipeline_sync without body array runs backfill mode', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_sync', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ pipeline: syncParamsObj }),
})
expect(res.status).toBe(200)
const events = await readNdjson<Record<string, unknown>>(res)
expect(events.some((e) => e.type === 'eof')).toBe(true)
})

it('returns 400 when JSON body is missing pipeline', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_check', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({}),
})
expect(res.status).toBe(400)
})

it('NDJSON content-type uses header mode even with JSON-like body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_check', {
method: 'POST',
headers: {
'Content-Type': 'application/x-ndjson',
'X-Pipeline': syncParams,
},
})
expect(res.status).toBe(200)
})

it('no content-type defaults to header mode', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_check', {
method: 'POST',
headers: { 'X-Pipeline': syncParams },
})
expect(res.status).toBe(200)
})

it('mixed-case Content-Type is accepted as JSON body mode', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_check', {
method: 'POST',
headers: { 'Content-Type': 'Application/JSON; charset=utf-8' },
body: JSON.stringify({ pipeline: syncParamsObj }),
})
expect(res.status).toBe(200)
expect(res.headers.get('Content-Type')).toBe('application/x-ndjson')
})

it('application/json-seq falls back to header mode, not JSON body', async () => {
const app = await createApp(resolver)
const res = await app.request('/pipeline_check', {
method: 'POST',
headers: {
'Content-Type': 'application/json-seq',
'X-Pipeline': syncParams,
},
})
expect(res.status).toBe(200)
})
})

// ---------------------------------------------------------------------------
// POST /internal/query
// ---------------------------------------------------------------------------
Expand Down
Loading
Loading