Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pvm/pvm-host/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ pub struct AccumulateHostContext<S: HostStateProvider> {
pub yielded_accumulate_hash: Option<AccumulationOutputHash>,
/// **`p`**: Provided preimage data
pub provided_preimages: HashSet<(ServiceId, Octets)>,
/// Service ids created during this accumulation
pub created_service_ids: HashSet<ServiceId>,
/// Accumulate entry-point function invocation args (read-only)
pub invoke_args: AccumulateInvokeArgs,
/// Current entropy value (`η0′`)
Expand All @@ -205,7 +203,6 @@ impl<S: HostStateProvider> Clone for AccumulateHostContext<S> {
deferred_transfers: self.deferred_transfers.clone(),
yielded_accumulate_hash: self.yielded_accumulate_hash.clone(),
provided_preimages: self.provided_preimages.clone(),
created_service_ids: self.created_service_ids.clone(),
invoke_args: self.invoke_args.clone(),
curr_entropy: self.curr_entropy.clone(),
}
Expand Down Expand Up @@ -235,7 +232,6 @@ impl<S: HostStateProvider> AccumulateHostContext<S> {
deferred_transfers: Vec::new(),
yielded_accumulate_hash: None,
provided_preimages: HashSet::new(),
created_service_ids: HashSet::new(),
invoke_args,
curr_entropy,
})
Expand Down
1 change: 0 additions & 1 deletion pvm/pvm-host/src/host_functions/accumulate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ impl<S: HostStateProvider> AccumulateHostFunction<S> {
x.rotate_new_account_id(state_provider).await?;
new_service_id
};
x.created_service_ids.insert(new_service_id);

