Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
647 changes: 310 additions & 337 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions libs/opsqueue_python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ crate-type = ["cdylib"]
opsqueue = {path = "../../opsqueue/", default-features = false, features = ["client-logic"] }

# General-purpose datatypes:
uuid = {version = "1.11.0", features = [
uuid = {version = "1.19.0", features = [
"v7", # Temporally orderable UUIDs with random component
"fast-rng", # Use a faster (but still sufficiently random) RNG
]}
chrono = { version = "0.4.38"}
ux = "0.1.6"

# Concurrency:
tokio = {version = "1.38", features = ["macros", "rt-multi-thread"]}
tokio = {version = "1.49", features = ["macros", "rt-multi-thread"]}
futures = "0.3.30"

# Error handling:
anyhow = "1.0.86"
thiserror = "1.0.65"
thiserror = "2.0.17"

# Python FFI:
pyo3 = { version = "0.23.4", features = ["chrono"] }
Expand All @@ -36,4 +36,4 @@ once_cell = "1.21.3" # Only currently used for `unsync::OnceCell` as part of PyO

# Logging/tracing:
pyo3-log = "0.12.1"
tracing = { version = "0.1.41", features = ["log"] }
tracing = { version = "0.1.44", features = ["log"] }
46 changes: 23 additions & 23 deletions opsqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ edition = "2021"
description = "lightweight batch processing queue for heavy loads"
repository = "https://github.com/channable/opsqueue"
license = "MIT"
include=["opsqueue_example_database_schema.db"]

[lib]
name="opsqueue"
path="src/lib.rs"
include=["opsqueue_example_database_schema.db"]

[[bin]]
name="opsqueue"
Expand All @@ -19,12 +19,12 @@ required-features = ["server-logic"]
[dependencies]
# Datatypes and concurrency:
itertools = "0.14.0"
arc-swap = {version = "1.7.1", optional = true}
moka = { version = "0.12.8", features = ["sync"], optional = true }
arc-swap = {version = "1.8.0", optional = true}
moka = { version = "0.12.12", features = ["sync"], optional = true }
chrono = { version = "0.4.38", features = ["serde"]}
futures = "0.3.30"
tokio = { version = "1.38.0", features = ["macros", "signal", "rt-multi-thread"] }
uuid = {version = "1.11.0", features = [
tokio = { version = "1.49.0", features = ["macros", "signal", "rt-multi-thread"] }
uuid = {version = "1.19.0", features = [
"v7", # Timestamp-sortable UUIDs with random component
"fast-rng", # Use a faster (but still sufficiently random) RNG
"serde",
Expand All @@ -36,42 +36,42 @@ anyhow = "1.0.86"
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"], optional = true }
# Serialization:
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.124"
serde_json = "1.0.149"
ciborium = "0.2.2"
# Webservers/clients
http = "1.2.0"
object_store = {version = "0.11.1", features = ["gcp", "http"]}
http = "1.4.0"
object_store = {version = "0.13.0", features = ["gcp", "http"]}
snowflaked = {version = "1.0.3", features = ["sync"] }
tokio-tungstenite = {version = "0.24.0", optional = true}
axum = { version = "0.7.5", features = ["ws", "macros"], optional = true }
tokio-tungstenite = {version = "0.28.0", optional = true}
axum = { version = "0.8.8", features = ["ws", "macros"], optional = true }
reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"], optional = true }
url = {version = "2.5.2"}
tokio-util = { version = "0.7.11", features = ["io", "rt", "time"] }
tower-http = { version = "0.6.1", features = ["trace", "catch-panic"], optional = true }
url = {version = "2.5.8"}
tokio-util = { version = "0.7.18", features = ["io", "rt", "time"] }
tower-http = { version = "0.6.8", features = ["trace", "catch-panic"], optional = true }
# Logging and tracing:
tracing = {version = "0.1", features = ["log"] }
tracing-subscriber = {version = "0.3", features = ["std", "env-filter"] }
sentry = {version = "0.35", optional = true, default-features=false, features=["rustls", "reqwest"]}
sentry-tracing = {version = "0.35", optional = true}
sentry = {version = "0.46", optional = true, default-features=false, features=["rustls", "reqwest"]}
sentry-tracing = {version = "0.46", optional = true}
# Exporting traces to Opentelemetry:
axum-tracing-opentelemetry = {version = "0.24.0", optional = true }
axum-tracing-opentelemetry = {version = "0.32.2", optional = true }
opentelemetry = { version = "0.26", default-features = false, features = ["trace"] }
opentelemetry_sdk = { version = "0.26", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-http = { version = "0.26" }
opentelemetry-otlp = { version = "0.26", optional = true }
tracing-opentelemetry = {version = "0.27.0" }
opentelemetry-semantic-conventions = {version = "0.26.0", features = ["semconv_experimental"], optional = true}
moro-local = "0.4.0"
thiserror = "1.0.65"
thiserror = "2.0.17"
either = "1.13.0"
serde-error = "0.1.3"
backon = { version = "1.3.0", features = ["tokio-sleep"] }
rand = "0.8.5"
backon = { version = "1.6.0", features = ["tokio-sleep"] }
rand = "0.9.2"
rustc-hash = "2.0.0"
axum-prometheus = {version = "0.7.0", optional = true}
axum-prometheus = {version = "0.10.0", optional = true}

# Configuration:
clap = { version = "4.5.21", features = ["derive"] }
clap = { version = "4.5.54", features = ["derive"] }
humantime = "2.1.0"

dashmap = "6.1.0"
Expand All @@ -80,8 +80,8 @@ crossbeam-skiplist = "0.1.3"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dev-dependencies]
criterion = {version = "0.3", features = ["async_tokio"]}
insta = { version = "1.41.1" }
criterion = {version = "0.8", features = ["async_tokio"]}
insta = { version = "1.46.0" }
assert_matches = { version = "1.5.0" }

# [[bench]]
Expand Down
8 changes: 4 additions & 4 deletions opsqueue/src/consumer/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl From<ServerToClientMessage> for ws::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ServerToClientMessage");

ws::Message::Binary(writer)
ws::Message::Binary(writer.into())
}
}

Expand All @@ -114,7 +114,7 @@ impl From<Envelope<ClientToServerMessage>> for ws::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ClientToServerMessage");

ws::Message::Binary(writer)
ws::Message::Binary(writer.into())
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ impl From<ServerToClientMessage> for tokio_tungstenite::tungstenite::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ServerToClientMessage");

tokio_tungstenite::tungstenite::Message::Binary(writer)
tokio_tungstenite::tungstenite::Message::Binary(writer.into())
}
}

Expand All @@ -157,6 +157,6 @@ impl From<Envelope<ClientToServerMessage>> for tokio_tungstenite::tungstenite::M
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ClientToServerMessage");

tokio_tungstenite::tungstenite::Message::Binary(writer)
tokio_tungstenite::tungstenite::Message::Binary(writer.into())
}
}
4 changes: 2 additions & 2 deletions opsqueue/src/consumer/dispatcher/metastate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ mod tests {
.collect();

// Increment in one order
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
for val in &vals {
sut.increment(key, val);
}
Expand All @@ -153,7 +153,7 @@ mod tests {
assert_eq!(too_highs.len(), n_groups);

// Decrement in a different order
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
for val in &vals {
sut.decrement(key, val);
}
Expand Down
3 changes: 2 additions & 1 deletion opsqueue/src/consumer/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use axum::extract::ws::{Message, WebSocket};
use futures::SinkExt;
use tokio::{
select,
sync::{
Expand Down Expand Up @@ -134,7 +135,7 @@ impl ConsumerConn {
}
}

async fn graceful_shutdown(self) {
async fn graceful_shutdown(mut self) {
const GRACEFUL_WEBSOCKET_CLOSE_TIMEOUT: Duration = Duration::from_millis(100);
select! {
_ = self.ws_stream.close() => {},
Expand Down
1 change: 1 addition & 0 deletions opsqueue/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::common::chunk;
use futures::stream::{self, TryStreamExt};
use object_store::path::Path;
use object_store::DynObjectStore;
use object_store::ObjectStoreExt;
use reqwest::Url;
use ux::u63;

Expand Down
4 changes: 2 additions & 2 deletions opsqueue/src/producer/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ impl ServerState {
)
.route("/submissions/count", get(submissions_count))
.route(
"/submissions/lookup_id_by_prefix/:prefix",
"/submissions/lookup_id_by_prefix/{prefix}",
get(lookup_submission_id_by_prefix),
)
.route("/submissions/:submission_id", get(submission_status))
.route("/submissions/{submission_id}", get(submission_status))
.route("/version", get(crate::server::version_endpoint)) // We're also exposing it here so the producer client can view it
.with_state(self)
}
Expand Down