-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbackend.go
More file actions
127 lines (115 loc) · 4.07 KB
/
Copy pathbackend.go
File metadata and controls
127 lines (115 loc) · 4.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Backend is the engine-neutral database seam the OGC tier runs over. The tier
// holds no engine-specific type: it calls named MEOS/MobilityDB functions
// through portable SQL and reads rows through this interface, so a second MEOS
// engine (e.g. MobilityDuck over DuckDB) is added by implementing Backend, not
// by touching the handlers.
package main
import (
"context"
"errors"
"strings"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// ErrNoRows is the engine-neutral "no rows" sentinel; each backend maps its
// driver's no-rows error onto it so handlers test one value. In this file
// pgRow.Scan performs that mapping for pgx.ErrNoRows.
var ErrNoRows = errors.New("no rows in result set")
// Row, Rows and Tx mirror the minimal shapes the handlers use.
//
// Row is what QueryRow returns on both Backend and Tx: a single pending result
// that is consumed by calling Scan.
type Row interface {
	// Scan reads the row into dest and reports any error. The pgx-backed
	// implementation in this file (pgRow) returns ErrNoRows when pgx reports
	// pgx.ErrNoRows.
	Scan(dest ...any) error
}
// Rows is the multi-row result returned by Query on both Backend and Tx. The
// pgx-backed implementations in this file return the pgx result set directly
// through this interface, without a wrapper.
//
// NOTE(review): the method semantics are not spelled out here; presumably they
// follow pgx.Rows (Next advances to the next row, Scan reads the current row,
// Close releases the result set, Err reports an iteration error) — confirm
// when implementing another backend.
type Rows interface {
	Next() bool
	Scan(dest ...any) error
	Close()
	Err() error
}
// Tx is the transaction handle returned by Backend.Begin. It offers the same
// Exec/Query/QueryRow trio as Backend, plus Commit and Rollback to end the
// transaction.
type Tx interface {
	// Exec runs sql and returns the affected-row count.
	Exec(ctx context.Context, sql string, args ...any) (int64, error)
	// Query runs sql and returns its result set as Rows.
	Query(ctx context.Context, sql string, args ...any) (Rows, error)
	// QueryRow runs sql and returns a Row to Scan.
	QueryRow(ctx context.Context, sql string, args ...any) Row
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}
// Backend is one MEOS-backed engine. Exec returns the affected-row count.
//
// Implementations are obtained from openBackend; pgBackend in this file is the
// PostgreSQL/MobilityDB one.
type Backend interface {
	// QueryRow runs sql and returns a Row to Scan.
	QueryRow(ctx context.Context, sql string, args ...any) Row
	// Query runs sql and returns its result set as Rows.
	Query(ctx context.Context, sql string, args ...any) (Rows, error)
	// Exec runs sql and returns the affected-row count.
	Exec(ctx context.Context, sql string, args ...any) (int64, error)
	// Begin starts a transaction and returns its handle.
	Begin(ctx context.Context) (Tx, error)
	// Ping checks the engine; pgBackend forwards it to the pgx pool's Ping.
	Ping(ctx context.Context) error
	// Close shuts the backend down; pgBackend closes its pgx pool.
	Close()
}
// openBackend selects the engine from the DSN prefix. A DSN starting with
// "duckdb:" is handed to openDuck and one starting with "spark:" to openSpark;
// every other DSN is parsed as a pgx pool DSN and opened as the
// PostgreSQL/MobilityDB backend.
//
// For the PostgreSQL path the pool's MaxConns is taken from
// envInt("MFAPI_MAXCONNS", 16) (envInt is defined elsewhere; presumably it
// reads that environment variable with 16 as the fallback — confirm), and the
// "timezone" and "datestyle" runtime parameters are forced on every
// connection. Errors from parsing the DSN or creating the pool are returned
// unchanged.
func openBackend(dsn string) (Backend, error) {
	switch {
	case strings.HasPrefix(dsn, "duckdb:"):
		return openDuck(dsn)
	case strings.HasPrefix(dsn, "spark:"):
		return openSpark(dsn)
	}

	poolCfg, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, err
	}
	poolCfg.MaxConns = int32(envInt("MFAPI_MAXCONNS", 16))

	// Moving-feature timestamps (AIS and others) are UTC. Pinning the session
	// to UTC with ISO datestyle keeps asMFJSON output as ISO 8601 UTC
	// datetimes whatever locale the server is configured with.
	params := poolCfg.ConnConfig.RuntimeParams
	if params == nil {
		params = map[string]string{}
		poolCfg.ConnConfig.RuntimeParams = params
	}
	params["timezone"] = "UTC"
	params["datestyle"] = "ISO, YMD"

	pgPool, err := pgxpool.NewWithConfig(context.Background(), poolCfg)
	if err != nil {
		return nil, err
	}
	return &pgBackend{pool: pgPool}, nil
}
// pgBackend is the PostgreSQL/MobilityDB engine over a pgx pool. It is built
// by openBackend and implements Backend by forwarding each call to the pool.
type pgBackend struct{ pool *pgxpool.Pool }
// QueryRow runs sql on the pool and wraps the resulting pgx row in pgRow, so
// that a later Scan reports the neutral ErrNoRows rather than pgx.ErrNoRows.
func (b *pgBackend) QueryRow(ctx context.Context, sql string, a ...any) Row {
	row := b.pool.QueryRow(ctx, sql, a...)
	return pgRow{r: row}
}
// Query runs sql on the pool and returns the pgx result set, unwrapped, as
// Rows together with whatever error the pool reported.
func (b *pgBackend) Query(ctx context.Context, sql string, a ...any) (Rows, error) {
	rows, err := b.pool.Query(ctx, sql, a...)
	return rows, err
}
// Exec runs sql on the pool and returns the affected-row count taken from the
// command tag. The count is read even when err is non-nil, so callers should
// check err before trusting it.
func (b *pgBackend) Exec(ctx context.Context, sql string, a ...any) (int64, error) {
	tag, err := b.pool.Exec(ctx, sql, a...)
	affected := tag.RowsAffected()
	return affected, err
}
// Begin starts a transaction on the pool and returns it wrapped in pgTx. If
// the pool cannot start one, the error is returned with a nil Tx.
func (b *pgBackend) Begin(ctx context.Context) (Tx, error) {
	started, err := b.pool.Begin(ctx)
	if err != nil {
		return nil, err
	}
	wrapped := pgTx{tx: started}
	return wrapped, nil
}
// Ping forwards to the pool's Ping and returns its error.
func (b *pgBackend) Ping(ctx context.Context) error {
	return b.pool.Ping(ctx)
}
// Close closes the underlying pgx pool.
func (b *pgBackend) Close() {
	b.pool.Close()
}
// pgRow maps pgx.ErrNoRows onto the neutral ErrNoRows. It wraps the pgx.Row
// returned by QueryRow on pgBackend and pgTx; the mapping happens in Scan.
type pgRow struct{ r pgx.Row }
// Scan forwards to the wrapped pgx row. A nil result stays nil, an error that
// matches pgx.ErrNoRows is replaced by the neutral ErrNoRows, and any other
// error is returned unchanged.
func (r pgRow) Scan(dest ...any) error {
	err := r.r.Scan(dest...)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, pgx.ErrNoRows):
		return ErrNoRows
	default:
		return err
	}
}
// pgTx adapts a pgx.Tx to the engine-neutral Tx interface; pgBackend.Begin
// returns one for each transaction it starts.
type pgTx struct{ tx pgx.Tx }
// Exec runs sql inside the transaction and returns the affected-row count
// taken from the command tag. The count is read even when err is non-nil, so
// callers should check err before trusting it.
func (t pgTx) Exec(ctx context.Context, sql string, a ...any) (int64, error) {
	tag, err := t.tx.Exec(ctx, sql, a...)
	affected := tag.RowsAffected()
	return affected, err
}
// Query runs sql inside the transaction and returns the pgx result set,
// unwrapped, as Rows together with whatever error the transaction reported.
func (t pgTx) Query(ctx context.Context, sql string, a ...any) (Rows, error) {
	rows, err := t.tx.Query(ctx, sql, a...)
	return rows, err
}
// QueryRow runs sql inside the transaction and wraps the resulting pgx row in
// pgRow, so that a later Scan reports the neutral ErrNoRows rather than
// pgx.ErrNoRows.
func (t pgTx) QueryRow(ctx context.Context, sql string, a ...any) Row {
	row := t.tx.QueryRow(ctx, sql, a...)
	return pgRow{r: row}
}
// Commit forwards to the wrapped pgx transaction's Commit and returns its
// error.
func (t pgTx) Commit(ctx context.Context) error {
	return t.tx.Commit(ctx)
}
// Rollback forwards to the wrapped pgx transaction's Rollback and returns its
// error.
func (t pgTx) Rollback(ctx context.Context) error {
	return t.tx.Rollback(ctx)
}