Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions ast/src/lang/graphs/neo4j/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::lang::graphs::{
};

use crate::lang::Neo4jGraph;
use crate::lang::graphs::executor::with_transient_retry_reconnect;
pub struct Neo4jConnectionManager;

impl Neo4jConnectionManager {
Expand Down Expand Up @@ -50,25 +51,29 @@ impl Neo4jConnectionManager {

impl Neo4jGraph {
pub async fn create_indexes(&self) -> Result<()> {
let connection: Neo4jConnection = self.ensure_connected().await?;
use with_transient_retry_reconnect;

let queries = vec![
"CREATE INDEX data_bank_node_key_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.node_key)",
"CREATE INDEX data_bank_ref_id_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.ref_id)",
// Range index on `file` so incremental sync deletions
// (`remove_nodes_by_files_query` -> `n.file IN $files`) and other
// file-scoped lookups can do an index seek instead of a full
// `:Data_Bank` label scan. Without this an incremental sync had
// to walk every project node for every modified file.
"CREATE INDEX data_bank_file_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.file)",
"CREATE INDEX data_bank_name_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.name)",
"CREATE FULLTEXT INDEX bodyIndex IF NOT EXISTS FOR (n:Data_Bank) ON EACH [n.body]",
"CREATE FULLTEXT INDEX nameIndex IF NOT EXISTS FOR (n:Data_Bank) ON EACH [n.name]",
"CREATE FULLTEXT INDEX nameBodyFileIndex IF NOT EXISTS FOR (n:Data_Bank) ON EACH [n.name, n.body, n.file]",
"CREATE VECTOR INDEX vectorIndex IF NOT EXISTS FOR (n:Data_Bank) ON (n.embeddings) OPTIONS {indexConfig: {`vector.dimensions`: 384, `vector.similarity_function`: 'cosine'}}"
];

for q in queries {
if let Err(e) = connection.run(neo4rs::query(q)).await {
tracing::warn!("Error creating index: {:?}", e);
if let Err(e) = with_transient_retry_reconnect(self, "create_indexes", || {
async move {
let connection = self.ensure_connected().await?;
connection.run(neo4rs::query(q)).await.map_err(|e| {
shared::Error::dependency(format!("Index creation failed: {}", e))
})
}
}).await {
tracing::warn!(index = %q, error = %e, "index creation failed after retries");
}
}
Ok(())
Expand Down
77 changes: 48 additions & 29 deletions ast/src/lang/graphs/neo4j/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,30 +195,35 @@ fn extract_coverage_data(row: &neo4rs::Row) -> Result<(NodeData, usize, bool, us
))
}

async fn execute_query<T>(
conn: &Neo4jConnection,
async fn execute_query<T: Send + 'static>(
graph: &Neo4jGraph,
query_str: String,
params: BoltMap,
extractor: impl Fn(&neo4rs::Row) -> Result<T>,
extractor: impl Fn(&neo4rs::Row) -> Result<T> + Send + Sync + 'static,
) -> Result<Vec<T>> {
let query_obj = bind_parameters(&query_str, params);
let mut results = Vec::new();

match conn.execute(query_obj).await {
Ok(mut stream) => {
let label = query_str.chars().take(80).collect::<String>();
let extractor = std::sync::Arc::new(extractor);
with_transient_retry_reconnect(graph, &label, || {
let query_str = query_str.clone();
let params = params.clone();
let extractor = extractor.clone();
async move {
let conn = graph.ensure_connected().await?;
let query_obj = bind_parameters(&query_str, params);
let mut results = Vec::new();
let mut stream = conn.execute(query_obj).await.map_err(|e| {
shared::Error::dependency(format!("[neo4j-read] {}: {}", query_str.chars().take(80).collect::<String>(), e))
})?;
while let Ok(Some(row)) = stream.next().await {
match extractor(&row) {
Ok(item) => results.push(item),
Err(_) => continue,
}
}
Ok(results)
}
Err(e) => {
return Err(e.into());
}
}

Ok(results)
})
.await
}

pub struct TransactionManager<'a> {
Expand Down Expand Up @@ -340,62 +345,76 @@ pub async fn execute_queries_simple(
}

pub async fn execute_node_query(
conn: &Neo4jConnection,
graph: &Neo4jGraph,
query_str: String,
params: BoltMap,
) -> Vec<NodeData> {
execute_query(conn, query_str, params, extract_node_data)
let q = query_str.clone();
execute_query(graph, query_str, params, extract_node_data)
.await
.unwrap_or_else(|e| {
warn!("Error executing query: {}", e);
warn!(query = %q.chars().take(120).collect::<String>(), error = %e, "neo4j read failed after retries");
Vec::new()
})
}

pub async fn execute_nodes_with_coverage_query(
conn: &Neo4jConnection,
graph: &Neo4jGraph,
query_str: String,
params: BoltMap,
) -> Vec<(NodeData, usize, bool, usize, String)> {
execute_query(conn, query_str, params, extract_coverage_data)
let q = query_str.clone();
execute_query(graph, query_str, params, extract_coverage_data)
.await
.unwrap_or_else(|e| {
warn!("Error executing nodes with coverage query: {}", e);
warn!(query = %q.chars().take(120).collect::<String>(), error = %e, "neo4j coverage read failed after retries");
Vec::new()
})
}

pub async fn execute_muted_nodes_query(
conn: &Neo4jConnection,
graph: &Neo4jGraph,
query_str: String,
params: BoltMap,
) -> Vec<MutedNodeIdentifier> {
execute_query(conn, query_str, params, extract_muted_identifier)
let q = query_str.clone();
execute_query(graph, query_str, params, extract_muted_identifier)
.await
.unwrap_or_else(|_| Vec::new())
.unwrap_or_else(|e| {
warn!(query = %q.chars().take(120).collect::<String>(), error = %e, "neo4j muted-nodes read failed after retries");
Vec::new()
})
}

pub async fn execute_count_query(
conn: &Neo4jConnection,
graph: &Neo4jGraph,
query_str: String,
params: BoltMap,
) -> usize {
execute_query(conn, query_str, params, extract_count)
let q = query_str.clone();
execute_query(graph, query_str, params, extract_count)
.await
.unwrap_or_else(|_| Vec::new())
.unwrap_or_else(|e| {
warn!(query = %q.chars().take(120).collect::<String>(), error = %e, "neo4j count read failed after retries");
Vec::new()
})
.first()
.copied()
.unwrap_or(0)
}

pub async fn execute_boolean_query(
conn: &Neo4jConnection,
graph: &Neo4jGraph,
query_str: String,
params: BoltMap,
) -> bool {
execute_query(conn, query_str, params, extract_boolean)
let q = query_str.clone();
execute_query(graph, query_str, params, extract_boolean)
.await
.unwrap_or_else(|_| Vec::new())
.unwrap_or_else(|e| {
warn!(query = %q.chars().take(120).collect::<String>(), error = %e, "neo4j boolean read failed after retries");
Vec::new()
})
.first()
.copied()
.unwrap_or(false)
Expand Down
Loading
Loading