-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_small_subset.py
More file actions
165 lines (129 loc) · 5.2 KB
/
test_small_subset.py
File metadata and controls
165 lines (129 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python3
"""
Test converter on a small subset of frames from .8ij file
This allows quick testing without processing the entire file
"""
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from converter_8ij_to_mp4 import Converter8ijToMP4
def test_subset(input_path: Path, output_path: Path, num_frames: int = 100, workers: int = 16):
"""
Test conversion on only the first N frames
Args:
input_path: Path to .8ij file
output_path: Path to output MP4
num_frames: Number of frames to process (default: 100)
workers: Number of CPU workers
"""
print(f"\n{'='*60}")
print(f"Testing converter with SUBSET of frames")
print(f"Input: {input_path}")
print(f"Output: {output_path}")
print(f"Frames to process: {num_frames}")
print(f"{'='*60}\n")
# Create converter
converter = Converter8ijToMP4(num_cpu_workers=workers, gpu_id=0)
# Parse the file structure (fast - just reads headers)
print("Parsing file structure...")
all_frames, info = converter.parse_8ij_file(input_path)
print(f"Total frames in file: {len(all_frames)}")
print(f"Video info: {info.width}x{info.height}, {info.bpp}-bit, {info.fps} fps")
# Limit to first N frames
frames_to_process = all_frames[:num_frames]
print(f"\nProcessing only first {len(frames_to_process)} frames...")
# Temporarily modify the parse method return to only include subset
# We'll do this by creating a modified convert method
converter._test_frames = frames_to_process
converter._test_info = info
# Now convert using the subset
import av
import mmap
from tqdm import tqdm
output_path.parent.mkdir(parents=True, exist_ok=True)
# Setup NVENC encoder
print(f"Initializing NVENC encoder for {info.width}x{info.height} @ {info.fps} fps")
container = av.open(str(output_path), 'w')
stream = container.add_stream('h264_nvenc', rate=info.fps)
stream.width = info.width
stream.height = info.height
stream.pix_fmt = 'yuv420p'
# NVENC options
stream.options = {
'preset': 'p4',
'gpu': '0',
'rc': 'vbr',
'cq': '19',
'b:v': '10M',
'maxrate': '15M',
'bufsize': '20M',
'profile': 'high',
'level': '5.1',
'g': '30',
}
# Process frames
batch_size = 16
start_time = time.time()
with open(input_path, 'rb') as f:
mmapped = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
with tqdm(total=len(frames_to_process), desc="Converting") as pbar:
for batch_start in range(0, len(frames_to_process), batch_size):
batch_end = min(batch_start + batch_size, len(frames_to_process))
batch_frames = frames_to_process[batch_start:batch_end]
# Read JPEG data
jpeg_data_list = []
for frame in batch_frames:
jpeg_data = bytes(mmapped[frame.offset:frame.offset + frame.size])
jpeg_data_list.append(jpeg_data)
# CPU parallel decode
frames_12bit = converter.decode_batch_parallel(jpeg_data_list)
# GPU conversion
frames_8bit = converter.gpu_converter.convert_batch(frames_12bit, info.gamma)
# Encode with NVENC
for frame_8bit in frames_8bit:
frame_av = av.VideoFrame.from_ndarray(frame_8bit, format='rgb24')
frame_av = frame_av.reformat(format='yuv420p')
for packet in stream.encode(frame_av):
container.mux(packet)
pbar.update(len(batch_frames))
# Flush encoder
print("\nFlushing encoder...")
for packet in stream.encode():
container.mux(packet)
container.close()
elapsed = time.time() - start_time
fps = len(frames_to_process) / elapsed
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f" Frames processed: {len(frames_to_process)}")
print(f" Time: {elapsed:.2f}s")
print(f" Speed: {fps:.1f} fps")
print(f" Output: {output_path}")
print(f" File size: {output_path.stat().st_size / (1024*1024):.1f} MB")
print(f"{'='*60}\n")
return elapsed, fps
def main():
import argparse
parser = argparse.ArgumentParser(description='Test converter on subset of frames')
parser.add_argument('input', type=Path, help='Input .8ij file')
parser.add_argument('output', type=Path, help='Output MP4 file')
parser.add_argument('--frames', type=int, default=100,
help='Number of frames to process (default: 100)')
parser.add_argument('--workers', type=int, default=16,
help='Number of CPU workers (default: 16)')
args = parser.parse_args()
if not args.input.exists():
print(f"Error: Input file {args.input} not found")
return 1
try:
test_subset(args.input, args.output, args.frames, args.workers)
print("✓ Test completed successfully!")
return 0
except Exception as e:
print(f"\n✗ Error during conversion: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
exit(main())