Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions crates/validator/src/filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::TestActors;
use crate::common::*;
use async_backtrace::framed;
use e2etest::TestCase;
use httpapi::KeyspaceName;
use scylla::client::session::Session;
use std::collections::HashSet;
use std::sync::Arc;
use tracing::info;

#[framed]
Expand Down Expand Up @@ -57,6 +60,21 @@ pub(crate) async fn new() -> TestCase<TestActors> {
timeout,
ann_filter_by_non_indexed_column_fails,
)
.with_test(
"ann_filter_by_clustering_key_only_requires_allow_filtering",
timeout,
ann_filter_by_clustering_key_only_requires_allow_filtering,
)
.with_test(
"ann_filter_by_non_pk_column_rejected_without_allow_filtering",
timeout,
ann_filter_by_non_pk_column_rejected_without_allow_filtering,
)
.with_test(
"ann_filter_by_non_pk_column_rejected_with_allow_filtering",
timeout,
ann_filter_by_non_pk_column_rejected_with_allow_filtering,
)
.with_test(
"local_index_filter_by_partition_key_eq",
timeout,
Expand Down Expand Up @@ -1024,3 +1042,202 @@ async fn local_ann_with_timestamp_gte_filter(actors: TestActors) {

info!("finished");
}

#[framed]
async fn ann_filter_by_clustering_key_only_requires_allow_filtering(actors: TestActors) {
info!("started");

let (session, clients) = prepare_connection(&actors).await;

let keyspace = create_keyspace(&session).await;
let table = create_table(
&session,
"p INT, v VECTOR<FLOAT, 3>, ck INT, PRIMARY KEY (p, ck)",
None,
)
.await;

insert_ck_only_test_rows(&session, &table).await;

let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await;

for client in &clients {
let index_status = wait_for_index(client, &index).await;
assert_eq!(index_status.count, 3, "Expected 3 vectors to be indexed");
}

info!("Verify ANN query with only ck filtering is rejected without ALLOW FILTERING");
session
.query_unpaged(ck_only_query(&table, false), ())
.await
.expect_err("ANN query with ck-only filtering should fail without ALLOW FILTERING");

info!("Verify the same query with ALLOW FILTERING returns matching rows");
let rows = fetch_ck_only_rows_with_retry(&session, &table, true).await;
assert_ck_only_rows(
&rows,
1,
2,
"Expected two rows with ck=1 when using ALLOW FILTERING",
);

session
.query_unpaged(format!("DROP KEYSPACE {keyspace}"), ())
.await
.expect("failed to drop a keyspace");

info!("finished");
}

#[framed]
async fn ann_filter_by_non_pk_column_rejected_without_allow_filtering(actors: TestActors) {
info!("started");

let (session, keyspace, table) = prepare_non_pk_column_filter_test(&actors).await;

info!("Test ANN query with indexed non-PK column filtering");
let query =
format!("SELECT * FROM {table} WHERE c = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5");

session
.query_unpaged(query, ())
.await
.expect_err("ANN query with non-PK column filtering should fail");

session
.query_unpaged(format!("DROP KEYSPACE {keyspace}"), ())
.await
.expect("failed to drop a keyspace");

info!("finished");
}

#[framed]
async fn ann_filter_by_non_pk_column_rejected_with_allow_filtering(actors: TestActors) {
info!("started");

let (session, keyspace, table) = prepare_non_pk_column_filter_test(&actors).await;

info!("Test ANN query with indexed non-PK column filtering and ALLOW FILTERING");
let query = format!(
"SELECT * FROM {table} WHERE c = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5 ALLOW FILTERING"
);

session
.query_unpaged(query, ())
.await
.expect_err("ANN query with non-PK column filtering and ALLOW FILTERING should fail");

session
.query_unpaged(format!("DROP KEYSPACE {keyspace}"), ())
.await
.expect("failed to drop a keyspace");

info!("finished");
}

async fn insert_ck_only_test_rows(session: &Session, table: &TableName) {
session
.query_unpaged(
format!("INSERT INTO {table} (p, ck, v) VALUES (1, 1, [0.1, 0.2, 0.3])"),
(),
)
.await
.expect("failed to insert row p=1, ck=1");
session
.query_unpaged(
format!("INSERT INTO {table} (p, ck, v) VALUES (2, 1, [5.0, 5.0, 5.0])"),
(),
)
.await
.expect("failed to insert row p=2, ck=1");
session
.query_unpaged(
format!("INSERT INTO {table} (p, ck, v) VALUES (3, 2, [0.1, 0.2, 0.3])"),
(),
)
.await
.expect("failed to insert row p=3, ck=2");
}

fn ck_only_query(table: &TableName, with_allow_filtering: bool) -> String {
let base =
format!("SELECT p, ck FROM {table} WHERE ck = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5");
if with_allow_filtering {
format!("{base} ALLOW FILTERING")
} else {
base
}
}

async fn fetch_ck_only_rows_with_retry(
session: &Session,
table: &TableName,
with_allow_filtering: bool,
) -> Vec<(i32, i32)> {
let query = ck_only_query(table, with_allow_filtering);
let wait_message = if with_allow_filtering {
"Waiting for filtered ANN query (ck=1 with ALLOW FILTERING) to be operational"
} else {
"Waiting for filtered ANN query (ck=1 only) to be operational"
};

wait_for(
|| async {
get_opt_query_results(query.clone(), session)
.await
.is_some()
},
wait_message,
DEFAULT_OPERATION_TIMEOUT,
)
.await;

get_query_results(query, session)
.await
.rows::<(i32, i32)>()
.expect("failed to get rows")
.map(|row| row.expect("failed to get row"))
.collect()
}

fn assert_ck_only_rows(
rows: &[(i32, i32)],
expected_ck: i32,
expected_len: usize,
expected_len_message: &str,
) {
assert_eq!(rows.len(), expected_len, "{expected_len_message}");
assert!(
rows.iter().all(|(_, ck)| *ck == expected_ck),
"Expected only rows with ck={expected_ck}"
);
}

async fn prepare_non_pk_column_filter_test(
actors: &TestActors,
) -> (Arc<Session>, KeyspaceName, TableName) {
let (session, clients) = prepare_connection(actors).await;

let keyspace = create_keyspace(&session).await;
let table = create_table(
&session,
"p INT PRIMARY KEY, c INT, v VECTOR<FLOAT, 3>",
None,
)
.await;

let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await;
for client in &clients {
let index_status = wait_for_index(client, &index).await;
assert_eq!(index_status.count, 0, "Index should start empty");
}

info!("Create index on non-PK column c");
session
.query_unpaged(format!("CREATE INDEX ON {table}(c)"), ())
.await
.expect("failed to create index on c");

(session, keyspace, table)
}
Loading