Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@
* implement basic producer and consumer -> every message gets commited blocking and consumer can stream that data


When I comeback I need to setup MinIO on my ssd then find a library that will allow me to stream data to it, with retry and multipart
When I comeback I need to setup MinIO on my ssd then find a library that will allow me to stream data to it, with retry and multipart

TODO:
The design of the Agent could be Agent {Router; Service (service is currently agent, need to chnage it to service then wrap it in agent)}
40 changes: 25 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod metadata;
mod producer;
mod agent;

use std::{thread, sync::Arc};
use std::{thread, sync::Arc, vec};

use bytes::Bytes;
use rand::{Rng, seq::SliceRandom};
Expand All @@ -19,28 +19,37 @@ fn main() -> anyhow::Result<()> {
info!("starting up");

let _guard = unsafe { foundationdb::boot() };
let (tx, rx) = tokio::sync::mpsc::channel(10_000);

let mut handles = vec![];
let mut senders = vec![];
for _ in 0..5 {
let (tx, rx) = tokio::sync::mpsc::channel(10_000);
let handle = thread::spawn( move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
rt.block_on(async {
let agent = agent::Agent::new();
agent.start(rx).await;
});
});
senders.push(tx.clone());
handles.push(handle);
}

thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
rt.block_on(produce_msg(tx));
rt.block_on(produce_msg(senders));
});

let handle = thread::spawn( move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
rt.block_on(async {
let agent = agent::Agent::new();
agent.start(rx).await;
});
});

handle.join().expect("could not join thread");
for handle in handles {
handle.join().expect("could not join thread");
}

return Ok(());

}

async fn produce_msg(tx: tokio::sync::mpsc::Sender<agent::Command>) {
async fn produce_msg(senders: Vec<tokio::sync::mpsc::Sender<agent::Command>>) {
debug!("creating topics...");
let topic_names = (0..100_000).map(|i| {
format!("test_topic_{i}")
Expand All @@ -59,12 +68,13 @@ async fn produce_msg(tx: tokio::sync::mpsc::Sender<agent::Command>) {
debug!("topics created...");

let mut n = 0;
let num_senders = senders.len();

loop {

if n >= 50_000 {
if n >= 500_000 {
n = 0;
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}

let topic_name = topic_names.choose(&mut rand::thread_rng()).unwrap();
Expand All @@ -76,7 +86,7 @@ async fn produce_msg(tx: tokio::sync::mpsc::Sender<agent::Command>) {
message,
};

let t = tx.clone();
let t = senders[n % num_senders].clone();
tokio::spawn(async move {
t.clone().send(command).await.expect("could not send command");
});
Expand Down