Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/isar/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,14 @@ packages:
path: "../ndk"
relative: true
source: path
version: "0.6.1-dev.7"
version: "0.7.1-dev.0"
ndk_cache_manager_test_suite:
dependency: "direct dev"
description:
path: "../ndk_cache_manager_test_suite"
relative: true
source: path
version: "1.0.0-dev.2"
version: "1.0.1-dev.0"
node_preamble:
dependency: transitive
description:
Expand Down Expand Up @@ -595,10 +595,10 @@ packages:
dependency: transitive
description:
name: web
sha256: cd3543bd5798f6ad290ea73d210f423502e71900302dde696f8bff84bf89a1cb
sha256: "868d88a33d8a87b18ffc05f9f030ba328ffefba92d6c127917a2ba740f9cfe4a"
url: "https://pub.dev"
source: hosted
version: "1.1.0"
version: "1.1.1"
web_socket:
dependency: transitive
description:
Expand Down
120 changes: 120 additions & 0 deletions packages/ndk/lib/data_layer/data_sources/http_request.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,27 @@ import 'dart:convert';
import 'dart:typed_data';

import 'package:http/http.dart' as http;
import 'package:rxdart/rxdart.dart';

/// Upload progress information
class UploadProgress {
final int sentBytes;
final int totalBytes;
final bool isComplete;
final http.Response? response;
final Object? error;

UploadProgress({
required this.sentBytes,
required this.totalBytes,
this.isComplete = false,
this.response,
this.error,
});

double get progress => totalBytes > 0 ? sentBytes / totalBytes : 0;
double get percentage => progress * 100;
}

