Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions mgmtd/src/app/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use shared::conn::msg_dispatch::test::TestRequest;
use shared::nic::{NicFilter, query_nics};
use shared::types::AuthSecret;
use sqlite::Connections;
use std::any::Any;
use std::net::Ipv4Addr;
use std::sync::Mutex;

Expand All @@ -16,17 +17,36 @@ use std::sync::Mutex;
pub struct TestApp {
pub db: Connections,
pub info: Arc<StaticInfo>,
#[allow(clippy::type_complexity)]
pub notifications: Arc<Mutex<Vec<(MsgId, Vec<NodeType>)>>>,
data: Arc<Mutex<TestData>>,
}

type RequestHandler = dyn FnMut(&dyn Any) -> Result<Box<dyn Any>> + Send;

#[derive(Default)]
struct TestData {
pub notifications: Vec<(MsgId, Vec<NodeType>)>,
request_handler: Option<Box<RequestHandler>>,
}

impl Debug for TestData {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("TestData")
.field("notifications", &self.notifications)
.finish()
}
}

impl TestApp {
pub async fn new() -> Self {
Self::with_config(Config::default()).await
}

pub async fn with_config(user_config: Config) -> Self {
let db = crate::db::test::setup_with_test_data().await;
Self {
db,
info: Arc::new(StaticInfo {
user_config: Config::default(),
user_config,
auth_secret: Some(AuthSecret::hash_from_bytes("secret")),
network_addrs: query_nics(
&[NicFilter {
Expand All @@ -38,14 +58,24 @@ impl TestApp {
.unwrap(),
use_ipv6: false,
}),
notifications: Arc::new(Mutex::new(vec![])),
data: Arc::new(Mutex::new(TestData::default())),
}
}

pub fn set_request_handler<T: FnMut(&dyn Any) -> Result<Box<dyn Any>> + Send + 'static>(
&self,
handler: T,
) {
self.data.lock().unwrap().request_handler = Some(Box::new(handler));
}
}

impl TestApp {
pub fn has_sent_notification<M: Msg>(&self, receivers: &[NodeType]) -> bool {
self.notifications
self.data
.lock()
.unwrap()
.notifications
.contains(&(M::ID, receivers.to_vec()))
}
}
Expand Down Expand Up @@ -90,19 +120,25 @@ impl App for TestApp {
async fn request<M: Msg + Serializable, R: Msg + Deserializable>(
&self,
_node_uid: Uid,
_msg: &M,
msg: &M,
) -> Result<R> {
Ok(R::default())
let mut d = self.data.lock().unwrap();
if let Some(ref mut h) = d.request_handler {
h(msg).map(|r| *r.downcast().unwrap())
} else {
Ok(R::default())
}
}

async fn send_notifications<M: Msg + Serializable>(
&self,
node_types: &'static [NodeType],
_msg: &M,
) {
self.notifications
self.data
.lock()
.unwrap()
.notifications
.push((M::ID, node_types.to_owned()));
}

Expand Down
165 changes: 154 additions & 11 deletions mgmtd/src/bee_msg/request_exceeded_quota.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::*;
use crate::db::quota_usage::PoolOrTargetId;
use rusqlite::params;
use shared::bee_msg::quota::*;

impl HandleWithResponse for RequestExceededQuota {
Expand All @@ -15,22 +15,41 @@ impl HandleWithResponse for RequestExceededQuota {
async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result<Self::Response> {
let inner = app
.read_tx(move |tx| {
let exceeded_ids = db::quota_usage::exceeded_quota_ids(
tx,
if self.pool_id != 0 {
PoolOrTargetId::PoolID(self.pool_id)
} else {
PoolOrTargetId::TargetID(self.target_id)
},
self.id_type,
self.quota_type,
// Quota is calculated per pool, so if a target ID is given, use its assigned pools
// ID.
let pool_id = if self.pool_id != 0 {
self.pool_id
} else {
tx.query_row_cached(
sql!("SELECT pool_id FROM storage_targets WHERE target_id = ?1"),
[self.target_id],
|row| row.get(0),
)?
};

let exceeded_quota_ids = tx.query_map_collect(
sql!(
"SELECT DISTINCT e.quota_id FROM quota_usage AS e
INNER JOIN targets AS st USING(node_type, target_id)
LEFT JOIN quota_default_limits AS d USING(id_type, quota_type, pool_id)
LEFT JOIN quota_limits AS l USING(quota_id, id_type, quota_type, pool_id)
WHERE e.id_type = ?1 AND e.quota_type = ?2 AND st.pool_id = ?3
GROUP BY e.quota_id, e.id_type, e.quota_type, st.pool_id
HAVING SUM(e.value) > COALESCE(l.value, d.value)"
),
params![
self.id_type.sql_variant(),
self.quota_type.sql_variant(),
pool_id
],
|row| row.get(0),
)?;

Ok(SetExceededQuota {
pool_id: self.pool_id,
id_type: self.id_type,
quota_type: self.quota_type,
exceeded_quota_ids: exceeded_ids,
exceeded_quota_ids,
})
})
.await?;
Expand All @@ -41,3 +60,127 @@ impl HandleWithResponse for RequestExceededQuota {
})
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::app::test::*;
use crate::bee_msg::HandleWithResponse;

#[tokio::test]
async fn request_exceeded_quota() {
let app = TestApp::new().await;
let mut req = TestRequest::new(RequestExceededQuota::ID);

let tests: &[(_, &[u32])] = &[
(
RequestExceededQuota {
id_type: QuotaIdType::User,
quota_type: QuotaType::Space,
pool_id: 1,
target_id: 0,
},
&[2, 4, 10],
),
(
RequestExceededQuota {
id_type: QuotaIdType::Group,
quota_type: QuotaType::Space,
pool_id: 1,
target_id: 0,
},
&[2, 4, 11],
),
(
RequestExceededQuota {
id_type: QuotaIdType::User,
quota_type: QuotaType::Inode,
pool_id: 1,
target_id: 0,
},
&[2, 4, 12],
),
(
RequestExceededQuota {
id_type: QuotaIdType::Group,
quota_type: QuotaType::Inode,
pool_id: 1,
target_id: 0,
},
&[2, 4, 13],
),
(
RequestExceededQuota {
id_type: QuotaIdType::User,
quota_type: QuotaType::Space,
pool_id: 2,
target_id: 0,
},
&[20],
),
(
RequestExceededQuota {
id_type: QuotaIdType::Group,
quota_type: QuotaType::Space,
pool_id: 2,
target_id: 0,
},
&[],
),
(
RequestExceededQuota {
id_type: QuotaIdType::User,
quota_type: QuotaType::Inode,
pool_id: 2,
target_id: 0,
},
&[],
),
(
RequestExceededQuota {
id_type: QuotaIdType::Group,
quota_type: QuotaType::Inode,
pool_id: 2,
target_id: 0,
},
&[],
),
(
RequestExceededQuota {
id_type: QuotaIdType::User,
quota_type: QuotaType::Space,
pool_id: 0,
target_id: 2,
},
&[20],
),
(
RequestExceededQuota {
id_type: QuotaIdType::User,
quota_type: QuotaType::Space,
pool_id: 4,
target_id: 0,
},
&[],
),
(
RequestExceededQuota {
id_type: QuotaIdType::User,
quota_type: QuotaType::Space,
pool_id: 0,
target_id: 12, // Pool 4
},
&[],
),
];

for (msg, exp) in tests {
let resp = msg.clone().handle(&app, &mut req).await.unwrap();
assert_eq!(resp.result, OpsErr::SUCCESS, "{msg:?}");
assert_eq!(resp.inner.pool_id, msg.pool_id, "{msg:?}");
assert_eq!(resp.inner.id_type, msg.id_type, "{msg:?}");
assert_eq!(resp.inner.quota_type, msg.quota_type, "{msg:?}");
assert_eq!(&resp.inner.exceeded_quota_ids, exp, "{msg:?}");
}
}
}
1 change: 0 additions & 1 deletion mgmtd/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ mod import_v7;
pub(crate) mod misc;
pub(crate) mod node;
pub(crate) mod node_nic;
pub(crate) mod quota_usage;
pub(crate) mod storage_pool;
pub(crate) mod target;

Expand Down
Loading
Loading