Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 15 additions & 30 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ macro_rules! implement_with_retry {
$vis:vis async fn $fn_name:ident $(< $t:ident $( : $bound:path )? >)?
(
$conn_var:ident $(, $param_name:ident : $param_type:ty)* $(,)?
) $(-> $return_type:ty)?
) -> $return_type:ty
$body:block
)+
) => {
Expand All @@ -256,15 +256,12 @@ macro_rules! implement_with_retry {
$vis async fn $fn_name $(< $t $( : $bound )? >)? (
&mut self,
$($param_name: $param_type),*
) -> Result<implement_with_retry!(@ret $($return_type)?)> {
) -> Result<$return_type> {
let deadline = Deadline::new(self.timeout);

loop {
let $conn_var = &mut *self;
let result = implement_with_retry!(@attempt $conn_var, $body, deadline, $($attr)?).await?;
if let Some(result) = result {
return Ok(result);
}
implement_with_retry!(@attempt $conn_var, $body, deadline, $($attr)?);
}
}
)+
Expand All @@ -279,19 +276,15 @@ macro_rules! implement_with_retry {
$vis async fn $fn_name $(< $t $( : $bound )? >)? (
&self,
$($param_name: $param_type),*
) -> Result<implement_with_retry!(@ret $($return_type)?)> {
) -> Result<$return_type> {
let deadline = Deadline::new(self.timeout);

loop {
let mut $conn_var = deadline.run(async {
Ok(self.connection.lock().await)
}).await?;
let get_conn = async { Ok(self.connection.lock().await) };
let mut $conn_var = deadline.run(get_conn).await?;

for _ in 0..4 {
let result = implement_with_retry!(@attempt $conn_var, $body, deadline, $($attr)?).await?;
if let Some(result) = result {
return Ok(result);
}
implement_with_retry!(@attempt $conn_var, $body, deadline, $($attr)?);
}

// Reacquire mutex after 4 failed attempts in case it's the request's fault.
Expand All @@ -303,7 +296,7 @@ macro_rules! implement_with_retry {
};

// === Core retryable operation ===
(@attempt $conn_var:ident, $body:block, $deadline:expr, $($attr:meta)?) => { async {
(@attempt $conn_var:ident, $body:block, $deadline:expr, $($attr:meta)?) => {
let __request = async {
let $conn_var = $conn_var.get_connection().await?;
Ok($body)
Expand All @@ -312,16 +305,14 @@ macro_rules! implement_with_retry {
match $deadline.run(__request).await {
Ok(__result) => {
$conn_var.retry_counter = 0;
return Ok::<_, Error>(Some(__result));
return Ok(__result);
}
Err(__err) => {
implement_with_retry!(@handle_reset $($attr)?, $conn_var);
$conn_var.handle_error(__err, $deadline).await?
}
}

Ok(None)
}};
};

// === Conditionally implement shared struct ===
(@maybe_shared_impl $shared_struct:ident { $($impl_tokens:tt)* }) => {
Expand All @@ -335,13 +326,7 @@ macro_rules! implement_with_retry {
(@handle_reset reset_connection_on_error, $conn_var:expr) => {
$conn_var.connection = None;
};
(@handle_reset $($other:meta)?, $conn_var:expr) => {
// No-op if no reset attribute is present
};

// === Return type resolver ===
(@ret $t:ty) => { $t };
(@ret) => { () };
(@handle_reset $($other:meta)?, $conn_var:expr) => {};
}

pub enum ChatState {
Expand All @@ -359,7 +344,7 @@ implement_with_retry! {
connection.sismember(KNOWN_ITEMS_KEY, volfdnr).await?
}

pub async fn add_known_volfdnr(connection, volfdnr: &str) {
pub async fn add_known_volfdnr(connection, volfdnr: &str) -> () {
connection.sadd(KNOWN_ITEMS_KEY, volfdnr).await?
}

Expand Down Expand Up @@ -552,7 +537,7 @@ implement_with_retry! {
.and_then(|(_, v)| v.into_iter().next())
}

pub async fn set_last_update(connection, timestamp: DateTime<Utc>) {
pub async fn set_last_update(connection, timestamp: DateTime<Utc>) -> () {
connection.set(LAST_UPDATE_KEY, timestamp.timestamp_millis()).await?
}

Expand Down Expand Up @@ -583,12 +568,12 @@ implement_with_retry! {
}
}

pub async fn update_dialogue(connection, chat_id: i64, dialogue: &impl Serialize) {
pub async fn update_dialogue(connection, chat_id: i64, dialogue: &impl Serialize) -> () {
let string = serde_json::to_string(dialogue)?;
connection.set_ex(dialogue_key(chat_id), &string, 60 * 60 * 24).await?
}

pub async fn remove_dialogue(connection, chat_id: i64) {
pub async fn remove_dialogue(connection, chat_id: i64) -> () {
connection.del(dialogue_key(chat_id)).await?
}

Expand Down
Loading