
Conversation

@leshy (Contributor) commented Jan 21, 2026

Summary

A unified SensorStore[T] abstraction for timestamped sensor data with multiple backend implementations, designed for robot sensor recording and replay.

SensorStore[T] Base Class (dimos/memory/sensor/base.py)

  • Generic abstract base with 4 methods to implement: _save, _load, _iter_items, _find_closest_timestamp
  • Convenience methods built on top: find_closest(), iterate(), iterate_ts(), iterate_realtime(), stream(); see the sketch below
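To illustrate the contract, here is a minimal toy backend, not code from this PR: it only fills in the four abstract hooks (signatures taken from the sequence diagram further down), and the payload type is simplified to bytes even though later commits bound T to Timestamped.

```python
# Toy sketch of a SensorStore backend; only the four abstract hooks are implemented,
# everything else (find_closest, iterate, stream, ...) comes from the base class.
from bisect import bisect_left
from typing import Iterator

from dimos.memory.sensor.base import SensorStore  # import path from this PR


class DictStore(SensorStore[bytes]):  # hypothetical backend, payload simplified to bytes
    def __init__(self) -> None:
        self._items: dict[float, bytes] = {}

    def _save(self, timestamp: float, data: bytes) -> None:
        self._items[timestamp] = data

    def _load(self, timestamp: float) -> bytes:
        return self._items[timestamp]

    def _iter_items(self) -> Iterator[tuple[float, bytes]]:
        yield from sorted(self._items.items())

    def _find_closest_timestamp(self, target: float) -> float:
        keys = sorted(self._items)
        i = bisect_left(keys, target)
        # compare the neighbours on either side of the target and return the nearer one
        candidates = keys[max(i - 1, 0):i + 1]
        return min(candidates, key=lambda ts: abs(ts - target))
```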

Backend Implementations

| Backend | File | Use Case |
| --- | --- | --- |
| InMemoryStore | base.py | Live streaming, testing |
| PickleDirStore | pickledir.py | Simple file-based, one file per timestamp |
| SqliteStore | sqlite.py | Single-file DB, indexed queries |
| PostgresStore | postgres.py | Multi-user, implements Resource |

Usage

```python
from dimos.memory.sensor import SqliteStore, PostgresStore

# SQLite (single file)
store = SqliteStore("sensors.db", table="lidar")
store.save(data, timestamp)
data = store.find_closest(seek=10.0)

# PostgreSQL (implements Resource)
store = PostgresStore("lidar")
store.start()
store.save(data, timestamp)
store.stop()
```
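Replay is exposed as an RxPY Observable via stream(). A hedged sketch of consuming it (the speed/seek keyword arguments follow the sequence diagram later on this page; the subscribe() callbacks are plain RxPY usage, not dimos-specific API):

```python
# Replay the recorded data in real time, starting 5 seconds into the recording.
playback = store.stream(speed=1.0, seek=5.0)
subscription = playback.subscribe(
    on_next=lambda data: print("sensor frame", data),
    on_completed=lambda: print("playback finished"),
)
# ... later, stop consuming the stream
subscription.dispose()
```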

Misc

Removed the generic Embedding type and unified the Embedding return type across all models (since they were already identical).

leshy added 11 commits January 21, 2026 17:17
- quality_barrier: Callable[[Observable[T]], Observable[T]]
- sharpness_barrier: Callable[[Observable[Image]], Observable[Image]]
- Implement find_closest(), first_timestamp(), iterate(), iterate_ts(),
  iterate_realtime() methods using abstract _iter_items/_find_closest_timestamp
- Add scheduler-based stream() with absolute time reference to prevent timing
  drift during long playback (ported from replay.py)
- Move imports to top of file, add proper typing throughout
- Fix pickledir.py mypy error (pickle.load returns Any)
- Single-file SQLite storage with indexed timestamp queries
- BLOB storage for pickled sensor data
- INSERT OR REPLACE for duplicate timestamp handling
- Supports multiple tables per database (different sensors)
- Added to parametrized tests (15 tests across 3 backends)
- PostgresStore implements SensorStore[T] + Resource for lifecycle management
- Multiple stores can share same database with different tables
- Tables created automatically on first save
- Tests are optional - skip gracefully if PostgreSQL not available
- Added psycopg2-binary and types-psycopg2 dependencies
- Includes reset_db() helper for simple migrations (drop/recreate)
@leshy leshy requested a review from a team January 21, 2026 14:27
@leshy leshy marked this pull request as draft January 21, 2026 14:27
@greptile-apps bot commented Jan 21, 2026

Greptile Summary

This PR implements a unified sensor storage and replay system with multiple backend implementations (in-memory, SQLite, PostgreSQL, and pickle directory). The core design follows a clean abstract base class pattern where backends only need to implement 4 methods (_save, _load, _iter_items, _find_closest_timestamp), while the base class provides rich functionality including streaming with drift-free timing control, seeking, and iteration.

Key Changes:

  • Added SensorStore abstract base class with streaming, seeking, and timing control via RxPy Observables
  • Implemented 4 backend storage options: InMemoryStore, SqliteStore, PostgresStore, and PickleDirStore
  • Added comprehensive parametrized tests covering all backend implementations
  • Updated dimos.utils.data to support nested paths in LFS archives
  • Added new EmbeddingMemory module for spatial embedding storage
  • Added psycopg2 dependency for PostgreSQL support

Critical Security Issues:

  • SQL injection vulnerabilities in both SqliteStore and PostgresStore where table names and database names are directly interpolated into SQL strings without validation
  • Race condition in PickleDirStore._save() using check-then-act pattern that could allow timestamp collisions

Thread Safety Concerns:

  • SqliteStore uses check_same_thread=False without additional synchronization mechanisms
  • PostgresStore and PickleDirStore may have concurrent access issues without explicit thread safety

Architecture Strengths:

  • Clean separation of concerns with abstract base class
  • Sophisticated drift-free streaming implementation using absolute time references
  • Flexible backend system supporting multiple storage strategies
  • Comprehensive test coverage with proper cleanup

Confidence Score: 2/5

  • This PR has critical security vulnerabilities that must be addressed before merging
  • The implementation demonstrates good architectural design with clean abstractions and comprehensive tests. However, the SQL injection vulnerabilities in both database backends (SqliteStore and PostgresStore) are critical security issues that could allow arbitrary SQL execution if user-controlled input reaches the table name parameter. Additionally, the race condition in PickleDirStore and thread safety concerns with check_same_thread=False in SQLite create reliability risks. These security and concurrency issues must be resolved before the code is production-ready.
  • Pay close attention to dimos/memory/sensor/sqlite.py and dimos/memory/sensor/postgres.py for SQL injection vulnerabilities, and dimos/memory/sensor/pickledir.py for race conditions

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/memory/sensor/base.py | Core abstract base class implementing unified sensor storage with streaming, seeking, and timing control via Observable streams |
| dimos/memory/sensor/sqlite.py | SQLite backend implementation with SQL injection vulnerability in table name handling |
| dimos/memory/sensor/postgres.py | PostgreSQL backend with SQL injection vulnerability and missing Resource dispose() implementation |
| dimos/memory/sensor/pickledir.py | Pickle directory backend with potential timestamp collision vulnerability during concurrent writes |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant SensorStore
    participant Backend as Backend (SQLite/Postgres/PickleDir)
    participant Scheduler
    participant Observer

    Note over Client,Backend: Save Operation
    Client->>SensorStore: save(data, timestamp)
    SensorStore->>SensorStore: Get timestamp from data.ts or time.time()
    SensorStore->>Backend: _save(timestamp, data)
    Backend->>Backend: Serialize data (pickle)
    Backend->>Backend: Store in DB/File
    Backend-->>SensorStore: Complete
    SensorStore-->>Client: Complete

    Note over Client,Backend: Query Operation
    Client->>SensorStore: find_closest(seek=10.0)
    SensorStore->>SensorStore: first_timestamp()
    SensorStore->>Backend: _iter_items()
    Backend-->>SensorStore: First timestamp
    SensorStore->>SensorStore: Calculate target: first + seek
    SensorStore->>Backend: _find_closest_timestamp(target)
    Backend->>Backend: Binary search / DB query
    Backend-->>SensorStore: Closest timestamp
    SensorStore->>Backend: _load(closest_timestamp)
    Backend->>Backend: Deserialize data (pickle)
    Backend-->>SensorStore: Data object
    SensorStore-->>Client: Data object

    Note over Client,Observer: Stream with Timing Control
    Client->>SensorStore: stream(speed=1.0, seek=5.0)
    SensorStore->>Observer: create Observable
    Observer->>SensorStore: subscribe()
    SensorStore->>Backend: iterate_ts(seek=5.0)
    Backend-->>SensorStore: Iterator[(ts, data)]
    SensorStore->>SensorStore: Get first message
    SensorStore->>SensorStore: Establish timing reference
    SensorStore->>Observer: on_next(first_data)
    loop For each message
        SensorStore->>SensorStore: Pre-load next message
        SensorStore->>SensorStore: Calculate absolute emission time
        SensorStore->>Scheduler: schedule_relative(delay, emit)
        Scheduler->>Scheduler: Wait until target time
        Scheduler->>Observer: on_next(data)
    end
    SensorStore->>Observer: on_completed()
    Observer-->>Client: Stream complete
```
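The "Stream with Timing Control" portion of the diagram is the drift-free playback mentioned in the commits and review above. The sketch below is not the base.py implementation, only an illustration of the idea under stated assumptions (RxPY 4's reactivex package; the project may use a different Rx version): emission times are computed from one fixed wall-clock reference rather than chained relative sleeps, so scheduling error does not accumulate over long recordings.

```python
import time
from typing import Any, Iterable, Tuple

import reactivex as rx
from reactivex.disposable import Disposable
from reactivex.scheduler import ThreadPoolScheduler


def stream_items(items: Iterable[Tuple[float, Any]], speed: float = 1.0) -> rx.Observable:
    """Emit already-seeked, sorted (timestamp, data) pairs with their original pacing."""

    def subscribe(observer, scheduler=None):
        sched = scheduler or ThreadPoolScheduler(1)
        it = iter(items)
        try:
            first_ts, first_data = next(it)
        except StopIteration:
            observer.on_completed()
            return Disposable()

        start_wall = time.monotonic()  # absolute reference, taken exactly once
        observer.on_next(first_data)

        def schedule_next(_sched, _state):
            try:
                ts, data = next(it)  # pre-load the next message
            except StopIteration:
                observer.on_completed()
                return
            # target emission time derives from the fixed reference, not from "now"
            target = start_wall + (ts - first_ts) / speed
            delay = max(0.0, target - time.monotonic())

            def emit(s, st):
                observer.on_next(data)
                schedule_next(s, st)

            sched.schedule_relative(delay, emit)

        sched.schedule(schedule_next)
        return Disposable()

    return rx.create(subscribe)
```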


@greptile-apps greptile-apps bot left a comment


25 files reviewed, 7 comments


leshy added 10 commits January 22, 2026 11:33
- Validate table/database names in SqliteStore and PostgresStore
  using a regex (alphanumeric/underscore, not starting with a digit); see the sketch below
- Fix Transform.to_pose() return type using TYPE_CHECKING import
- Add return type annotation to TF.get_pose()
- Fix ambiguous doclink in transports.md
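A hedged sketch of that identifier check (names and error message are illustrative, not the actual dimos code):

```python
import re

# Allow only alphanumeric/underscore identifiers that do not start with a digit,
# so table/database names can be interpolated into SQL statements safely.
_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def validate_identifier(name: str) -> str:
    if not _IDENTIFIER_RE.match(name):
        raise ValueError(f"Invalid SQL identifier: {name!r}")
    return name
```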
SqliteStore now accepts a name (e.g. "recordings/lidar") that gets
resolved via get_data_dir to data/recordings/lidar.db. Still supports
absolute paths and :memory: for backward compatibility.
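A hedged sketch of that resolution rule (the get_data_dir import path and branch order are assumptions; only the behaviour described above is taken from the commit):

```python
from pathlib import Path

from dimos.utils.data import get_data_dir  # assumed location of the helper


def resolve_db_path(name: str | Path) -> str:
    if str(name) == ":memory:":
        return ":memory:"                      # in-memory SQLite database
    path = Path(name)
    if path.is_absolute():
        return str(path)                       # absolute paths are used as-is
    # relative names like "recordings/lidar" land under the data dir as .db files
    return str(Path(get_data_dir()) / path.with_suffix(".db"))
```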
- Add TypeVar bound: T = TypeVar("T", bound=Timestamped)
- Simplify save() to always use data.ts (no more optional timestamp); see the sketch below
- Update tests to use SampleData(Timestamped) instead of strings
- SqliteStore accepts str | Path for backward compatibility
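A hedged sketch of the resulting call pattern (SampleData's fields beyond .ts are assumptions, and the subclassing of Timestamped is omitted because its import path is not shown on this page):

```python
from dataclasses import dataclass

from dimos.memory.sensor import SqliteStore  # as in the Usage section above


@dataclass
class SampleData:          # stands in for the PR's SampleData(Timestamped) test payload
    ts: float              # timestamp carried by the data itself
    payload: bytes


store = SqliteStore("recordings/lidar", table="lidar")   # resolved to data/recordings/lidar.db
store.save(SampleData(ts=10.0, payload=b"scan"))          # no separate timestamp argument
closest = store.find_closest(seek=0.0)
```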
# Conflicts:
#	dimos/models/manipulation/contact_graspnet_pytorch/inference.py
Required when cupy/contact_graspnet are installed locally without type stubs.