tracing::debug!(
"NEW service_id={new_service_id} parent={}",
Expand Down
1 change: 0 additions & 1 deletion pvm/pvm-host/src/host_functions/general/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ mod fetch_tests {
deferred_transfers: vec![],
yielded_accumulate_hash: None,
provided_preimages: HashSet::new(),
created_service_ids: HashSet::new(),
invoke_args: AccumulateInvokeArgs {
inputs: self.accumulate_inputs.clone(),
..Default::default()
Expand Down
6 changes: 0 additions & 6 deletions pvm/pvm-invocation/src/accumulate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ pub struct AccumulateResult<S: HostStateProvider> {
pub gas_used: UnsignedGas,
/// **`p`**: Provided preimage entries during accumulation
pub provided_preimages: HashSet<(ServiceId, Octets)>,
/// Service ids created during accumulation
pub created_service_ids: HashSet<ServiceId>,
pub accumulate_host: ServiceId,
}

Expand All @@ -61,7 +59,6 @@ impl<S: HostStateProvider> Default for AccumulateResult<S> {
yielded_accumulate_hash: None,
gas_used: UnsignedGas::default(),
provided_preimages: HashSet::new(),
created_service_ids: HashSet::new(),
accumulate_host: ServiceId::default(),
}
}
Expand Down Expand Up @@ -239,7 +236,6 @@ impl<S: HostStateProvider> AccumulateInvocation<S> {
gas_used: result.gas_used,
accumulate_host: x.accumulate_host,
provided_preimages: x.provided_preimages,
created_service_ids: x.created_service_ids,
}))
}
PVMInvocationOutput::OutputUnavailable => Ok(Some(AccumulateResult {
Expand All @@ -249,7 +245,6 @@ impl<S: HostStateProvider> AccumulateInvocation<S> {
gas_used: result.gas_used,
accumulate_host: x.accumulate_host,
provided_preimages: x.provided_preimages,
created_service_ids: x.created_service_ids,
})),
PVMInvocationOutput::OutOfGas(_) | PVMInvocationOutput::Panic(_) => {
Ok(Some(AccumulateResult {
Expand All @@ -259,7 +254,6 @@ impl<S: HostStateProvider> AccumulateInvocation<S> {
gas_used: result.gas_used, // Note: taking gas usage from the `x` context
accumulate_host: x.accumulate_host,
provided_preimages: y.provided_preimages,
created_service_ids: y.created_service_ids,
}))
}
}
Expand Down
92 changes: 70 additions & 22 deletions pvm/pvm-invocation/src/accumulate/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ async fn merge_partial_state_change(
prev_assigns: &AssignServices,
prev_designate: ServiceId,
prev_registrar: ServiceId,
created_service_ids: &HashSet<ServiceId>,
base_present_service_ids: &HashSet<ServiceId>,
base_removed_service_ids: &HashSet<ServiceId>,
new_service_ids_this_round: &mut HashSet<ServiceId>,
removed_service_ids_this_round: &mut HashSet<ServiceId>,
) -> Result<(), PVMInvokeError> {
// Accumulating service sandbox
let accumulate_host_sandbox = partial_state_union
Expand Down Expand Up @@ -297,7 +300,7 @@ async fn merge_partial_state_change(
// Accumulate host state change
*accumulate_host_sandbox = accumulate_result_partial_state
.accounts_sandbox
.get_account_sandbox(state_manager, accumulate_host)
.get_account_sandbox(state_manager.clone(), accumulate_host)
.await?
.cloned()
.ok_or(PVMInvokeError::MissingAccumulateHostSandbox(
Expand All @@ -312,33 +315,52 @@ async fn merge_partial_state_change(
.iter()
.filter(|(&service_id, _)| service_id != accumulate_host)
{
// Check whether the service ID existed in the base state at the start of the current
// parallel accumulation round.
let base_exists = if base_removed_service_ids.contains(&service_id) {
false
} else if base_present_service_ids.contains(&service_id) {
true
} else {
state_manager.account_exists(service_id).await?
};

match sandbox.metadata.status() {
SandboxEntryStatus::Added => {
match partial_state_union.accounts_sandbox.get(&service_id) {
None => {
partial_state_union
.accounts_sandbox
.insert(service_id, sandbox.clone());
}
Some(existing) => {
if matches!(existing.metadata.status(), SandboxEntryStatus::Removed) {
// Allow re-creating a previously removed account.
partial_state_union
.accounts_sandbox
.insert(service_id, sandbox.clone());
} else if created_service_ids.contains(&service_id) {
// If NEW service ids collide, block should be considered invalid.
return Err(PVMInvokeError::DuplicateNewServiceId(service_id));
} else {
// Inherited Added entry from a prior round; skip to avoid overwriting.
}
}
// Not a new ID for this round
if base_exists {
continue;
}
// Reject block if the same service ID is created or removed within the same round
if new_service_ids_this_round.contains(&service_id)
|| removed_service_ids_this_round.contains(&service_id)
{
return Err(PVMInvokeError::DuplicateServiceIdWithinAccumulateRound(
service_id,
));
}
partial_state_union
.accounts_sandbox
.insert(service_id, sandbox.clone());
new_service_ids_this_round.insert(service_id);
}
SandboxEntryStatus::Removed => {
// Ignore removals of accounts that didn't exist at round start
if !base_exists {
continue;
}
// Reject block if the same service ID is created or removed within the same round
if new_service_ids_this_round.contains(&service_id)
|| removed_service_ids_this_round.contains(&service_id)
{
return Err(PVMInvokeError::DuplicateServiceIdWithinAccumulateRound(
service_id,
));
}
partial_state_union
.accounts_sandbox
.insert(service_id, sandbox.clone());
removed_service_ids_this_round.insert(service_id);
}
_ => {}
}
Expand Down Expand Up @@ -419,6 +441,29 @@ async fn accumulate_parallel(
let prev_designate = partial_state_union.designate_service.last_confirmed;
let prev_registrar = partial_state_union.registrar_service.last_confirmed;

// It is not allowed to reuse service IDs (Create/Remove) that were modified
// by other accumulate hosts in the current parallel accumulation round.
// Modifications from previous rounds do not cause such conflicts.
// We track the set of IDs that are present/removed from the base state.
// Also, we track service IDs that are modified in the current round to enforce this rule.
let base_present_service_ids: HashSet<ServiceId> = partial_state_union
.accounts_sandbox
.iter()
.filter_map(|(&service_id, sandbox)| {
(!matches!(sandbox.metadata.status(), SandboxEntryStatus::Removed))
.then_some(service_id)
})
.collect();
let base_removed_service_ids: HashSet<ServiceId> = partial_state_union
.accounts_sandbox
.iter()
.filter_map(|(&service_id, sandbox)| {
matches!(sandbox.metadata.status(), SandboxEntryStatus::Removed).then_some(service_id)
})
.collect();
let mut new_service_ids_this_round = HashSet::new();
let mut removed_service_ids_this_round = HashSet::new();

// Accumulating service groups: 1) With Digests 2) Transfer Receivers 3) Always-accumulates 4) Privileged Services
let services_with_digests: BTreeSet<ServiceId> = reports
.iter()
Expand Down Expand Up @@ -497,7 +542,10 @@ async fn accumulate_parallel(
&prev_assigns,
prev_designate,
prev_registrar,
&accumulate_result.created_service_ids,
&base_present_service_ids,
&base_removed_service_ids,
&mut new_service_ids_this_round,
&mut removed_service_ids_this_round,
)
.await?;

Expand Down
4 changes: 2 additions & 2 deletions pvm/pvm-invocation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ pub enum PVMInvokeError {
WorkReportBlobTooLarge,
#[error("Account sandbox for accumulate host (s={0}) is missing")]
MissingAccumulateHostSandbox(ServiceId),
#[error("Duplicate new service id (s={0})")]
DuplicateNewServiceId(ServiceId),
#[error("Number of work digests exceeds maximum")]
WorkDigestsOverflow,
#[error("Gas usage or gas limit overflowed")]
GasOverflow,
#[error("Zero padded justification data size exceeds SEGMENT_SIZE")]
PagedProofJustificationBlobTooLarge,
#[error("Attempted to add or remove service (s={0}) that was removed or added in the same parallel accumulation round")]
DuplicateServiceIdWithinAccumulateRound(ServiceId),
#[error("JamCodecError: {0}")]
JamCodecError(#[from] JamCodecError),
#[error("CryptoError: {0}")]
Expand Down