-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic_usage.py
More file actions
163 lines (132 loc) · 5.58 KB
/
basic_usage.py
File metadata and controls
163 lines (132 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python3
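"""
Basic usage example for bwell-logkit.

This example demonstrates the core functionality:
- Loading a single bWell log file, or every log under a directory
- Session statistics and basic filtering (record type, custom predicate, time range)
- Scene analysis
- Data export to a pandas DataFrame (CSV export is shown commented out)
- Attaching metadata to sessions and scene views
"""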
from pathlib import Path
from bwell_logkit import load_all_logs, load_log
from bwell_logkit.types import MovementSources, RecordTypes
def main() -> None:
    """Walk through the core bwell-logkit workflow, printing each step.

    The walkthrough loads one log file (and, separately, every log under a
    directory), then runs statistics, filtering, scene analysis, DataFrame
    export and metadata handling on the single-file session.

    Returns early, after printing a hint, when the placeholder log file
    path does not exist.
    """
    # Replace with your actual log file path
    log_file = Path("path/to/sample_log.json")

    # Check if file exists (you'll need to provide your own log file)
    if not log_file.exists():
        print(f"Log file not found: {log_file}")
        print("Please provide a valid bWell log file to run this example.")
        return

    print("=== Basic bwell-logkit Usage Example ===\n")

    # 1. Load the log file
    print("1a. Loading log file...")
    session = load_log(log_file)
    print(f" Loaded {len(session)} records")

    # Alternatively, load all logs from a directory recursively and in parallel
    print("1b. Loading all logs from a directory...")
    sessions = load_all_logs(
        "path/to",
        file_pattern="*.json",  # Custom file pattern
        max_workers=4,  # Limit parallel workers
        # NOTE(review): False presumably means a file that fails to load is
        # NOT skipped (the error surfaces) - confirm against load_all_logs.
        skip_errors=False,
    )
    # Use a dedicated loop variable: a `for` target stays bound after the
    # loop, so reusing `session` here would silently replace the session
    # loaded in step 1a for every step below.
    for relative_path, loaded_session in sessions.items():
        print(f" {relative_path}: {len(loaded_session)} records")

    # 2. Get basic statistics
    print("\n2. Session statistics:")
    stats = session.get_stats()
    print(f" Record types: {list(stats['record_types'].keys())}")
    if stats["game_time_range"]:
        start, end = stats["game_time_range"]
        print(
            f" Game time range: {start:.2f} - {end:.2f} seconds ("
            f"{end - start:.2f}s total)"
        )

    # 3. Filter by record type
    print("\n3. Filtering by record type...")
    movement_session = session.filter_type(RecordTypes.ABSOLUTE_ACTIVITY)
    print(f" Movement records: {len(movement_session)}")
    settings_session = session.filter_type(RecordTypes.GAME_SETTINGS)
    print(f" Game settings records: {len(settings_session)}")

    # 4. Filter by custom filter
    print("\n4. Filtering by custom filter (head senderTag)...")
    head_movement = movement_session.filter(
        lambda r: r.get("senderTag") == MovementSources.HEAD
    )
    print(f" Head tracking records: {len(head_movement)}")

    # 5. Time-based filtering
    print("\n5. Time-based filtering...")
    if len(session) > 0:
        # Filter first 30 seconds
        early_session = session.filter_time_range(0, 30)
        print(f" First 30 seconds: {len(early_session)} records")
        # Filter a middle portion
        mid_session = session.filter_time_range(30, 60)
        print(f" 30-60 seconds: {len(mid_session)} records")

    # 6. Scene analysis
    print("\n6. Scene analysis...")
    scenes = session.list_scenes()
    print(f" Available scenes: {scenes}")
    if scenes:
        # Get scene summary
        scene_summary = session.scene_manager.get_scene_summary()
        for scene_name, info in scene_summary.items():
            print(
                f" {scene_name}: {info['instance_count']} instance(s), "
                f"total duration: {info['total_duration_secs']:.2f}s"
            )

    # 7. Export data
    print("\n7. Exporting data...")
    # Export to pandas DataFrame
    print(" Converting to pandas DataFrame...")
    df = session.to_pandas()
    print(f" DataFrame shape: {df.shape}")
    if len(df) > 0:
        print(f" Columns: {list(df.columns)}")
    # Export to CSV (uncomment to create files)
    # print(" Exporting to CSV...")
    # session.export_csv("full_session.csv")
    # if len(movement_session) > 0:
    #     movement_session.export_csv("movement_data.csv")

    # 8. Chained filtering example
    print("\n8. Chained filtering example...")
    complex_filter = (
        session.filter_type(RecordTypes.ABSOLUTE_ACTIVITY)
        .filter_time_range(10, 120)  # 10-120 seconds
        .filter(lambda r: r.get("senderTag") == MovementSources.HEAD)
    )
    print(f" Head movement in 10-120s range: {len(complex_filter)} records")

    # 9. View all scene instances in chronological order
    print("\n9. Viewing all scene instances in chronological order...")
    scene_instances = session.scene_manager.get_all_scene_instances_chron()
    for scene_instance in scene_instances:
        name = scene_instance.name
        index = scene_instance.instance
        print(f" {name}: {index}")

    # 10. Adding participant information to metadata
    print("\n10. Adding participant information to metadata...")
    session.set_metadata("participant_id", "P001")
    session.set_metadata("participant_age", 25)
    session.set_metadata("study_condition", "control")

    # Or update multiple metadata fields at once
    participant_info = {
        "participant_id": "P001",
        "participant_age": 25,
        "study_condition": "control",
        "session_date": "2024-01-15",
        "researcher": "Dr. Smith",
    }
    session.update_metadata(participant_info)

    # Access the metadata
    print("Session metadata:", session.metadata)

    # Metadata is preserved when filtering
    filtered_session = session.filter_type(RecordTypes.ABSOLUTE_ACTIVITY)
    print("Filtered session still has metadata:", filtered_session.metadata)

    # Scene views also inherit and can modify metadata
    scene_names = session.list_scenes()  # query once, reuse for the lookup
    if scene_names:
        scene = session.scene(scene_names[0])
        scene.set_metadata("scene_notes", "Participant seemed engaged")
        print("Scene metadata:", scene.metadata)

    print("\n=== Example completed successfully! ===")
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()