Skip to content

Commit c2276b8

Browse files
committed
Replace echo stub with database components
5 components for Postgres + Redis: - postgres_exec: INSERT/UPDATE/DELETE with positional params - postgres_query: SELECT with configurable row shape - redis_dedup: SET NX EX, routes new vs seen - redis_set, redis_get: primitives Connection pools (pgxpool, go-redis) cached per DSN/URL across calls. Credentials flow in via the input message, not component settings.
1 parent 6b00253 commit c2276b8

11 files changed

Lines changed: 828 additions & 143 deletions

File tree

README.md

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,60 @@
1-
# Tiny Systems Example Module
1+
# Tiny Systems Database Module
22

3-
Template repository for building your own Tiny Systems module. Fork this repo to get started.
3+
Postgres and Redis components for Tiny Systems flows.
44

5-
## What's Included
5+
## Components
66

7-
A minimal Echo component that receives a message and passes it through:
7+
| Name | Purpose |
8+
|---|---|
9+
| `postgres_exec` | Run INSERT/UPDATE/DELETE with positional parameters; emits `rowsAffected`. |
10+
| `postgres_query` | Run SELECT; emits `rows[]` keyed by column name with a configurable row shape. |
11+
| `redis_dedup` | Atomic "first seen" check via `SET NX EX`; routes new IDs to **out_new** and duplicates to **out_seen**. |
12+
| `redis_set` | Set a key, with optional TTL and NX. |
13+
| `redis_get` | Get a key, returns `found=false` for missing keys without raising an error. |
14+
15+
All components take their connection string (`dsn` for Postgres, `url` for Redis) **per message**, so a single deployed module can talk to many databases. Connections are pooled by DSN/URL across calls.
16+
17+
## Patterns
18+
19+
### Dedup-then-route
820

