Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "bwell-logkit"
version = "1.1.0"
version = "1.2.0"
description = "A Python library for processing and analyzing log files from the National Research Council (NRC) Canada's bWell application."
readme = "README.md"
requires-python = ">=3.11"
Expand Down
22 changes: 0 additions & 22 deletions requirements-dev.txt

This file was deleted.

3 changes: 0 additions & 3 deletions requirements.txt

This file was deleted.

39 changes: 24 additions & 15 deletions src/bwell_logkit/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,30 @@
def _flatten_dataframe(df: "pd.DataFrame") -> "pd.DataFrame":
    """Flatten dict-valued columns of *df* into ``<column>_<key>`` columns.

    A column is treated as dict-valued when its first non-null entry is a
    ``dict``. Such a column is expanded with ``pd.json_normalize`` and each
    resulting column is prefixed with ``"<column>_"``. All other columns are
    carried over unchanged. The original row index is preserved.

    Args:
        df: Frame whose columns may hold dictionaries.

    Returns:
        ``df`` itself when it is empty, otherwise a new frame with dict
        columns replaced by their flattened sub-columns.
    """
    if df.empty:
        return df

    result_parts = []

    # Positional access keeps every column a single Series even when column
    # labels repeat (label access would return a DataFrame in that case).
    for position, col in enumerate(df.columns):
        col_str = str(col)
        series = df.iloc[:, position]
        non_null = series.dropna()

        if not non_null.empty and isinstance(non_null.iloc[0], dict):
            # ``Series.fillna({})`` leaves missing cells untouched (an empty
            # dict means "no per-index fill values"), so substitute an empty
            # record explicitly. Non-dict entries in a dict column are
            # treated as empty records as well.
            rows = [value if isinstance(value, dict) else {} for value in series]
            normalized = pd.json_normalize(rows)
            normalized.columns = [
                f"{col_str}_{subcol}" for subcol in normalized.columns
            ]
            normalized.index = df.index  # Preserve original index
            result_parts.append(normalized)
        else:
            # Keep non-dict columns as-is
            result_parts.append(df.iloc[:, [position]])

    return pd.concat(result_parts, axis=1) if result_parts else pd.DataFrame()


class LogSessionExtractor:
Expand Down
76 changes: 45 additions & 31 deletions src/bwell_logkit/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
"""

import bisect
import copy
from collections import defaultdict
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

import orjson

from .exceptions import SceneNotFoundError
from .extractor import LogSessionExtractor, SceneViewExtractor
from .scene import SceneManager
Expand All @@ -17,35 +17,29 @@
import pandas as pd


def _freeze(obj: Any) -> Any:
    """Recursively convert *obj* into a hashable equivalent.

    * Mappings become a ``frozenset`` of ``(key, frozen_value)`` pairs, so
      key order does not matter.
    * Sets and frozensets become a ``frozenset`` of frozen elements.
    * Other sequences (except ``str``/``bytes``) become a ``tuple`` of
      frozen elements.
    * Everything else is returned unchanged.
    """
    if isinstance(obj, Mapping):
        return frozenset((key, _freeze(val)) for key, val in obj.items())
    if isinstance(obj, (set, frozenset)):
        # A plain ``set`` is neither a Mapping nor a Sequence and would
        # otherwise be returned as-is, leaving the result unhashable.
        return frozenset(_freeze(item) for item in obj)
    if isinstance(obj, Sequence) and not isinstance(obj, (str, bytes)):
        return tuple(_freeze(item) for item in obj)
    # Primitives stay as-is
    return obj


def _clean_records(records: list[LogRecord]) -> list[LogRecord]:
    """Deduplicate records by content and return them sorted by game time.

    Two records are duplicates when they serialize to identical JSON after
    the ``RecordFields.ID`` field is removed; only the first occurrence is
    kept. The survivors are then sorted by ``RecordFields.GAME_TIME_SECS``,
    with a missing timestamp treated as ``0.0``.

    Args:
        records: Raw log records; the list and its records are not modified.

    Returns:
        A new list of the unique records in timestamp order.
    """
    seen: set[bytes] = set()
    cleaned: list[LogRecord] = []

    for rec in records:
        # Compare content only: the ID is left out of the deduplication key.
        tmp = {k: v for k, v in rec.items() if k != RecordFields.ID}

        # OPT_SORT_KEYS makes the serialized bytes independent of the order
        # in which keys were inserted.
        key = orjson.dumps(tmp, option=orjson.OPT_SORT_KEYS)

        if key not in seen:
            seen.add(key)
            cleaned.append(rec)

    # Sort by timestamp
    cleaned.sort(key=lambda r: r.get(RecordFields.GAME_TIME_SECS, 0.0))
    return cleaned

Expand Down Expand Up @@ -88,10 +82,22 @@ def __init__(

self._extractor: LogSessionExtractor | None = None

# Cached timestamp list for binary search operations
self._timestamps: list[float] | None = None

@property
def records(self) -> tuple[LogRecord, ...]:
    """Get all records in the session as an immutable tuple.

    Each element is a shallow copy (``rec.copy()``) of the stored record,
    in the same order as the internal list.

    Returns:
        Tuple of record copies. Top-level field changes on a returned
        record do not reach the session; nested structures are still
        shared with the stored record, so mutating them does.
    """
    return tuple(rec.copy() for rec in self._records)

@property
def metadata(self) -> dict[str, Any]:
Expand Down Expand Up @@ -140,6 +146,15 @@ def scene_manager(self) -> SceneManager:
self._scene_manager = SceneManager(self._records)
return self._scene_manager

@property
def _timestamp_list(self) -> list[float]:
    """Lazy-loaded timestamp list for binary search operations.

    Built on first access from ``self._records`` (a missing
    ``GAME_TIME_SECS`` counts as ``0.0``) and then reused.
    """
    # NOTE: the cache is never invalidated here, so it is only valid while
    # ``self._records`` is left unchanged after the first access.
    if self._timestamps is None:
        self._timestamps = [
            r.get(RecordFields.GAME_TIME_SECS, 0.0) for r in self._records
        ]
    return self._timestamps

def list_scenes(self) -> list[str]:
"""
List all available scene names.
Expand Down Expand Up @@ -230,10 +245,9 @@ def filter_time_range(self, start: float, end: float) -> "LogSession":
if not self._records:
return LogSession([], self._metadata, _scene_manager=self._scene_manager)

