|
1 | 1 | use std::collections::VecDeque; |
2 | 2 | use std::fs::{File as StdFile, OpenOptions}; |
3 | | -use std::io::{BufReader, BufWriter, Read, Write}; |
| 3 | +use std::io::{self, BufReader, BufWriter, Read, Write}; |
4 | 4 | use std::path::{Path, PathBuf}; |
5 | 5 | use std::sync::Arc; |
6 | 6 | use std::time::Instant; |
7 | 7 |
|
| 8 | +use cozip_util::{ParallelFileReader, ParallelFileReaderOptions, ParallelReadHandle}; |
8 | 9 | use thiserror::Error; |
9 | 10 | use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; |
10 | 11 |
|
@@ -137,6 +138,124 @@ impl<R: AsyncRead + Unpin> AsyncStream<R> { |
137 | 138 | } |
138 | 139 | } |
139 | 140 |
|
| 141 | +struct ParallelPrefetchReader { |
| 142 | + reader: ParallelFileReader, |
| 143 | + file_len: u64, |
| 144 | + next_submit_offset: u64, |
| 145 | + request_size: usize, |
| 146 | + max_inflight_ops: usize, |
| 147 | + max_inflight_bytes: usize, |
| 148 | + inflight_bytes: usize, |
| 149 | + inflight: VecDeque<(ParallelReadHandle, usize)>, |
| 150 | + current: Vec<u8>, |
| 151 | + current_pos: usize, |
| 152 | +} |
| 153 | + |
| 154 | +impl ParallelPrefetchReader { |
| 155 | + fn new( |
| 156 | + file: StdFile, |
| 157 | + chunk_size: usize, |
| 158 | + options: ParallelFileReaderOptions, |
| 159 | + ) -> Result<Self, CozipDeflateError> { |
| 160 | + let file_len = file.metadata()?.len(); |
| 161 | + let request_size = chunk_size.max(1); |
| 162 | + let max_inflight_ops = if options.max_inflight_ops > 0 { |
| 163 | + options.max_inflight_ops |
| 164 | + } else { |
| 165 | + let by_bytes = options.max_backlog_bytes.max(request_size) / request_size; |
| 166 | + by_bytes.clamp(64, 4096) |
| 167 | + }; |
| 168 | + let max_inflight_bytes = options.max_backlog_bytes.max(request_size); |
| 169 | + let reader = |
| 170 | + ParallelFileReader::new(file, options).map_err(|error| io::Error::other(error.to_string()))?; |
| 171 | + let mut this = Self { |
| 172 | + reader, |
| 173 | + file_len, |
| 174 | + next_submit_offset: 0, |
| 175 | + request_size, |
| 176 | + max_inflight_ops, |
| 177 | + max_inflight_bytes, |
| 178 | + inflight_bytes: 0, |
| 179 | + inflight: VecDeque::new(), |
| 180 | + current: Vec::new(), |
| 181 | + current_pos: 0, |
| 182 | + }; |
| 183 | + this.fill_prefetch()?; |
| 184 | + Ok(this) |
| 185 | + } |
| 186 | + |
| 187 | + fn fill_prefetch(&mut self) -> io::Result<()> { |
| 188 | + while self.inflight.len() < self.max_inflight_ops |
| 189 | + && self.inflight_bytes < self.max_inflight_bytes |
| 190 | + && self.next_submit_offset < self.file_len |
| 191 | + { |
| 192 | + let remaining = self.file_len.saturating_sub(self.next_submit_offset); |
| 193 | + let mut len = |
| 194 | + usize::try_from(remaining.min(self.request_size as u64)).unwrap_or(self.request_size); |
| 195 | + let available_budget = self.max_inflight_bytes.saturating_sub(self.inflight_bytes); |
| 196 | + if len > available_budget && available_budget > 0 { |
| 197 | + len = available_budget.min(len); |
| 198 | + } |
| 199 | + if len == 0 { |
| 200 | + break; |
| 201 | + } |
| 202 | + let handle = self |
| 203 | + .reader |
| 204 | + .submit(self.next_submit_offset, len) |
| 205 | + .map_err(|error| io::Error::other(error.to_string()))?; |
| 206 | + self.inflight.push_back((handle, len)); |
| 207 | + self.inflight_bytes = self.inflight_bytes.saturating_add(len); |
| 208 | + self.next_submit_offset = self.next_submit_offset.saturating_add(len as u64); |
| 209 | + } |
| 210 | + Ok(()) |
| 211 | + } |
| 212 | + |
| 213 | + fn finish(self) -> Result<(), CozipDeflateError> { |
| 214 | + self.reader |
| 215 | + .drain() |
| 216 | + .map_err(|error| io::Error::other(error.to_string()).into()) |
| 217 | + } |
| 218 | +} |
| 219 | + |
| 220 | +impl Read for ParallelPrefetchReader { |
| 221 | + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| 222 | + if buf.is_empty() { |
| 223 | + return Ok(0); |
| 224 | + } |
| 225 | + |
| 226 | + let mut written = 0usize; |
| 227 | + loop { |
| 228 | + if self.current_pos >= self.current.len() { |
| 229 | + let Some((handle, len)) = self.inflight.pop_front() else { |
| 230 | + return Ok(written); |
| 231 | + }; |
| 232 | + self.inflight_bytes = self.inflight_bytes.saturating_sub(len); |
| 233 | + self.current = handle |
| 234 | + .recv() |
| 235 | + .map_err(|error| io::Error::other(error.to_string()))?; |
| 236 | + self.current_pos = 0; |
| 237 | + self.fill_prefetch()?; |
| 238 | + if self.current.is_empty() { |
| 239 | + if written > 0 { |
| 240 | + return Ok(written); |
| 241 | + } |
| 242 | + continue; |
| 243 | + } |
| 244 | + } |
| 245 | + |
| 246 | + let available = self.current.len().saturating_sub(self.current_pos); |
| 247 | + let take = available.min(buf.len().saturating_sub(written)); |
| 248 | + buf[written..written + take] |
| 249 | + .copy_from_slice(&self.current[self.current_pos..self.current_pos + take]); |
| 250 | + self.current_pos = self.current_pos.saturating_add(take); |
| 251 | + written = written.saturating_add(take); |
| 252 | + if written == buf.len() { |
| 253 | + return Ok(written); |
| 254 | + } |
| 255 | + } |
| 256 | + } |
| 257 | +} |
| 258 | + |
140 | 259 | #[derive(Debug, Clone, Copy, Default)] |
141 | 260 | pub struct CoZipDeflateInitStats { |
142 | 261 | pub gpu_context_init_ms: f64, |
@@ -200,6 +319,10 @@ impl CoZipDeflate { |
200 | 319 | self.options.parallel_write_threads |
201 | 320 | } |
202 | 321 |
|
| 322 | + pub fn parallel_read_threads(&self) -> usize { |
| 323 | + self.options.parallel_read_threads |
| 324 | + } |
| 325 | + |
203 | 326 | pub fn compress_stream<R: Read + Send, W: Write>( |
204 | 327 | &self, |
205 | 328 | reader: &mut R, |
@@ -431,6 +554,22 @@ impl CoZipDeflate { |
431 | 554 | .map_err(map_pdeflate_error) |
432 | 555 | } |
433 | 556 |
|
| 557 | + pub fn compress_file_parallel_read_with_options( |
| 558 | + &self, |
| 559 | + input_file: StdFile, |
| 560 | + output_file: StdFile, |
| 561 | + stream_options: StreamOptions, |
| 562 | + reader_options: ParallelFileReaderOptions, |
| 563 | + ) -> Result<PDeflateStats, CozipDeflateError> { |
| 564 | + let chunk_size = self.options.chunk_size.max(1); |
| 565 | + let mut reader = ParallelPrefetchReader::new(input_file, chunk_size, reader_options)?; |
| 566 | + let mut writer = BufWriter::new(output_file); |
| 567 | + let stats = self.compress_stream_with_options(&mut reader, &mut writer, stream_options)?; |
| 568 | + writer.flush()?; |
| 569 | + reader.finish()?; |
| 570 | + Ok(stats) |
| 571 | + } |
| 572 | + |
434 | 573 | pub fn decompress_file_from_name<PIn: AsRef<Path>, POut: AsRef<Path>>( |
435 | 574 | &self, |
436 | 575 | input_path: PIn, |
|
0 commit comments