Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions pvm/src/pvm/node_pvm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use octez_riscv_data::mode::Normal;
use octez_riscv_data::mode::Provable;
use octez_riscv_data::mode::Prove;
use octez_riscv_data::mode::Verify;
use octez_riscv_data::store::BlobStore;
use octez_riscv_durable_storage::registry::CloneRegistryMode;
use perfect_derive::perfect_derive;
use thiserror::Error;
Expand All @@ -46,6 +47,7 @@ use crate::pvm::hooks::PvmHooks;
use crate::state_backend::proof_backend::proof::Proof;
use crate::state_backend::verify_backend::ProofVerificationFailure;
use crate::storage;
use crate::storage::PersistentBlobStore;
use crate::storage::Repo;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -321,17 +323,11 @@ pub enum PvmStorageError {
PvmError(#[from] PvmError),
}

pub struct PvmStorage {
repo: Repo,
pub struct PvmStorage<BS> {
repo: Repo<BS>,
}

impl PvmStorage {
/// Load or create new repo at `path`.
pub fn load(path: impl AsRef<Path>) -> Result<PvmStorage, PvmStorageError> {
let repo = Repo::load(path)?;
Ok(PvmStorage { repo })
}

impl<BS: BlobStore> PvmStorage<BS> {
pub fn close(self) {
self.repo.close()
}
Expand All @@ -346,13 +342,21 @@ impl PvmStorage {
let pvm = self.repo.checkout_serialised(id)?;
Ok(NodePvm::wrap(pvm))
}
}

impl<BS: PersistentBlobStore> PvmStorage<BS> {
/// Load or create new repo at `path`.
pub fn load(path: impl AsRef<Path>) -> Result<Self, PvmStorageError> {
let repo = Repo::load(path)?;
Ok(Self { repo })
}

/// A snapshot is a new repo to which only `id` has been committed.
pub fn export_snapshot(
&self,
id: &Hash,
path: impl AsRef<Path>,
) -> Result<(), PvmStorageError> {
Ok(self.repo.export_snapshot(id, path)?)
Ok(self.repo.export_snapshot_chunked(id, path)?)
}
}
189 changes: 116 additions & 73 deletions pvm/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use bincode::Encode;
use bincode::error::DecodeError;
use bincode::error::EncodeError;
use octez_riscv_data::hash::Hash;
use octez_riscv_data::hash::HashError;
use octez_riscv_data::hash::HashedData;
use octez_riscv_data::serialisation;
use octez_riscv_data::store::BlobStore;
Expand All @@ -38,14 +37,34 @@ pub enum StorageError {
#[error("Invalid repo")]
InvalidRepo,

#[error("Hashing error")]
HashError(#[from] HashError),

#[error("Data for hash {0} not found")]
NotFound(String),

#[error("Committed chunk {0} not found")]
ChunkNotFound(String),

#[error("Blob store error")]
BlobStore(#[from] BlobStoreError),
}

/// Extension of [`BlobStore`] adding the capabilities the PVM storage layer
/// needs in order to export PVM snapshots to disk.
pub trait PersistentBlobStore: BlobStore {
    /// Initialise a store rooted at `path`, creating the directory when it
    /// does not yet exist and reusing it when it does.
    ///
    /// Implementations should return [`StorageError::InvalidRepo`] when
    /// `path` points at a regular file.
    fn init_from_path(path: impl AsRef<Path>) -> Result<Self, StorageError>
    where
        Self: Sized;

    /// Transfer the blob identified by `hash` into `other`.
    ///
    /// The default implementation reads the blob back and re-inserts it,
    /// which is always correct; backends with direct access to their storage
    /// (e.g. a filesystem) can override this with a cheaper mechanism such as
    /// `std::fs::copy`, which matters for large blobs.
    fn export_blob(&self, other: &mut Self, hash: &Hash) -> Result<(), StorageError> {
        let payload = self.blob_get(*hash)?;
        other.blob_set(&HashedData::from_data(payload.as_ref()))?;

        Ok(())
    }
}

#[derive(Debug, PartialEq)]
Expand All @@ -54,21 +73,6 @@ pub struct Store {
}

impl Store {
/// Initialise a store. Either create a new directory if `path` does not
/// exist or initialise in an existing directory.
/// Throws `StorageError::InvalidRepo` if `path` is a file.
pub fn init(path: impl AsRef<Path>) -> Result<Self, StorageError> {
let path = path.as_ref().to_path_buf();
if !path.exists() {
std::fs::create_dir(&path)?;
} else if path.metadata()?.is_file() {
return Err(StorageError::InvalidRepo);
}
Ok(Store {
path: path.into_boxed_path(),
})
}

fn file_name_of_hash(hash: &Hash) -> String {
hex::encode(hash)
}
Expand Down Expand Up @@ -98,23 +102,25 @@ impl Store {
self.write_data_if_new(file_name, data)?;
Ok(hash)
}
}

/// Load data corresponding to `hash`, if found.
pub fn load(&self, hash: &Hash) -> Result<Vec<u8>, StorageError> {
let file_name = self.path_of_hash(hash);
std::fs::read(file_name).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
StorageError::NotFound(hex::encode(hash))
} else {
StorageError::IoError(e)
}
impl PersistentBlobStore for Store {
fn init_from_path(path: impl AsRef<Path>) -> Result<Self, StorageError> {
let path = path.as_ref().to_path_buf();
if !path.exists() {
std::fs::create_dir(&path)?;
} else if path.metadata()?.is_file() {
return Err(StorageError::InvalidRepo);
}

Ok(Store {
Comment thread
emturner marked this conversation as resolved.
path: path.into_boxed_path(),
})
}

/// Copy the data corresponding to `hash` to `path`.
pub fn copy(&self, hash: &Hash, path: impl AsRef<Path>) -> Result<(), StorageError> {
fn export_blob(&self, other: &mut Self, hash: &Hash) -> Result<(), StorageError> {
let source_path = self.path_of_hash(hash);
let target_path = path.as_ref().join(Self::file_name_of_hash(hash));
let target_path = other.path_of_hash(hash);
std::fs::copy(source_path, target_path)?;
Ok(())
}
Expand Down Expand Up @@ -150,61 +156,115 @@ impl BlobStore for Store {
}

#[derive(Debug, PartialEq)]
pub struct Repo {
backend: Store,
pub struct Repo<BS> {
backend: BS,
}

impl Repo {
impl<BS: PersistentBlobStore> Repo<BS> {
/// Load or create new repo at `path`.
pub fn load(path: impl AsRef<Path>) -> Result<Repo, StorageError> {
pub fn load(path: impl AsRef<Path>) -> Result<Self, StorageError> {
Ok(Repo {
backend: Store::init(path)?,
backend: BS::init_from_path(path)?,
})
}

/// A snapshot is a new repo to which only `id` has been committed. This method exports an `id`
/// assuming that it represents a Merkle tree of uniform depth one (the result of using
/// `commit_serialised`, which chunks the serialisation).
pub fn export_snapshot_chunked(
&self,
id: &Hash,
path: impl AsRef<Path>,
) -> Result<(), StorageError> {
// Only export a snapshot to a new or empty directory
let path = path.as_ref();
if !path.exists() || path.read_dir()?.next().is_none() {
std::fs::create_dir_all(path)?;
} else {
return Err(StorageError::InvalidRepo);
};
let mut other = BS::init_from_path(path)?;
let bytes = self.backend.blob_get(*id)?;
let commit: Vec<Hash> = serialisation::deserialise(bytes.as_ref())?;
Comment thread
emturner marked this conversation as resolved.
for chunk in commit {
self.backend.export_blob(&mut other, &chunk)?;
}
self.backend.export_blob(&mut other, id)?;
Comment thread
thomasathorne marked this conversation as resolved.
Ok(())
}

/// A snapshot is a new repo to which only `id` has been committed. This method exports an `id`
/// which represents any Merkle tree structure.
///
/// Currently unimplemented.
///
/// TODO (TZX-121)
pub fn export_snapshot_folded(
&self,
_id: &Hash,
_path: impl AsRef<Path>,
) -> Result<(), StorageError> {
todo!()
}
}

impl<BS: BlobStore> Repo<BS> {
pub fn new(store: BS) -> Self {
Repo { backend: store }
}

pub fn close(self) {}

/// Create a new commit for `bytes` and return the commit id.
pub fn commit(&mut self, bytes: &[u8]) -> Result<Hash, StorageError> {
/// Create a new commit for `bytes` and return the commit id.
pub fn commit(&self, bytes: &[u8]) -> Result<Hash, StorageError> {
let mut commit = Vec::with_capacity(bytes.len().div_ceil(CHUNK_SIZE) * Hash::DIGEST_SIZE);

for chunk in bytes.chunks(CHUNK_SIZE) {
let chunk_hash = self.backend.store(chunk)?;
commit.push(chunk_hash);
let hashed_chunk = HashedData::from_data(chunk);
self.backend.blob_set(&hashed_chunk)?;
commit.push(hashed_chunk.hash());
}

// A commit contains the list of all chunks needed to reconstruct `data`.
let commit_bytes = serialisation::serialise(&commit)?;
self.backend.store(&commit_bytes)
let hashed_commit_bytes = HashedData::from_data(commit_bytes);
self.backend.blob_set(&hashed_commit_bytes)?;

Ok(hashed_commit_bytes.hash())
}

/// Commit something serialisable and return the commit ID.
pub fn commit_serialised(&mut self, subject: &impl Encode) -> Result<Hash, StorageError> {
pub fn commit_serialised(&self, subject: &impl Encode) -> Result<Hash, StorageError> {
let chunk_hashes = {
let mut writer = chunked_io::ChunkWriter::new(&mut self.backend);
let mut writer = chunked_io::ChunkWriter::new(&self.backend);
serialisation::serialise_into(subject, &mut writer)?;
writer.finalise()?
};

// A commit contains the list of all chunks needed to reconstruct the underlying data.
let commit_bytes = serialisation::serialise(&chunk_hashes)?;
self.backend.store(&commit_bytes)
let hashed_commit_bytes = HashedData::from_data(commit_bytes);
self.backend.blob_set(&hashed_commit_bytes)?;

Ok(hashed_commit_bytes.hash())
}

/// Checkout the bytes committed under `id`, if the commit exists.
pub fn checkout(&self, id: &Hash) -> Result<Vec<u8>, StorageError> {
let bytes = self.backend.load(id)?;
let commit: Vec<Hash> = serialisation::deserialise(&bytes)?;
let bytes = self.backend.blob_get(*id)?;

let commit: Vec<Hash> = serialisation::deserialise(bytes.as_ref())?;
let mut bytes = Vec::new();

for hash in commit {
let mut chunk = self.backend.load(&hash).map_err(|e| {
if let StorageError::NotFound(hash) = e {
StorageError::ChunkNotFound(hash)
let chunk = self.backend.blob_get(hash).map_err(|e| {
if let BlobStoreError::NotFound(hash) = e {
StorageError::ChunkNotFound(hash.to_string())
} else {
e
StorageError::BlobStore(e)
}
})?;
bytes.append(&mut chunk);
bytes.extend_from_slice(chunk.as_ref());
}
Ok(bytes)
}
Expand All @@ -214,24 +274,6 @@ impl Repo {
let mut reader = chunked_io::ChunkedReader::new(&self.backend, id)?;
Ok(serialisation::deserialise_from(&mut reader)?)
}

/// A snapshot is a new repo to which only `id` has been committed.
pub fn export_snapshot(&self, id: &Hash, path: impl AsRef<Path>) -> Result<(), StorageError> {
// Only export a snapshot to a new or empty directory
let path = path.as_ref();
if !path.exists() || path.read_dir()?.next().is_none() {
std::fs::create_dir_all(path)?;
} else {
return Err(StorageError::InvalidRepo);
};
let bytes = self.backend.load(id)?;
let commit: Vec<Hash> = serialisation::deserialise(&bytes)?;
for chunk in commit {
self.backend.copy(&chunk, path)?;
}
self.backend.copy(id, path)?;
Ok(())
}
}

#[cfg(test)]
Expand All @@ -241,12 +283,13 @@ mod tests {
use octez_riscv_data::store::BlobStore;
use octez_riscv_data::store::BlobStoreError;

use super::PersistentBlobStore;
use super::Store;

#[test]
fn blob_store_test() {
let tmp_dir = tempfile::tempdir().unwrap();
let store = Store::init(tmp_dir.path()).unwrap();
let store = Store::init_from_path(tmp_dir.path()).unwrap();

let data1: &[u8] = &[3, 4, 5, 6, 8];
let data2: &[u8] = b"Hi";
Expand Down
Loading
Loading