/// Data source for making http requests
class HttpRequestDS {
Expand Down Expand Up @@ -42,6 +63,84 @@ class HttpRequestDS {
return response;
}

/// Upload data using streaming to avoid loading entire file into memory
/// Returns a stream of [UploadProgress] that emits progress updates and completes when upload is done
/// Uses BehaviorSubject so new listeners get the latest progress immediately
Stream<UploadProgress> putStream({
required Uri url,
required Stream<List<int>> body,
required Map<String, String> headers,
int? contentLength,
}) {
final progressSubject = BehaviorSubject<UploadProgress>.seeded(
UploadProgress(sentBytes: 0, totalBytes: contentLength ?? 0),
);

() async {
try {
final request = http.StreamedRequest('PUT', url);

// Add headers
request.headers.addAll(headers);

// Set content length if provided (required by some servers)
if (contentLength != null) {
request.contentLength = contentLength;
}

// Track progress
int bytesSent = 0;
final totalBytes = contentLength ?? 0;

final progressStream = body.map((chunk) {
bytesSent += chunk.length;
progressSubject.add(UploadProgress(
sentBytes: bytesSent,
totalBytes: totalBytes,
));
return chunk;
});

// Pipe the stream to the request
progressStream.listen(
request.sink.add,
onError: (error) {
request.sink.addError(error);
progressSubject.addError(error);
},
onDone: request.sink.close,
cancelOnError: true,
);

// Send the request and get response
final streamedResponse = await _client.send(request);
final response = await http.Response.fromStream(streamedResponse);

if (response.statusCode != 200) {
final error = Exception(
"error fetching STATUS: ${response.statusCode}, Link: $url");
progressSubject.addError(error);
await progressSubject.close();
return;
}

// Upload complete
progressSubject.add(UploadProgress(
sentBytes: totalBytes,
totalBytes: totalBytes,
isComplete: true,
response: response,
));
await progressSubject.close();
} catch (error) {
progressSubject.addError(error);
await progressSubject.close();
}
}();

return progressSubject.stream;
}

Future<http.Response> post({
required Uri url,
required Uint8List body,
Expand Down Expand Up @@ -95,6 +194,27 @@ class HttpRequestDS {
return response;
}

/// Get data as a stream to avoid loading entire file into memory
/// Returns a stream of bytes that can be written directly to a file
Stream<List<int>> getStream({
required Uri url,
Map<String, String>? headers,
}) async* {
final request = http.Request('GET', url);
if (headers != null) {
request.headers.addAll(headers);
}

final streamedResponse = await _client.send(request);

if (streamedResponse.statusCode != 200) {
throw Exception(
"error fetching STATUS: ${streamedResponse.statusCode}, Link: $url");
}

yield* streamedResponse.stream;
}

Future<http.Response> delete({
required Uri url,
required headers,
Expand Down
32 changes: 32 additions & 0 deletions packages/ndk/lib/data_layer/io/file_io.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import 'dart:typed_data';

import '../../domain_layer/entities/file_hash_progress.dart';

/// Platform-agnostic file I/O interface for reading and writing files
/// Implementations use dart:io for native and dart:html for web
abstract class FileIO {
/// Reads a file in chunks and returns a stream of bytes
/// [chunkSize] determines the size of each chunk (default 8KB)
Stream<Uint8List> readFileAsStream(String filePath, {int chunkSize = 8192});

/// Writes bytes to a file at the given path
/// Creates the file if it doesn't exist, overwrites if it does
Future<void> writeFile(String filePath, Uint8List data);

/// Writes a stream of bytes to a file at the given path
/// Creates the file if it doesn't exist, overwrites if it does
Future<void> writeFileStream(String filePath, Stream<Uint8List> dataStream);

/// Gets the size of a file in bytes
Future<int> getFileSize(String filePath);

/// Checks if a file exists at the given path
Future<bool> fileExists(String filePath);

/// Reads entire file into memory as Uint8List
Future<Uint8List> readFile(String filePath);

/// Computes SHA256 hash of a file by reading it in chunks
/// Returns stream updates with progress and a final event containing [FileHashProgress.hash]
Stream<FileHashProgress> computeFileHash(String filePath);
}
4 changes: 4 additions & 0 deletions packages/ndk/lib/data_layer/io/file_io_factory_native.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import 'file_io.dart';
import 'file_io_native.dart';

FileIO createFileIO() => FileIONative();
6 changes: 6 additions & 0 deletions packages/ndk/lib/data_layer/io/file_io_factory_stub.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import 'file_io.dart';

/// Stub implementation used when neither `dart:io` nor web interop is available.
FileIO createFileIO() {
throw UnsupportedError('Cannot create FileIO on this platform');
}
4 changes: 4 additions & 0 deletions packages/ndk/lib/data_layer/io/file_io_factory_web.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import 'file_io.dart';
import 'file_io_web.dart';

FileIO createFileIO() => FileIOWeb();
112 changes: 112 additions & 0 deletions packages/ndk/lib/data_layer/io/file_io_native.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import 'dart:io';
import 'dart:typed_data';

import 'package:crypto/crypto.dart';

import '../../domain_layer/entities/file_hash_progress.dart';
import '../../shared/isolates/isolate_manager.dart';
import 'file_io.dart';

/// Native platform implementation using dart:io
/// Works on: Windows, macOS, Linux, Android, iOS
class FileIONative implements FileIO {
@override
Stream<Uint8List> readFileAsStream(String filePath, {int chunkSize = 8192}) {
final file = File(filePath);
return file.openRead().map((chunk) => Uint8List.fromList(chunk));
}

@override
Future<void> writeFile(String filePath, Uint8List data) async {
final file = File(filePath);
await file.create(recursive: true);
await file.writeAsBytes(data);
}

@override
Future<void> writeFileStream(
String filePath, Stream<Uint8List> dataStream) async {
final file = File(filePath);
await file.create(recursive: true);
final sink = file.openWrite();

await for (final chunk in dataStream) {
sink.add(chunk);
}

await sink.flush();
await sink.close();
}

@override
Future<int> getFileSize(String filePath) async {
final file = File(filePath);
return await file.length();
}

@override
Future<bool> fileExists(String filePath) async {
final file = File(filePath);
return await file.exists();
}

@override
Future<Uint8List> readFile(String filePath) async {
final file = File(filePath);
return await file.readAsBytes();
}

@override
Stream<FileHashProgress> computeFileHash(String filePath) {
return IsolateManager.instance
.runInComputeIsolateStream<String, FileHashProgress>(
_computeFileHashTask, filePath);
}
}

Future<void> _computeFileHashTask(
String filePath,
void Function(FileHashProgress progress) emit,
) async {
final file = File(filePath);
final totalBytes = await file.length();

final digestSink = _DigestSink();
final input = sha256.startChunkedConversion(digestSink);

int processedBytes = 0;
emit(FileHashProgress(
processedBytes: processedBytes,
totalBytes: totalBytes,
));

await for (final chunk in file.openRead()) {
input.add(chunk);
processedBytes += chunk.length;
emit(FileHashProgress(
processedBytes: processedBytes,
totalBytes: totalBytes,
));
}

input.close();

emit(FileHashProgress(
processedBytes: totalBytes,
totalBytes: totalBytes,
isComplete: true,
hash: digestSink.value?.toString(),
));
}

class _DigestSink implements Sink<Digest> {
Digest? value;

@override
void add(Digest data) {
value = data;
}

@override
void close() {}
}
11 changes: 11 additions & 0 deletions packages/ndk/lib/data_layer/io/file_io_platform.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export 'file_io.dart';

/// Exports the correct FileIO implementation based on platform.
export 'file_io_stub.dart'
if (dart.library.io) 'file_io_native.dart'
if (dart.library.js_interop) 'file_io_web.dart';

/// Exposes `createFileIO()` implemented per platform.
export 'file_io_factory_stub.dart'
if (dart.library.io) 'file_io_factory_native.dart'
if (dart.library.js_interop) 'file_io_factory_web.dart';
Loading