9-
```go
10-
func (t *Component) Handle(ctx context.Context, handler module.Handler, port string, msg interface{}) any {
11-
if in, ok := msg.(InMessage); ok {
12-
return handler(ctx, OutPort, in.Context)
13-
}
14-
return fmt.Errorf("invalid message")
15-
}
21+
```
22+
ticker → http_request(api) → json_decode → array_split → redis_dedup
23+
├── out_new → … process and store ──→ postgres_exec
24+
└── out_seen → drop
1625
```
1726

18-
This demonstrates the core patterns:
19-
- Component interface (`GetInfo`, `Handle`, `Ports`, `Instance`)
20-
- Input/output ports with typed messages
21-
- Handler response propagation (blocking I/O)
22-
- `configurable:"true"` struct tag for edge data mapping
27+
Use `redis_dedup` for "have I already processed this ID?" checks. `keyPrefix` + `id` form the composite key. TTL determines how long Redis remembers — set it longer than the polling cycle plus margin.
2328

24-
## Project Structure
29+
### Insert with parameters
2530

2631
```
27-
cmd/main.go # Entry point — registers components, runs CLI
28-
components/echo/echo.go # Example component
29-
go.mod # SDK dependency (github.com/tiny-systems/module)
32+
redis_dedup:out_new → postgres_exec
33+
sql: INSERT INTO matched_posts (id, source, title, score) VALUES ($1, $2, $3, $4)
34+
params: ["{{$.id}}", "reddit", "{{$.context.title}}", "{{$.context.score}}"]
3035
```
3136

32-
## Getting Started
37+
### Query with configurable row shape
38+
39+
In `postgres_query` settings, define the expected row shape:
3340

34-
1. **Use this template** — click "Use this template" on GitHub
35-
2. **Rename the module** in `go.mod`
36-
3. **Add your components** under `components/`
37-
4. **Register them** via `init()` + `registry.Register()`
41+
```json
42+
{
43+
"row": { "id": "abc", "title": "title", "score": 0 }
44+
}
45+
```
46+
47+
Downstream edges can then navigate `$.rows[0].title`, `$.count`, etc.
3848

3949
## Run Locally
4050

4151
```shell
4252
go run cmd/main.go run \
43-
--name=my-org/my-module-v1 \
53+
--name=tiny-systems/database-module-v0 \
4454
--namespace=tinysystems \
45-
--version=1.0.0
55+
--version=0.1.0
4656
```
4757

48-
## Build and Deploy
49-
50-
```shell
51-
# Build container image
52-
docker build -t myregistry/my-module:1.0.0 .
53-
docker push myregistry/my-module:1.0.0
54-
55-
# Install via Helm
56-
helm repo add tinysystems https://tiny-systems.github.io/module/
57-
helm install my-module tinysystems/tinysystems-operator \
58-
--set controllerManager.manager.image.repository=myregistry/my-module
59-
```
60-
61-
## Resources
62-
63-
- [Developer Guide](https://docs.tinysystems.io/developer-guide/getting-started/hello-world-component) — build your first component
64-
- [Module SDK](https://github.com/tiny-systems/module) — core library
65-
- [Component Examples](https://docs.tinysystems.io/examples/components/simple-transformer) — real-world patterns
66-
- [Tiny Systems Platform](https://tinysystems.io) — visual editor and module directory
67-
6858
## License
6959

70-
This module's source code is MIT-licensed. It depends on the [Tiny Systems Module SDK](https://github.com/tiny-systems/module) (BSL 1.1). See [LICENSE](LICENSE) for details.
60+
MIT for this module's source. Depends on [Tiny Systems Module SDK](https://github.com/tiny-systems/module) (BSL 1.1).

cmd/main.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,30 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
610
"github.com/rs/zerolog"
711
"github.com/spf13/cobra"
812
"github.com/spf13/viper"
9-
_ "github.com/tiny-systems/example-module/components/echo"
13+
_ "github.com/tiny-systems/database-module/components/postgresexec"
14+
_ "github.com/tiny-systems/database-module/components/postgresquery"
15+
_ "github.com/tiny-systems/database-module/components/redisdedup"
16+
_ "github.com/tiny-systems/database-module/components/redisget"
17+
_ "github.com/tiny-systems/database-module/components/redisset"
1018
"github.com/tiny-systems/module/cli"
11-
"os"
12-
"os/signal"
13-
"syscall"
1419
)
1520

16-
// RootCmd represents the base command when called without any subcommands
1721
var rootCmd = &cobra.Command{
1822
Use: "server",
19-
Short: "tiny-system's example module",
23+
Short: "tiny-system's database module — Postgres and Redis components",
2024
Run: func(cmd *cobra.Command, args []string) {
21-
cmd.Help()
25+
_ = cmd.Help()
2226
},
2327
}
2428

2529
func main() {
26-
// Default level for this example is info, unless debug flag is present
2730
zerolog.SetGlobalLevel(zerolog.InfoLevel)
2831
viper.AutomaticEnv()
2932
if viper.GetBool("debug") {

components/echo/echo.go

Lines changed: 0 additions & 68 deletions
This file was deleted.

components/pool/pool.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Package pool caches database connections across component invocations.
2+
// Postgres pools are keyed by DSN; Redis clients are keyed by URL.
3+
// Each unique DSN/URL produces one shared pool that lives for the process lifetime.
4+
package pool
5+
6+
import (
7+
"context"
8+
"sync"
9+
10+
"github.com/jackc/pgx/v5/pgxpool"
11+
"github.com/redis/go-redis/v9"
12+
)
13+
14+
var (
15+
pgPools sync.Map // map[string]*pgxpool.Pool
16+
redisClients sync.Map // map[string]*redis.Client
17+
)
18+
19+
// Postgres returns a cached pgx pool for the given DSN, creating one on first use.
20+
func Postgres(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
21+
if v, ok := pgPools.Load(dsn); ok {
22+
return v.(*pgxpool.Pool), nil
23+
}
24+
p, err := pgxpool.New(ctx, dsn)
25+
if err != nil {
26+
return nil, err
27+
}
28+
if actual, loaded := pgPools.LoadOrStore(dsn, p); loaded {
29+
p.Close()
30+
return actual.(*pgxpool.Pool), nil
31+
}
32+
return p, nil
33+
}
34+
35+
// Redis returns a cached Redis client for the given URL, creating one on first use.
36+
func Redis(url string) (*redis.Client, error) {
37+
if v, ok := redisClients.Load(url); ok {
38+
return v.(*redis.Client), nil
39+
}
40+
opts, err := redis.ParseURL(url)
41+
if err != nil {
42+
return nil, err
43+
}
44+
c := redis.NewClient(opts)
45+
if actual, loaded := redisClients.LoadOrStore(url, c); loaded {
46+
if closeErr := c.Close(); closeErr != nil {
47+
_ = closeErr
48+
}
49+
return actual.(*redis.Client), nil
50+
}
51+
return c, nil
52+
}

components/postgresexec/exec.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package postgresexec
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/tiny-systems/database-module/components/pool"
8+
"github.com/tiny-systems/module/api/v1alpha1"
9+
"github.com/tiny-systems/module/module"
10+
"github.com/tiny-systems/module/registry"
11+
)
12+
13+
const (
14+
ComponentName = "postgres_exec"
15+
RequestPort = "request"
16+
ResponsePort = "response"
17+
ErrorPort = "error"
18+
)
19+
20+
type Context any
21+
22+
type Settings struct {
23+
EnableErrorPort bool `json:"enableErrorPort" required:"true" title:"Enable Error Port"`
24+
}
25+
26+
type Request struct {
27+
Context Context `json:"context,omitempty" configurable:"true" title:"Context"`
28+
DSN string `json:"dsn" required:"true" minLength:"1" title:"DSN" description:"Postgres connection string (e.g. postgres://user:pass@host:port/db?sslmode=disable)"`
29+
SQL string `json:"sql" required:"true" minLength:"1" title:"SQL" description:"INSERT/UPDATE/DELETE with $1, $2, ... placeholders" format:"textarea"`
30+
Params []any `json:"params,omitempty" title:"Params" description:"Positional parameters for $1, $2, ..."`
31+
}
32+
33+
type Response struct {
34+
Context Context `json:"context,omitempty" configurable:"true" title:"Context"`
35+
RowsAffected int64 `json:"rowsAffected" title:"Rows Affected"`
36+
}
37+
38+
type Error struct {
39+
Context Context `json:"context,omitempty" configurable:"true" title:"Context"`
40+
Error string `json:"error" title:"Error"`
41+
}
42+
43+
type Component struct {
44+
settings Settings
45+
}
46+
47+
func (c *Component) Instance() module.Component {
48+
return &Component{}
49+
}
50+
51+
func (c *Component) GetInfo() module.ComponentInfo {
52+
return module.ComponentInfo{
53+
Name: ComponentName,
54+
Description: "Postgres Exec",
55+
Info: "Executes INSERT/UPDATE/DELETE against Postgres with positional parameters. Connection pool is cached per DSN across calls.",
56+
Tags: []string{"Postgres", "SQL", "DB"},
57+
}
58+
}
59+
60+
func (c *Component) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
61+
switch port {
62+
case v1alpha1.SettingsPort:
63+
in, ok := msg.(Settings)
64+
if !ok {
65+
return fmt.Errorf("invalid settings")
66+
}
67+
c.settings = in
68+
return nil
69+
70+
case RequestPort:
71+
in, ok := msg.(Request)
72+
if !ok {
73+
return fmt.Errorf("invalid request")
74+
}
75+
return c.run(ctx, handler, in)
76+
}
77+
return fmt.Errorf("port %s not supported", port)
78+
}
79+
80+
func (c *Component) run(ctx context.Context, handler module.Handler, in Request) any {
81+
p, err := pool.Postgres(ctx, in.DSN)
82+
if err != nil {
83+
return c.fail(ctx, handler, in.Context, err)
84+
}
85+
86+
tag, err := p.Exec(ctx, in.SQL, in.Params...)
87+
if err != nil {
88+
return c.fail(ctx, handler, in.Context, err)
89+
}
90+
91+
return handler(ctx, ResponsePort, Response{
92+
Context: in.Context,
93+
RowsAffected: tag.RowsAffected(),
94+
})
95+
}
96+
97+
func (c *Component) fail(ctx context.Context, handler module.Handler, reqCtx Context, err error) any {
98+
if !c.settings.EnableErrorPort {
99+
return err
100+
}
101+
return handler(ctx, ErrorPort, Error{Context: reqCtx, Error: err.Error()})
102+
}
103+
104+
func (c *Component) Ports() []module.Port {
105+
ports := []module.Port{
106+
{Name: v1alpha1.SettingsPort, Label: "Settings", Configuration: c.settings},
107+
{Name: RequestPort, Label: "Request", Configuration: Request{}, Position: module.Left},
108+
{Name: ResponsePort, Label: "Response", Source: true, Configuration: Response{}, Position: module.Right},
109+
}
110+
if !c.settings.EnableErrorPort {
111+
return ports
112+
}
113+
return append(ports, module.Port{
114+
Name: ErrorPort, Label: "Error", Source: true, Configuration: Error{}, Position: module.Bottom,
115+
})
116+
}
117+
118+
var _ module.Component = (*Component)(nil)
119+
120+
func init() {
121+
registry.Register(&Component{})
122+
}

0 commit comments

Comments
 (0)