diff --git a/crates/validator/src/filtering.rs b/crates/validator/src/filtering.rs index c77ddcc7..d0ffe6b8 100644 --- a/crates/validator/src/filtering.rs +++ b/crates/validator/src/filtering.rs @@ -7,7 +7,10 @@ use crate::TestActors; use crate::common::*; use async_backtrace::framed; use e2etest::TestCase; +use httpapi::KeyspaceName; +use scylla::client::session::Session; use std::collections::HashSet; +use std::sync::Arc; use tracing::info; #[framed] @@ -57,6 +60,21 @@ pub(crate) async fn new() -> TestCase { timeout, ann_filter_by_non_indexed_column_fails, ) + .with_test( + "ann_filter_by_clustering_key_only_requires_allow_filtering", + timeout, + ann_filter_by_clustering_key_only_requires_allow_filtering, + ) + .with_test( + "ann_filter_by_non_pk_column_rejected_without_allow_filtering", + timeout, + ann_filter_by_non_pk_column_rejected_without_allow_filtering, + ) + .with_test( + "ann_filter_by_non_pk_column_rejected_with_allow_filtering", + timeout, + ann_filter_by_non_pk_column_rejected_with_allow_filtering, + ) .with_test( "local_index_filter_by_partition_key_eq", timeout, @@ -1024,3 +1042,202 @@ async fn local_ann_with_timestamp_gte_filter(actors: TestActors) { info!("finished"); } + +#[framed] +async fn ann_filter_by_clustering_key_only_requires_allow_filtering(actors: TestActors) { + info!("started"); + + let (session, clients) = prepare_connection(&actors).await; + + let keyspace = create_keyspace(&session).await; + let table = create_table( + &session, + "p INT, v VECTOR, ck INT, PRIMARY KEY (p, ck)", + None, + ) + .await; + + insert_ck_only_test_rows(&session, &table).await; + + let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await; + + for client in &clients { + let index_status = wait_for_index(client, &index).await; + assert_eq!(index_status.count, 3, "Expected 3 vectors to be indexed"); + } + + info!("Verify ANN query with only ck filtering is rejected without ALLOW FILTERING"); + session + .query_unpaged(ck_only_query(&table, false), ()) + .await + .expect_err("ANN query with ck-only filtering should fail without ALLOW FILTERING"); + + info!("Verify the same query with ALLOW FILTERING returns matching rows"); + let rows = fetch_ck_only_rows_with_retry(&session, &table, true).await; + assert_ck_only_rows( + &rows, + 1, + 2, + "Expected two rows with ck=1 when using ALLOW FILTERING", + ); + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +} + +#[framed] +async fn ann_filter_by_non_pk_column_rejected_without_allow_filtering(actors: TestActors) { + info!("started"); + + let (session, keyspace, table) = prepare_non_pk_column_filter_test(&actors).await; + + info!("Test ANN query with indexed non-PK column filtering"); + let query = + format!("SELECT * FROM {table} WHERE c = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5"); + + session + .query_unpaged(query, ()) + .await + .expect_err("ANN query with non-PK column filtering should fail"); + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +} + +#[framed] +async fn ann_filter_by_non_pk_column_rejected_with_allow_filtering(actors: TestActors) { + info!("started"); + + let (session, keyspace, table) = prepare_non_pk_column_filter_test(&actors).await; + + info!("Test ANN query with indexed non-PK column filtering and ALLOW FILTERING"); + let query = format!( + "SELECT * FROM {table} WHERE c = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5 ALLOW FILTERING" + ); + + session + .query_unpaged(query, ()) + .await + .expect_err("ANN query with non-PK column filtering and ALLOW FILTERING should fail"); + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +} + +async fn insert_ck_only_test_rows(session: &Session, table: &TableName) { + session + .query_unpaged( + format!("INSERT INTO {table} (p, ck, v) VALUES (1, 1, [0.1, 0.2, 0.3])"), + (), + ) + .await + .expect("failed to insert row p=1, ck=1"); + session + .query_unpaged( + format!("INSERT INTO {table} (p, ck, v) VALUES (2, 1, [5.0, 5.0, 5.0])"), + (), + ) + .await + .expect("failed to insert row p=2, ck=1"); + session + .query_unpaged( + format!("INSERT INTO {table} (p, ck, v) VALUES (3, 2, [0.1, 0.2, 0.3])"), + (), + ) + .await + .expect("failed to insert row p=3, ck=2"); +} + +fn ck_only_query(table: &TableName, with_allow_filtering: bool) -> String { + let base = + format!("SELECT p, ck FROM {table} WHERE ck = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5"); + if with_allow_filtering { + format!("{base} ALLOW FILTERING") + } else { + base + } +} + +async fn fetch_ck_only_rows_with_retry( + session: &Session, + table: &TableName, + with_allow_filtering: bool, +) -> Vec<(i32, i32)> { + let query = ck_only_query(table, with_allow_filtering); + let wait_message = if with_allow_filtering { + "Waiting for filtered ANN query (ck=1 with ALLOW FILTERING) to be operational" + } else { + "Waiting for filtered ANN query (ck=1 only) to be operational" + }; + + wait_for( + || async { + get_opt_query_results(query.clone(), session) + .await + .is_some() + }, + wait_message, + DEFAULT_OPERATION_TIMEOUT, + ) + .await; + + get_query_results(query, session) + .await + .rows::<(i32, i32)>() + .expect("failed to get rows") + .map(|row| row.expect("failed to get row")) + .collect() +} + +fn assert_ck_only_rows( + rows: &[(i32, i32)], + expected_ck: i32, + expected_len: usize, + expected_len_message: &str, +) { + assert_eq!(rows.len(), expected_len, "{expected_len_message}"); + assert!( + rows.iter().all(|(_, ck)| *ck == expected_ck), + "Expected only rows with ck={expected_ck}" + ); +} + +async fn prepare_non_pk_column_filter_test( + actors: &TestActors, +) -> (Arc, KeyspaceName, TableName) { + let (session, clients) = prepare_connection(actors).await; + + let keyspace = create_keyspace(&session).await; + let table = create_table( + &session, + "p INT PRIMARY KEY, c INT, v VECTOR", + None, + ) + .await; + + let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await; + for client in &clients { + let index_status = wait_for_index(client, &index).await; + assert_eq!(index_status.count, 0, "Index should start empty"); + } + + info!("Create index on non-PK column c"); + session + .query_unpaged(format!("CREATE INDEX ON {table}(c)"), ()) + .await + .expect("failed to create index on c"); + + (session, keyspace, table) +}