timestamps = [r.get(RecordFields.GAME_TIME_SECS, 0.0) for r in self._records]

start_idx = bisect.bisect_left(timestamps, start)
end_idx = bisect.bisect_right(timestamps, end)
# Use cached timestamp list for binary search
start_idx = bisect.bisect_left(self._timestamp_list, start)
end_idx = bisect.bisect_right(self._timestamp_list, end)

filtered_records = self._records[start_idx:end_idx]

Expand Down Expand Up @@ -377,8 +391,8 @@ def __init__(self, session: LogSession, scene_info: SceneInfo):
self._extractor: SceneViewExtractor | None = None

@property
def records(self) -> tuple[LogRecord, ...]:
    """Get records in this scene as an immutable tuple.

    Delegates to the ``records`` property of the wrapped session.
    """
    return self._session.records

@property
Expand Down
21 changes: 13 additions & 8 deletions src/bwell_logkit/scene.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Scene management and segmentation for bWell log data."""

import bisect
from collections import defaultdict
from typing import Literal, Optional

Expand All @@ -12,6 +13,8 @@ class SceneManager:

def __init__(self, records: list[LogRecord]):
    """Store *records*, snapshot their timestamps, and build the scene index.

    Args:
        records: Log records to manage. The list is kept by reference,
            not copied.
    """
    self._records = records
    # Cache timestamps for binary search operations. This is a snapshot
    # taken at construction time; a missing GAME_TIME_SECS counts as 0.0.
    self._timestamps = [r.get(RecordFields.GAME_TIME_SECS, 0.0) for r in records]
    self._scenes = self._build_scene_index()

def _build_scene_index(self) -> dict[str, list[SceneInfo]]:
Expand Down Expand Up @@ -86,15 +89,17 @@ def get_scene_info(self, scene_name: str, instance: int = 0) -> SceneInfo:
return self._scenes[scene_name][instance]

def get_scene_records(self, scene_name: str, instance: int = 0) -> list[LogRecord]:
    """Get all records within a specific scene instance using binary search.

    Args:
        scene_name: Name of the scene to look up.
        instance: Which occurrence of the scene to use (default: first).

    Returns:
        The slice of records whose timestamps lie in the inclusive range
        ``[start_game_time_secs, end_game_time_secs]`` of the scene, or an
        empty list when the manager holds no records.
    """
    # With no records the scene lookup is skipped entirely.
    if not self._records:
        return []

    info = self.get_scene_info(scene_name, instance)

    # Binary search over the cached timestamps. This is only correct when
    # the records (and therefore the timestamps) are in ascending order.
    start_idx = bisect.bisect_left(self._timestamps, info.start_game_time_secs)
    end_idx = bisect.bisect_right(self._timestamps, info.end_game_time_secs)

    return self._records[start_idx:end_idx]

def get_scene_instances(
self,
Expand Down
21 changes: 13 additions & 8 deletions tests/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,20 @@ def test_init_with_metadata(self, sample_records):
assert session.metadata["user"] == "test_user"

def test_records_property_returns_copy(self, sample_records):
    """Test that records property returns an immutable tuple of copies."""
    session = LogSession(sample_records)
    records_tuple = session.records

    # Verify it's a tuple (immutable)
    assert isinstance(records_tuple, tuple)
    assert len(records_tuple) == len(sample_records)

    # Individual records can be modified for the user's own use,
    # but modifying them doesn't affect internal state.
    if records_tuple:
        test_record = records_tuple[0]
        test_record["modified"] = True
        assert "modified" not in session.records[0]

def test_metadata_property_returns_copy(self, sample_records):
"""Test that metadata property returns a copy."""
Expand Down
Loading