Skip to content

Commit d5f966d

Browse files
committed
Add underlying key-value state runtime interface
1 parent 476e530 commit d5f966d

4 files changed

Lines changed: 346 additions & 4 deletions

File tree

common/wasm_utils/wasm_utils.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,15 @@ func PtrSize(ptr, size uint32) uint64 {
4444
func ExtractPtrSize(ptrSize uint64) (uint32, uint32) {
4545
return uint32(ptrSize >> 32), uint32(ptrSize)
4646
}
47+
48+
// PtrSize64 packs a 64-bit pointer and size into a uint128-like structure
49+
// We use two uint64 values to represent a 128-bit pointer+size pair
50+
type PtrSize64 struct {
51+
Ptr uint64
52+
Size uint64
53+
}
54+
55+
// ExtractPtrSize64 extracts pointer and size from a PtrSize64 struct
56+
func ExtractPtrSize64(ps PtrSize64) (uint64, uint64) {
57+
return ps.Ptr, ps.Size
58+
}

fs/api/func_ctx.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ type FunctionContext interface {
2929
DeleteState(ctx context.Context, key string) error
3030
Write(record contube.Record) error
3131
GetConfig() map[string]string
32+
GetStateStore() StateStore
3233
}

fs/func_ctx_impl.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,7 @@ func (f *funcCtxImpl) GetConfig() map[string]string {
8989
func (f *funcCtxImpl) setSink(sink chan<- contube.Record) {
9090
f.sink = sink
9191
}
92+
93+
func (f *funcCtxImpl) GetStateStore() api.StateStore {
94+
return f.stateStore
95+
}

fs/runtime/wazero/wazero_runtime.go

Lines changed: 329 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI
8080
rc *model.RuntimeConfig) (api.FunctionRuntime, error) {
8181
log := instance.Logger()
8282
r := wazero.NewRuntime(instance.Context())
83-
_, err := r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(func(ctx context.Context,
84-
m wazero_api.Module, a, b, c, d uint32) {
85-
log.Error(fmt.Errorf("abort(%d, %d, %d, %d)", a, b, c, d), "the function is calling abort")
86-
}).Export("abort").Instantiate(instance.Context())
83+
84+
// Register env module with abort and stateStore functions
85+
err := registerEnvModule(r, instance.Context(), instance, log)
8786
if err != nil {
8887
return nil, fmt.Errorf("error instantiating env module: %w", err)
8988
}
89+
9090
wasmLog := &logWriter{
9191
log: log,
9292
}
@@ -173,3 +173,328 @@ func (r *FunctionRuntime) Call(e contube.Record) (contube.Record, error) {
173173
func (r *FunctionRuntime) Stop() {
174174
r.stopFunc()
175175
}
176+
177+
// helper functions to read/write byte arrays from/to wasm memory
178+
//
179+
// Note on pointer size compatibility:
180+
// - Current WebAssembly implementations (wasm32) use 32-bit memory offsets,
181+
// limiting addressable memory to 4GB. The wazero runtime's Memory interface
182+
// uses uint32 for addresses.
183+
// - Future WebAssembly specifications may introduce wasm64 with 64-bit addresses.
184+
// - The current implementation uses uint32 for memory operations, which is
185+
// compatible with wasm32 and the wazero Memory API.
186+
// - For wasm64 compatibility in the future, these functions would need to be
187+
// modified to use uint64 for address operations when wazero adds support.
188+
func readBytesFromMemory(module wazero_api.Module, ptr, size uint32) ([]byte, error) {
189+
if size == 0 {
190+
return nil, nil
191+
}
192+
mem := module.Memory()
193+
if mem == nil {
194+
return nil, fmt.Errorf("memory is not available")
195+
}
196+
data, ok := mem.Read(ptr, size)
197+
if !ok {
198+
return nil, fmt.Errorf("failed to read memory at offset %d, size %d", ptr, size)
199+
}
200+
result := make([]byte, size)
201+
copy(result, data)
202+
return result, nil
203+
}
204+
205+
func writeBytesToMemory(module wazero_api.Module, data []byte) (uint32, error) {
206+
if len(data) == 0 {
207+
return 0, nil
208+
}
209+
210+
// Check for potential overflow: max uint32 is 4GB-1
211+
if uint64(len(data)) > uint64(^uint32(0)) {
212+
return 0, fmt.Errorf("data size exceeds wasm32 memory limit: %d bytes", len(data))
213+
}
214+
215+
mem := module.Memory()
216+
if mem == nil {
217+
return 0, fmt.Errorf("memory is not available")
218+
}
219+
// Get current memory size in pages
220+
currentSize := mem.Size()
221+
neededSize := uint32(len(data))
222+
totalNeeded := currentSize + neededSize
223+
224+
// Calculate how many pages we need to add
225+
pagesToAdd := (totalNeeded + 65535) / 65536 // Round up to pages
226+
currentPages := currentSize / 65536
227+
_, ok := mem.Grow(pagesToAdd)
228+
if !ok {
229+
return 0, fmt.Errorf("failed to grow memory")
230+
}
231+
232+
// Calculate offset: start at end of original memory
233+
offset := currentPages * 65536
234+
ok = mem.Write(offset, data)
235+
if !ok {
236+
return 0, fmt.Errorf("failed to write memory at offset %d", offset)
237+
}
238+
return offset, nil
239+
}
240+
241+
// registerEnvModule registers both abort and StateStore functions to the wasm env module
242+
func registerEnvModule(runtime wazero.Runtime, ctx context.Context, instance api.FunctionInstance, log *common.Logger) error {
243+
stateStore := instance.FunctionContext().GetStateStore()
244+
245+
// Helper to read byte array from wasm memory: (ptr, size uint32) -> bytes
246+
readBytes := func(module wazero_api.Module, ptrSize uint64) ([]byte, error) {
247+
ptr := uint32(ptrSize >> 32)
248+
size := uint32(ptrSize)
249+
return readBytesFromMemory(module, ptr, size)
250+
}
251+
252+
// Helper to write byte array to wasm memory: bytes -> (ptr, size uint32)
253+
writeBytes := func(module wazero_api.Module, data []byte) (uint64, error) {
254+
ptr, err := writeBytesToMemory(module, data)
255+
if err != nil {
256+
return 0, err
257+
}
258+
return uint64(ptr)<<32 | uint64(len(data)), nil
259+
}
260+
261+
builder := runtime.NewHostModuleBuilder("env")
262+
263+
// Register abort function
264+
builder = builder.NewFunctionBuilder().WithFunc(func(ctx context.Context,
265+
m wazero_api.Module, a, b, c, d uint32) {
266+
log.Error(fmt.Errorf("abort(%d, %d, %d, %d)", a, b, c, d), "the function is calling abort")
267+
}).Export("abort")
268+
269+
// Put(keyGroup, key, namespace, userKey ptrSize, value ptrSize) -> uint64(ptr, size)
270+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize, vPtrSize uint64) (resultPtrSize uint64) {
271+
keyGroup, err := readBytes(module, kg)
272+
if err != nil {
273+
log.Error(err, "failed to read keyGroup")
274+
return 0
275+
}
276+
key, err := readBytes(module, k)
277+
if err != nil {
278+
log.Error(err, "failed to read key")
279+
return 0
280+
}
281+
namespace, err := readBytes(module, ns)
282+
if err != nil {
283+
log.Error(err, "failed to read namespace")
284+
return 0
285+
}
286+
userKey, err := readBytes(module, ukPtrSize)
287+
if err != nil {
288+
log.Error(err, "failed to read userKey")
289+
return 0
290+
}
291+
value, err := readBytes(module, vPtrSize)
292+
if err != nil {
293+
log.Error(err, "failed to read value")
294+
return 0
295+
}
296+
297+
if err := stateStore.Put(ctx, keyGroup, key, namespace, userKey, value); err != nil {
298+
log.Error(err, "StateStore.Put failed")
299+
return 0
300+
}
301+
302+
// Return success (empty result)
303+
resultPtrSize, _ = writeBytes(module, []byte{})
304+
return resultPtrSize
305+
}).Export("stateStorePut")
306+
307+
// Get(keyGroup, key, namespace, userKey ptrSize) -> uint64(ptr, size)
308+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize uint64) (resultPtrSize uint64) {
309+
keyGroup, err := readBytes(module, kg)
310+
if err != nil {
311+
log.Error(err, "failed to read keyGroup")
312+
return 0
313+
}
314+
key, err := readBytes(module, k)
315+
if err != nil {
316+
log.Error(err, "failed to read key")
317+
return 0
318+
}
319+
namespace, err := readBytes(module, ns)
320+
if err != nil {
321+
log.Error(err, "failed to read namespace")
322+
return 0
323+
}
324+
userKey, err := readBytes(module, ukPtrSize)
325+
if err != nil {
326+
log.Error(err, "failed to read userKey")
327+
return 0
328+
}
329+
330+
value, err := stateStore.Get(ctx, keyGroup, key, namespace, userKey)
331+
if err != nil {
332+
log.Error(err, "StateStore.Get failed")
333+
return 0
334+
}
335+
336+
resultPtrSize, err = writeBytes(module, value)
337+
if err != nil {
338+
log.Error(err, "failed to write result")
339+
return 0
340+
}
341+
return resultPtrSize
342+
}).Export("stateStoreGet")
343+
344+
// Delete(keyGroup, key, namespace, userKey ptrSize) -> uint64(ptr, size)
345+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize uint64) (resultPtrSize uint64) {
346+
keyGroup, err := readBytes(module, kg)
347+
if err != nil {
348+
log.Error(err, "failed to read keyGroup")
349+
return 0
350+
}
351+
key, err := readBytes(module, k)
352+
if err != nil {
353+
log.Error(err, "failed to read key")
354+
return 0
355+
}
356+
namespace, err := readBytes(module, ns)
357+
if err != nil {
358+
log.Error(err, "failed to read namespace")
359+
return 0
360+
}
361+
userKey, err := readBytes(module, ukPtrSize)
362+
if err != nil {
363+
log.Error(err, "failed to read userKey")
364+
return 0
365+
}
366+
367+
if err := stateStore.Delete(ctx, keyGroup, key, namespace, userKey); err != nil {
368+
log.Error(err, "StateStore.Delete failed")
369+
return 0
370+
}
371+
372+
resultPtrSize, _ = writeBytes(module, []byte{})
373+
return resultPtrSize
374+
}).Export("stateStoreDelete")
375+
376+
// DeleteAll(keyGroup, key, namespace) -> uint64(ptr, size)
377+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns uint64) (resultPtrSize uint64) {
378+
keyGroup, err := readBytes(module, kg)
379+
if err != nil {
380+
log.Error(err, "failed to read keyGroup")
381+
return 0
382+
}
383+
key, err := readBytes(module, k)
384+
if err != nil {
385+
log.Error(err, "failed to read key")
386+
return 0
387+
}
388+
namespace, err := readBytes(module, ns)
389+
if err != nil {
390+
log.Error(err, "failed to read namespace")
391+
return 0
392+
}
393+
394+
if err := stateStore.DeleteAll(ctx, keyGroup, key, namespace); err != nil {
395+
log.Error(err, "StateStore.DeleteAll failed")
396+
return 0
397+
}
398+
399+
resultPtrSize, _ = writeBytes(module, []byte{})
400+
return resultPtrSize
401+
}).Export("stateStoreDeleteAll")
402+
403+
// Merge(keyGroup, key, namespace, userKey ptrSize, value ptrSize) -> uint64(ptr, size)
404+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize, vPtrSize uint64) (resultPtrSize uint64) {
405+
keyGroup, err := readBytes(module, kg)
406+
if err != nil {
407+
log.Error(err, "failed to read keyGroup")
408+
return 0
409+
}
410+
key, err := readBytes(module, k)
411+
if err != nil {
412+
log.Error(err, "failed to read key")
413+
return 0
414+
}
415+
namespace, err := readBytes(module, ns)
416+
if err != nil {
417+
log.Error(err, "failed to read namespace")
418+
return 0
419+
}
420+
userKey, err := readBytes(module, ukPtrSize)
421+
if err != nil {
422+
log.Error(err, "failed to read userKey")
423+
return 0
424+
}
425+
value, err := readBytes(module, vPtrSize)
426+
if err != nil {
427+
log.Error(err, "failed to read value")
428+
return 0
429+
}
430+
431+
if err := stateStore.Merge(ctx, keyGroup, key, namespace, userKey, value); err != nil {
432+
log.Error(err, "StateStore.Merge failed")
433+
return 0
434+
}
435+
436+
resultPtrSize, _ = writeBytes(module, []byte{})
437+
return resultPtrSize
438+
}).Export("stateStoreMerge")
439+
440+
// NewIterator(prefix ptrSize) -> iteratorId
441+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, prefixPtrSize uint64) (iteratorId int64) {
442+
prefix, err := readBytes(module, prefixPtrSize)
443+
if err != nil {
444+
log.Error(err, "failed to read prefix")
445+
return 0
446+
}
447+
448+
iterID, err := stateStore.NewIterator(prefix)
449+
if err != nil {
450+
log.Error(err, "StateStore.NewIterator failed")
451+
return 0
452+
}
453+
454+
return iterID
455+
}).Export("stateStoreNewIterator")
456+
457+
// HasNext(iteratorId) -> hasNext
458+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, iteratorId int64) (hasNext uint32) {
459+
has, err := stateStore.HasNext(iteratorId)
460+
if err != nil {
461+
log.Error(err, "StateStore.HasNext failed")
462+
return 0
463+
}
464+
465+
if has {
466+
return 1
467+
}
468+
return 0
469+
}).Export("stateStoreIteratorHasNext")
470+
471+
// Next(iteratorId) -> uint64(ptr, size)
472+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, iteratorId int64) (resultPtrSize uint64) {
473+
value, err := stateStore.Next(iteratorId)
474+
if err != nil {
475+
log.Error(err, "StateStore.Next failed")
476+
return 0
477+
}
478+
479+
resultPtrSize, err = writeBytes(module, value)
480+
if err != nil {
481+
log.Error(err, "failed to write result")
482+
return 0
483+
}
484+
return resultPtrSize
485+
}).Export("stateStoreIteratorNext")
486+
487+
// CloseIterator(iteratorId) -> uint64(ptr, size)
488+
builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, iteratorId int64) (resultPtrSize uint64) {
489+
if err := stateStore.CloseIterator(iteratorId); err != nil {
490+
log.Error(err, "StateStore.CloseIterator failed")
491+
return 0
492+
}
493+
494+
resultPtrSize, _ = writeBytes(module, []byte{})
495+
return resultPtrSize
496+
}).Export("stateStoreIteratorClose")
497+
498+
_, err := builder.Instantiate(ctx)
499+
return err
500+
}

0 commit comments

Comments
 (0)