From 599924d9980a467ddfbaf90fcc445964d6da6643 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Hudobski?= Date: Mon, 4 May 2026 11:25:49 +0200 Subject: [PATCH] validator: add filtering tests that were removed from test.py We add two test cases from test.py that were removed when filtering support was added. We change the ck test, as now we support it. Fixes: VECTOR-479 Co-authored-by: Copilot --- crates/validator/src/filtering.rs | 217 ++++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) diff --git a/crates/validator/src/filtering.rs b/crates/validator/src/filtering.rs index c77ddcc7..d0ffe6b8 100644 --- a/crates/validator/src/filtering.rs +++ b/crates/validator/src/filtering.rs @@ -7,7 +7,10 @@ use crate::TestActors; use crate::common::*; use async_backtrace::framed; use e2etest::TestCase; +use httpapi::KeyspaceName; +use scylla::client::session::Session; use std::collections::HashSet; +use std::sync::Arc; use tracing::info; #[framed] @@ -57,6 +60,21 @@ pub(crate) async fn new() -> TestCase { timeout, ann_filter_by_non_indexed_column_fails, ) + .with_test( + "ann_filter_by_clustering_key_only_requires_allow_filtering", + timeout, + ann_filter_by_clustering_key_only_requires_allow_filtering, + ) + .with_test( + "ann_filter_by_non_pk_column_rejected_without_allow_filtering", + timeout, + ann_filter_by_non_pk_column_rejected_without_allow_filtering, + ) + .with_test( + "ann_filter_by_non_pk_column_rejected_with_allow_filtering", + timeout, + ann_filter_by_non_pk_column_rejected_with_allow_filtering, + ) .with_test( "local_index_filter_by_partition_key_eq", timeout, @@ -1024,3 +1042,202 @@ async fn local_ann_with_timestamp_gte_filter(actors: TestActors) { info!("finished"); } + +#[framed] +async fn ann_filter_by_clustering_key_only_requires_allow_filtering(actors: TestActors) { + info!("started"); + + let (session, clients) = prepare_connection(&actors).await; + + let keyspace = create_keyspace(&session).await; + let table = create_table( + &session, + "p INT, v VECTOR, ck INT, PRIMARY KEY (p, ck)", + None, + ) + .await; + + insert_ck_only_test_rows(&session, &table).await; + + let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await; + + for client in &clients { + let index_status = wait_for_index(client, &index).await; + assert_eq!(index_status.count, 3, "Expected 3 vectors to be indexed"); + } + + info!("Verify ANN query with only ck filtering is rejected without ALLOW FILTERING"); + session + .query_unpaged(ck_only_query(&table, false), ()) + .await + .expect_err("ANN query with ck-only filtering should fail without ALLOW FILTERING"); + + info!("Verify the same query with ALLOW FILTERING returns matching rows"); + let rows = fetch_ck_only_rows_with_retry(&session, &table, true).await; + assert_ck_only_rows( + &rows, + 1, + 2, + "Expected two rows with ck=1 when using ALLOW FILTERING", + ); + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +} + +#[framed] +async fn ann_filter_by_non_pk_column_rejected_without_allow_filtering(actors: TestActors) { + info!("started"); + + let (session, keyspace, table) = prepare_non_pk_column_filter_test(&actors).await; + + info!("Test ANN query with indexed non-PK column filtering"); + let query = + format!("SELECT * FROM {table} WHERE c = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5"); + + session + .query_unpaged(query, ()) + .await + .expect_err("ANN query with non-PK column filtering should fail"); + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +} + +#[framed] +async fn ann_filter_by_non_pk_column_rejected_with_allow_filtering(actors: TestActors) { + info!("started"); + + let (session, keyspace, table) = prepare_non_pk_column_filter_test(&actors).await; + + info!("Test ANN query with indexed non-PK column filtering and ALLOW FILTERING"); + let query = format!( + "SELECT * FROM {table} WHERE c = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5 ALLOW FILTERING" + ); + + session + .query_unpaged(query, ()) + .await + .expect_err("ANN query with non-PK column filtering and ALLOW FILTERING should fail"); + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +} + +async fn insert_ck_only_test_rows(session: &Session, table: &TableName) { + session + .query_unpaged( + format!("INSERT INTO {table} (p, ck, v) VALUES (1, 1, [0.1, 0.2, 0.3])"), + (), + ) + .await + .expect("failed to insert row p=1, ck=1"); + session + .query_unpaged( + format!("INSERT INTO {table} (p, ck, v) VALUES (2, 1, [5.0, 5.0, 5.0])"), + (), + ) + .await + .expect("failed to insert row p=2, ck=1"); + session + .query_unpaged( + format!("INSERT INTO {table} (p, ck, v) VALUES (3, 2, [0.1, 0.2, 0.3])"), + (), + ) + .await + .expect("failed to insert row p=3, ck=2"); +} + +fn ck_only_query(table: &TableName, with_allow_filtering: bool) -> String { + let base = + format!("SELECT p, ck FROM {table} WHERE ck = 1 ORDER BY v ANN OF [0.1, 0.2, 0.3] LIMIT 5"); + if with_allow_filtering { + format!("{base} ALLOW FILTERING") + } else { + base + } +} + +async fn fetch_ck_only_rows_with_retry( + session: &Session, + table: &TableName, + with_allow_filtering: bool, +) -> Vec<(i32, i32)> { + let query = ck_only_query(table, with_allow_filtering); + let wait_message = if with_allow_filtering { + "Waiting for filtered ANN query (ck=1 with ALLOW FILTERING) to be operational" + } else { + "Waiting for filtered ANN query (ck=1 only) to be operational" + }; + + wait_for( + || async { + get_opt_query_results(query.clone(), session) + .await + .is_some() + }, + wait_message, + DEFAULT_OPERATION_TIMEOUT, + ) + .await; + + get_query_results(query, session) + .await + .rows::<(i32, i32)>() + .expect("failed to get rows") + .map(|row| row.expect("failed to get row")) + .collect() +} + +fn assert_ck_only_rows( + rows: &[(i32, i32)], + expected_ck: i32, + expected_len: usize, + expected_len_message: &str, +) { + assert_eq!(rows.len(), expected_len, "{expected_len_message}"); + assert!( + rows.iter().all(|(_, ck)| *ck == expected_ck), + "Expected only rows with ck={expected_ck}" + ); +} + +async fn prepare_non_pk_column_filter_test( + actors: &TestActors, +) -> (Arc, KeyspaceName, TableName) { + let (session, clients) = prepare_connection(actors).await; + + let keyspace = create_keyspace(&session).await; + let table = create_table( + &session, + "p INT PRIMARY KEY, c INT, v VECTOR", + None, + ) + .await; + + let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await; + for client in &clients { + let index_status = wait_for_index(client, &index).await; + assert_eq!(index_status.count, 0, "Index should start empty"); + } + + info!("Create index on non-PK column c"); + session + .query_unpaged(format!("CREATE INDEX ON {table}(c)"), ()) + .await + .expect("failed to create index on c"); + + (session, keyspace, table) +}