Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion crates/bootstrap_mtc_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ impl LogEntry for BootstrapMtcLogEntry {
const REQUIRE_CHECKPOINT_TIMESTAMP: bool = false;
type Pending = BootstrapMtcPendingLogEntry;
type ParseError = MtcError;
type Metadata = SequenceMetadata;

fn make_metadata(
leaf_index: LeafIndex,
timestamp: UnixTimestamp,
_old_tree_size: u64,
_new_tree_size: u64,
) -> Self::Metadata {
(leaf_index, timestamp)
}

fn initial_entry() -> Option<Self::Pending> {
Some(Self::Pending {
Expand All @@ -184,7 +194,7 @@ impl LogEntry for BootstrapMtcLogEntry {
})
}

fn new(pending: Self::Pending, metadata: SequenceMetadata) -> Self {
fn new(pending: Self::Pending, metadata: Self::Metadata) -> Self {
Self(TlogTilesLogEntry::new(pending.entry, metadata))
}

Expand Down
4 changes: 2 additions & 2 deletions crates/bootstrap_mtc_worker/src/batcher_do.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use generic_log_worker::{get_durable_object_name, BatcherConfig, GenericBatcher,
use worker::*;

#[durable_object(fetch)]
struct Batcher(GenericBatcher);
struct Batcher(GenericBatcher<tlog_tiles::SequenceMetadata>);

impl DurableObject for Batcher {
fn new(state: State, env: Env) -> Self {
Expand All @@ -25,7 +25,7 @@ impl DurableObject for Batcher {
enable_dedup: false, // deduplication is not currently supported
location_hint: params.location_hint.clone(),
};
Batcher(GenericBatcher::new(state, env, config))
Batcher(GenericBatcher::<tlog_tiles::SequenceMetadata>::new(state, env, config))
}

async fn fetch(&self, req: Request) -> Result<Response> {
Expand Down
2 changes: 1 addition & 1 deletion crates/bootstrap_mtc_worker/src/sequencer_do.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn checkpoint_callback(env: &Env, name: &str) -> CheckpointCallbacker {
let params = &CONFIG.logs[name];
let bucket = load_public_bucket(env, name).unwrap();
Box::new(
move |old_time: UnixTimestamp, new_time: UnixTimestamp, new_checkpoint_bytes: &[u8]| {
move |old_time: UnixTimestamp, new_time: UnixTimestamp, _old_tree_size: u64, _new_tree_size: u64, new_checkpoint_bytes: &[u8]| {
let new_checkpoint = {
// TODO: Make more efficient. There are two unnecessary allocations here.

Expand Down
4 changes: 2 additions & 2 deletions crates/ct_worker/src/batcher_do.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use generic_log_worker::{get_durable_object_name, BatcherConfig, GenericBatcher,
use worker::*;

#[durable_object(fetch)]
struct Batcher(GenericBatcher);
struct Batcher(GenericBatcher<tlog_tiles::SequenceMetadata>);

impl DurableObject for Batcher {
fn new(state: State, env: Env) -> Self {
Expand All @@ -25,7 +25,7 @@ impl DurableObject for Batcher {
enable_dedup: params.enable_dedup,
location_hint: params.location_hint.clone(),
};
Batcher(GenericBatcher::new(state, env, config))
Batcher(GenericBatcher::<tlog_tiles::SequenceMetadata>::new(state, env, config))
}

async fn fetch(&self, req: Request) -> Result<Response> {
Expand Down
33 changes: 20 additions & 13 deletions crates/generic_log_worker/src/batcher_do.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
//! Entries are assigned to Batcher shards with consistent hashing on the cache key.

use crate::{
deserialize, get_durable_object_stub, load_cache_kv, obs, serialize, LookupKey,
SequenceMetadata, BATCH_ENDPOINT, ENTRY_ENDPOINT, SEQUENCER_BINDING,
deserialize, get_durable_object_stub, load_cache_kv, obs, serialize, CacheSerialize,
LookupKey, BATCH_ENDPOINT, ENTRY_ENDPOINT, SEQUENCER_BINDING,
};
use serde::{de::DeserializeOwned, Serialize};
use base64::prelude::*;
use futures_util::future::{join_all, select, Either};
use std::{
Expand All @@ -23,12 +24,17 @@ use worker::kv::KvStore;
#[allow(clippy::wildcard_imports)]
use worker::*;

pub struct GenericBatcher {
/// A Durable Object that buffers incoming log entries and submits them to the
/// Sequencer in batches.
///
/// `M` is the sequence metadata type produced for each entry after sequencing
/// (i.e., [`LogEntry::Metadata`](tlog_tiles::LogEntry::Metadata)).
pub struct GenericBatcher<M> {
Comment thread
lukevalenta marked this conversation as resolved.
env: Env,
config: BatcherConfig,
state: State,
kv: Option<KvStore>,
batch: RefCell<Batch>,
batch: RefCell<Batch<M>>,
in_flight: RefCell<usize>,
processed: RefCell<usize>,
wshim: Option<obs::Wshim>,
Expand All @@ -42,14 +48,15 @@ pub struct BatcherConfig {
pub location_hint: Option<String>,
}

// A batch of entries to be submitted to the Sequencer together.
struct Batch {
// A batch of entries to be submitted to the Sequencer together. `M` is the
// sequence metadata type returned to waiters once the batch is sequenced.
struct Batch<M> {
Comment thread
lukevalenta marked this conversation as resolved.
entries: Vec<PendingLogEntryBlob>,
by_hash: HashSet<LookupKey>,
done: Sender<HashMap<LookupKey, SequenceMetadata>>,
done: Sender<HashMap<LookupKey, M>>,
}

impl Default for Batch {
impl<M: Clone + Default> Default for Batch<M> {
/// Returns a batch initialized with a watch channel.
fn default() -> Self {
Self {
Expand All @@ -60,7 +67,7 @@ impl Default for Batch {
}
}

impl GenericBatcher {
impl<M: CacheSerialize + Serialize + DeserializeOwned + Clone + Default + Copy + 'static> GenericBatcher<M> {
/// Returns a new batcher with the given config.
///
/// # Panics
Expand Down Expand Up @@ -189,7 +196,7 @@ impl GenericBatcher {
}
}

impl GenericBatcher {
impl<M: CacheSerialize + Serialize + DeserializeOwned + Clone + Default + Copy + 'static> GenericBatcher<M> {
/// Submit the current pending batch to be sequenced.
///
/// # Errors
Expand Down Expand Up @@ -217,8 +224,8 @@ impl GenericBatcher {
..Default::default()
},
)?;
let sequenced_entries: HashMap<LookupKey, SequenceMetadata> =
deserialize::<Vec<(LookupKey, SequenceMetadata)>>(
let sequenced_entries: HashMap<LookupKey, M> =
deserialize::<Vec<(LookupKey, M)>>(
&get_durable_object_stub(
&self.env,
&self.config.name,
Expand All @@ -244,7 +251,7 @@ impl GenericBatcher {
.map(|(k, v)| {
Ok(kv
.put(&BASE64_STANDARD.encode(k), "")?
.metadata::<SequenceMetadata>(v)?
.metadata::<M>(v)?
.execute())
})
.collect::<Result<Vec<_>>>()?;
Expand Down
Loading
Loading