From 19d8a1a8c7897f37385ef4b62aaac45999081825 Mon Sep 17 00:00:00 2001 From: kelmith Date: Thu, 21 May 2026 11:09:01 +0530 Subject: [PATCH] feat: reliability on neo4j queries --- ast/src/lang/graphs/neo4j/connection.rs | 21 +- ast/src/lang/graphs/neo4j/executor.rs | 77 +++-- ast/src/lang/graphs/neo4j/graph.rs | 305 ++++++++---------- .../lang/graphs/neo4j/operations/coverage.rs | 38 +-- 4 files changed, 194 insertions(+), 247 deletions(-) diff --git a/ast/src/lang/graphs/neo4j/connection.rs b/ast/src/lang/graphs/neo4j/connection.rs index fa781e2e5..13cae0aa5 100644 --- a/ast/src/lang/graphs/neo4j/connection.rs +++ b/ast/src/lang/graphs/neo4j/connection.rs @@ -9,6 +9,7 @@ use crate::lang::graphs::{ }; use crate::lang::Neo4jGraph; +use crate::lang::graphs::executor::with_transient_retry_reconnect; pub struct Neo4jConnectionManager; impl Neo4jConnectionManager { @@ -50,16 +51,13 @@ impl Neo4jConnectionManager { impl Neo4jGraph { pub async fn create_indexes(&self) -> Result<()> { - let connection: Neo4jConnection = self.ensure_connected().await?; + use with_transient_retry_reconnect; + let queries = vec![ "CREATE INDEX data_bank_node_key_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.node_key)", "CREATE INDEX data_bank_ref_id_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.ref_id)", - // Range index on `file` so incremental sync deletions - // (`remove_nodes_by_files_query` -> `n.file IN $files`) and other - // file-scoped lookups can do an index seek instead of a full - // `:Data_Bank` label scan. Without this an incremental sync had - // to walk every project node for every modified file. "CREATE INDEX data_bank_file_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.file)", + "CREATE INDEX data_bank_name_index IF NOT EXISTS FOR (n:Data_Bank) ON (n.name)", "CREATE FULLTEXT INDEX bodyIndex IF NOT EXISTS FOR (n:Data_Bank) ON EACH [n.body]", "CREATE FULLTEXT INDEX nameIndex IF NOT EXISTS FOR (n:Data_Bank) ON EACH [n.name]", "CREATE FULLTEXT INDEX nameBodyFileIndex IF NOT EXISTS FOR (n:Data_Bank) ON EACH [n.name, n.body, n.file]", @@ -67,8 +65,15 @@ impl Neo4jGraph { ]; for q in queries { - if let Err(e) = connection.run(neo4rs::query(q)).await { - tracing::warn!("Error creating index: {:?}", e); + if let Err(e) = with_transient_retry_reconnect(self, "create_indexes", || { + async move { + let connection = self.ensure_connected().await?; + connection.run(neo4rs::query(q)).await.map_err(|e| { + shared::Error::dependency(format!("Index creation failed: {}", e)) + }) + } + }).await { + tracing::warn!(index = %q, error = %e, "index creation failed after retries"); } } Ok(()) diff --git a/ast/src/lang/graphs/neo4j/executor.rs b/ast/src/lang/graphs/neo4j/executor.rs index a46f5fed8..3b2703b11 100644 --- a/ast/src/lang/graphs/neo4j/executor.rs +++ b/ast/src/lang/graphs/neo4j/executor.rs @@ -195,30 +195,35 @@ fn extract_coverage_data(row: &neo4rs::Row) -> Result<(NodeData, usize, bool, us )) } -async fn execute_query( - conn: &Neo4jConnection, +async fn execute_query( + graph: &Neo4jGraph, query_str: String, params: BoltMap, - extractor: impl Fn(&neo4rs::Row) -> Result, + extractor: impl Fn(&neo4rs::Row) -> Result + Send + Sync + 'static, ) -> Result> { - let query_obj = bind_parameters(&query_str, params); - let mut results = Vec::new(); - - match conn.execute(query_obj).await { - Ok(mut stream) => { + let label = query_str.chars().take(80).collect::(); + let extractor = std::sync::Arc::new(extractor); + with_transient_retry_reconnect(graph, &label, || { + let query_str = query_str.clone(); + let params = params.clone(); + let extractor = extractor.clone(); + async move { + let conn = graph.ensure_connected().await?; + let query_obj = bind_parameters(&query_str, params); + let mut results = Vec::new(); + let mut stream = conn.execute(query_obj).await.map_err(|e| { + shared::Error::dependency(format!("[neo4j-read] {}: {}", query_str.chars().take(80).collect::(), e)) + })?; while let Ok(Some(row)) = stream.next().await { match extractor(&row) { Ok(item) => results.push(item), Err(_) => continue, } } + Ok(results) } - Err(e) => { - return Err(e.into()); - } - } - - Ok(results) + }) + .await } pub struct TransactionManager<'a> { @@ -340,62 +345,76 @@ pub async fn execute_queries_simple( } pub async fn execute_node_query( - conn: &Neo4jConnection, + graph: &Neo4jGraph, query_str: String, params: BoltMap, ) -> Vec { - execute_query(conn, query_str, params, extract_node_data) + let q = query_str.clone(); + execute_query(graph, query_str, params, extract_node_data) .await .unwrap_or_else(|e| { - warn!("Error executing query: {}", e); + warn!(query = %q.chars().take(120).collect::(), error = %e, "neo4j read failed after retries"); Vec::new() }) } pub async fn execute_nodes_with_coverage_query( - conn: &Neo4jConnection, + graph: &Neo4jGraph, query_str: String, params: BoltMap, ) -> Vec<(NodeData, usize, bool, usize, String)> { - execute_query(conn, query_str, params, extract_coverage_data) + let q = query_str.clone(); + execute_query(graph, query_str, params, extract_coverage_data) .await .unwrap_or_else(|e| { - warn!("Error executing nodes with coverage query: {}", e); + warn!(query = %q.chars().take(120).collect::(), error = %e, "neo4j coverage read failed after retries"); Vec::new() }) } pub async fn execute_muted_nodes_query( - conn: &Neo4jConnection, + graph: &Neo4jGraph, query_str: String, params: BoltMap, ) -> Vec { - execute_query(conn, query_str, params, extract_muted_identifier) + let q = query_str.clone(); + execute_query(graph, query_str, params, extract_muted_identifier) .await - .unwrap_or_else(|_| Vec::new()) + .unwrap_or_else(|e| { + warn!(query = %q.chars().take(120).collect::(), error = %e, "neo4j muted-nodes read failed after retries"); + Vec::new() + }) } pub async fn execute_count_query( - conn: &Neo4jConnection, + graph: &Neo4jGraph, query_str: String, params: BoltMap, ) -> usize { - execute_query(conn, query_str, params, extract_count) + let q = query_str.clone(); + execute_query(graph, query_str, params, extract_count) .await - .unwrap_or_else(|_| Vec::new()) + .unwrap_or_else(|e| { + warn!(query = %q.chars().take(120).collect::(), error = %e, "neo4j count read failed after retries"); + Vec::new() + }) .first() .copied() .unwrap_or(0) } pub async fn execute_boolean_query( - conn: &Neo4jConnection, + graph: &Neo4jGraph, query_str: String, params: BoltMap, ) -> bool { - execute_query(conn, query_str, params, extract_boolean) + let q = query_str.clone(); + execute_query(graph, query_str, params, extract_boolean) .await - .unwrap_or_else(|_| Vec::new()) + .unwrap_or_else(|e| { + warn!(query = %q.chars().take(120).collect::(), error = %e, "neo4j boolean read failed after retries"); + Vec::new() + }) .first() .copied() .unwrap_or(false) diff --git a/ast/src/lang/graphs/neo4j/graph.rs b/ast/src/lang/graphs/neo4j/graph.rs index 3c2f2a7a3..a33c4f91f 100644 --- a/ast/src/lang/graphs/neo4j/graph.rs +++ b/ast/src/lang/graphs/neo4j/graph.rs @@ -175,12 +175,8 @@ impl Neo4jGraph { node_type: NodeType, name_part: &str, ) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_nodes_by_name_contains_async"); - return vec![]; - }; let (query, params) = find_nodes_by_name_contains_query(&node_type, name_part); - let nodes = execute_node_query(&connection, query, params).await; + let nodes = execute_node_query(self, query, params).await; let lang_nodes: Vec = nodes .into_iter() @@ -190,24 +186,16 @@ impl Neo4jGraph { lang_nodes } pub async fn find_nodes_by_type_async(&self, node_type: NodeType) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_nodes_by_type_async"); - return vec![]; - }; let (query, params) = find_nodes_by_type_query(&node_type); - execute_node_query(&connection, query, params).await + execute_node_query(self, query, params).await } pub async fn find_nodes_by_file_ends_with_async( &self, node_type: NodeType, file: &str, ) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_nodes_by_file_ends_with_async"); - return vec![]; - }; let (query, params) = find_nodes_by_file_pattern_query(&node_type, file); - let nodes = execute_node_query(&connection, query, params).await; + let nodes = execute_node_query(self, query, params).await; let lang_nodes: Vec = nodes .into_iter() .filter(|n| self.lang_kind.is_from_language(&n.file)) @@ -222,59 +210,52 @@ impl Neo4jGraph { target_type: NodeType, edge_type: EdgeType, ) -> Vec<(NodeData, NodeData)> { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_nodes_with_edge_type_async"); - return vec![]; - }; let (query_str, params) = find_nodes_with_edge_type_query(&source_type, &target_type, &edge_type); - let mut query_obj = query(&query_str); - for (key, value) in params.value.iter() { - query_obj = query_obj.param(key.value.as_str(), value.clone()); - } - let mut node_pairs = Vec::new(); - match connection.execute(query_obj).await { - Ok(mut result) => { - while let Ok(Some(row)) = result.next().await { - let source_name: String = row.get("source_name").unwrap_or_default(); - let source_file: String = row.get("source_file").unwrap_or_default(); - let source_start: i64 = row.get("source_start").unwrap_or_default(); - let target_name: String = row.get("target_name").unwrap_or_default(); - let target_file: String = row.get("target_file").unwrap_or_default(); - let target_start: i64 = row.get("target_start").unwrap_or_default(); - + let result = with_transient_retry_reconnect(self, "find_nodes_with_edge_type", || { + let query_str = query_str.clone(); + let params = params.clone(); + async move { + let conn = self.ensure_connected().await?; + let mut query_obj = query(&query_str); + for (key, value) in params.value.iter() { + query_obj = query_obj.param(key.value.as_str(), value.clone()); + } + let mut node_pairs = Vec::new(); + let mut stream = conn.execute(query_obj).await.map_err(|e| { + Error::dependency(format!("[neo4j-read] find_nodes_with_edge_type: {}", e)) + })?; + while let Ok(Some(row)) = stream.next().await { let source_node = NodeData { - name: source_name, - file: source_file, - start: source_start as usize, + name: row.get("source_name").unwrap_or_default(), + file: row.get("source_file").unwrap_or_default(), + start: row.get::("source_start").unwrap_or_default() as usize, ..Default::default() }; let target_node = NodeData { - name: target_name, - file: target_file, - start: target_start as usize, + name: row.get("target_name").unwrap_or_default(), + file: row.get("target_file").unwrap_or_default(), + start: row.get::("target_start").unwrap_or_default() as usize, ..Default::default() }; node_pairs.push((source_node, target_node)); } + Ok(node_pairs) } - Err(e) => { - debug!("Error executing find_nodes_with_edge_type query: {}", e); - } - } - node_pairs + }) + .await; + + result.unwrap_or_else(|e| { + warn!(error = %e, "neo4j find_nodes_with_edge_type failed after retries"); + vec![] + }) } pub async fn find_nodes_by_name_async(&self, node_type: NodeType, name: &str) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_nodes_by_name_async"); - return vec![]; - }; - let (query_str, params_map) = find_nodes_by_name_query(&node_type, name, &self.root); - let nodes = execute_node_query(&connection, query_str, params_map).await; + let nodes = execute_node_query(self, query_str, params_map).await; let lang_nodes: Vec = nodes .into_iter() @@ -288,16 +269,8 @@ impl Neo4jGraph { node_type: NodeType, name: &str, ) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_nodes_by_name_async"); - return vec![]; - }; - let (query_str, params_map) = find_nodes_by_name_query(&node_type, name, &self.root); - - let nodes = execute_node_query(&connection, query_str, params_map).await; - - nodes + execute_node_query(self, query_str, params_map).await } pub async fn find_node_by_name_in_file_async( @@ -306,14 +279,9 @@ impl Neo4jGraph { name: &str, file: &str, ) -> Option { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_node_by_name_in_file_async"); - return None; - }; - let (query, params) = find_node_by_name_file_query(&node_type, name, file); - let nodes = execute_node_query(&connection, query, params) + let nodes = execute_node_query(self, query, params) .await .into_iter(); @@ -330,13 +298,9 @@ impl Neo4jGraph { file: &str, verb: &str, ) -> Option { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_endpoint_async"); - return None; - }; let (query, params) = find_endpoint_query(name, file, verb); - let nodes = execute_node_query(&connection, query, params).await; + let nodes = execute_node_query(self, query, params).await; nodes.into_iter().next() } pub async fn find_resource_nodes_async( @@ -345,22 +309,12 @@ impl Neo4jGraph { verb: &str, path: &str, ) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_resource_nodes_async"); - return vec![]; - }; let (query, params) = find_resource_nodes_query(&node_type, verb, path); - - execute_node_query(&connection, query, params).await + execute_node_query(self, query, params).await } pub async fn find_handlers_for_endpoint_async(&self, endpoint: &NodeData) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_handlers_for_endpoint_async"); - return vec![]; - }; let (query, params) = find_handlers_for_endpoint_query(endpoint); - - execute_node_query(&connection, query, params).await + execute_node_query(self, query, params).await } pub async fn check_direct_data_model_usage_async( @@ -368,32 +322,35 @@ impl Neo4jGraph { function_name: &str, data_model: &str, ) -> bool { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in check_direct_data_model_usage_async"); - return false; - }; let (query_str, params) = check_direct_data_model_usage_query(function_name, data_model); - let mut query_obj = query(&query_str); - for (key, value) in params.value.iter() { - query_obj = query_obj.param(key.value.as_str(), value.clone()); - } - - if let Ok(mut result) = connection.execute(query_obj).await { - if let Ok(Some(row)) = result.next().await { - return row.get::("exists").unwrap_or(false); + let result = with_transient_retry_reconnect(self, "check_direct_data_model_usage", || { + let query_str = query_str.clone(); + let params = params.clone(); + async move { + let conn = self.ensure_connected().await?; + let mut query_obj = query(&query_str); + for (key, value) in params.value.iter() { + query_obj = query_obj.param(key.value.as_str(), value.clone()); + } + let mut result = conn.execute(query_obj).await?; + if let Ok(Some(row)) = result.next().await { + Ok(row.get::("exists").unwrap_or(false)) + } else { + Ok(false) + } } - } - false + }) + .await; + + result.unwrap_or_else(|e| { + warn!(error = %e, "neo4j check_direct_data_model_usage failed after retries"); + false + }) } pub async fn find_functions_called_by_async(&self, function: &NodeData) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_functions_called_by_async"); - return vec![]; - }; let (query, params) = find_functions_called_by_query(function); - - execute_node_query(&connection, query, params).await + execute_node_query(self, query, params).await } pub async fn find_source_edge_by_name_and_file_async( &self, @@ -401,37 +358,37 @@ impl Neo4jGraph { target_name: &str, target_file: &str, ) -> Option { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_source_edge_by_name_and_file_async"); - return None; - }; let (query_str, params) = find_source_edge_by_name_and_file_query(&edge_type, target_name, target_file); - let mut query_obj = query(&query_str); - for (key, value) in params.value.iter() { - query_obj = query_obj.param(key.value.as_str(), value.clone()); - } - - if let Ok(mut result) = connection.execute(query_obj).await { - if let Ok(Some(row)) = result.next().await { - let name = row.get::("name").unwrap_or_default(); - let file = row.get::("file").unwrap_or_default(); - let start = row.get::("start").unwrap_or_default() as usize; - let verb = match row.get::("verb") { - Ok(value) => Some(value), - Err(_) => None, - }; - - return Some(NodeKeys { - name, - file, - start, - verb, - }); + let result = with_transient_retry_reconnect(self, "find_source_edge_by_name_and_file", || { + let query_str = query_str.clone(); + let params = params.clone(); + async move { + let conn = self.ensure_connected().await?; + let mut query_obj = query(&query_str); + for (key, value) in params.value.iter() { + query_obj = query_obj.param(key.value.as_str(), value.clone()); + } + let mut result = conn.execute(query_obj).await?; + if let Ok(Some(row)) = result.next().await { + Ok(Some(NodeKeys { + name: row.get::("name").unwrap_or_default(), + file: row.get::("file").unwrap_or_default(), + start: row.get::("start").unwrap_or_default() as usize, + verb: row.get::("verb").ok(), + })) + } else { + Ok(None) + } } - } - None + }) + .await; + + result.unwrap_or_else(|e| { + warn!(error = %e, "neo4j find_source_edge failed after retries"); + None + }) } pub async fn find_node_in_range_async( &self, @@ -439,13 +396,8 @@ impl Neo4jGraph { row: u32, file: &str, ) -> Option { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_node_in_range_async"); - return None; - }; let (query, params) = find_nodes_in_range_query(&node_type, file, row); - - let nodes = execute_node_query(&connection, query, params).await; + let nodes = execute_node_query(self, query, params).await; nodes.into_iter().next() } pub async fn find_node_at_async( @@ -454,12 +406,8 @@ impl Neo4jGraph { file: &str, line: u32, ) -> Option { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_node_at_async"); - return None; - }; let (query, params) = find_node_at_query(&node_type, file, line); - let nodes = execute_node_query(&connection, query, params).await; + let nodes = execute_node_query(self, query, params).await; nodes.into_iter().next() } pub async fn find_node_by_name_and_file_end_with_async( @@ -571,16 +519,19 @@ impl Neo4jGraph { } pub async fn get_graph_size_async(&self) -> Result<(u32, u32)> { - let connection = self.ensure_connected().await?; - let query_str = count_nodes_edges_query(); - let mut result = connection.execute(query(&query_str)).await?; - if let Some(row) = result.next().await? { - let nodes = row.get::("nodes").unwrap_or(0); - let edges = row.get::("edges").unwrap_or(0); - Ok((nodes, edges)) - } else { - Ok((0, 0)) - } + with_transient_retry_reconnect(self, "get_graph_size", || async { + let connection = self.ensure_connected().await?; + let query_str = count_nodes_edges_query(); + let mut result = connection.execute(query(&query_str)).await?; + if let Some(row) = result.next().await? { + let nodes = row.get::("nodes").unwrap_or(0); + let edges = row.get::("edges").unwrap_or(0); + Ok((nodes, edges)) + } else { + Ok((0, 0)) + } + }) + .await } pub async fn analysis_async(&self) -> Result<()> { let connection = self.ensure_connected().await?; @@ -782,38 +733,40 @@ impl Neo4jGraph { } pub async fn find_top_level_functions_async(&self) -> Vec { - let Ok(connection) = self.ensure_connected().await else { - warn!("Failed to connect to Neo4j in find_top_level_functions_async"); - return vec![]; - }; let (query, params) = find_top_level_functions_query(); - execute_node_query(&connection, query, params).await + execute_node_query(self, query, params).await } pub async fn set_missing_data_bank(&self) -> Result { - let connection = self.ensure_connected().await?; - let query_str = set_missing_data_bank_query(); - let mut result = connection.execute(query(&query_str)).await?; - if let Some(row) = result.next().await? { - let count = row.get::("updated_count").unwrap_or(0); - info!("Set Data_Bank property for {} nodes", count); - Ok(count as u32) - } else { - Ok(0) - } + with_transient_retry_reconnect(self, "set_missing_data_bank", || async { + let connection = self.ensure_connected().await?; + let query_str = set_missing_data_bank_query(); + let mut result = connection.execute(query(&query_str)).await?; + if let Some(row) = result.next().await? { + let count = row.get::("updated_count").unwrap_or(0); + info!("Set Data_Bank property for {} nodes", count); + Ok(count as u32) + } else { + Ok(0) + } + }) + .await } pub async fn set_default_namespace(&self) -> Result { - let connection = self.ensure_connected().await?; - let query_str = set_default_namespace_query(); - let mut result = connection.execute(query(&query_str)).await?; - if let Some(row) = result.next().await? { - let count = row.get::("updated_count").unwrap_or(0); - info!("Set namespace property to 'default' for {} nodes", count); - Ok(count as u32) - } else { - Ok(0) - } + with_transient_retry_reconnect(self, "set_default_namespace", || async { + let connection = self.ensure_connected().await?; + let query_str = set_default_namespace_query(); + let mut result = connection.execute(query(&query_str)).await?; + if let Some(row) = result.next().await? { + let count = row.get::("updated_count").unwrap_or(0); + info!("Set namespace property to 'default' for {} nodes", count); + Ok(count as u32) + } else { + Ok(0) + } + }) + .await } pub async fn get_data_models_within_async(&self, lang: &Lang) -> Result<()> { diff --git a/ast/src/lang/graphs/neo4j/operations/coverage.rs b/ast/src/lang/graphs/neo4j/operations/coverage.rs index 51c4e73b1..1032d5217 100644 --- a/ast/src/lang/graphs/neo4j/operations/coverage.rs +++ b/ast/src/lang/graphs/neo4j/operations/coverage.rs @@ -1,6 +1,5 @@ use shared::Result; use std::collections::HashSet; -use tracing::warn; use crate::lang::graphs::utils::tests_sources; use crate::lang::{ @@ -45,16 +44,8 @@ impl Neo4jGraph { &self, files: &[String], ) -> Result> { - let conn = match self.ensure_connected().await { - Ok(conn) => conn, - Err(e) => { - warn!("Failed to connect to graph database: {:?}", e); - return Ok(Vec::new()); - } - }; - let (query_str, params) = get_muted_nodes_for_files_query(files); - Ok(execute_muted_nodes_query(&conn, query_str, params).await) + Ok(execute_muted_nodes_query(self, query_str, params).await) } pub async fn restore_muted_nodes_async( @@ -65,16 +56,8 @@ impl Neo4jGraph { return Ok(0); } - let conn = match self.ensure_connected().await { - Ok(conn) => conn, - Err(e) => { - warn!("Failed to connect to graph database: {:?}", e); - return Ok(0); - } - }; - let (query_str, params) = restore_muted_status_query(identifiers); - Ok(execute_count_query(&conn, query_str, params).await) + Ok(execute_count_query(self, query_str, params).await) } pub async fn set_node_muted_async( @@ -84,15 +67,8 @@ impl Neo4jGraph { file: &str, is_muted: bool, ) -> Result { - let conn = match self.ensure_connected().await { - Ok(conn) => conn, - Err(_) => { - return Ok(0); - } - }; - let (query_str, params) = set_node_muted_query(node_type, name, file, is_muted); - let result = execute_count_query(&conn, query_str, params).await; + let result = execute_count_query(self, query_str, params).await; Ok(result) } @@ -102,14 +78,8 @@ impl Neo4jGraph { name: &str, file: &str, ) -> Result { - let conn = match self.ensure_connected().await { - Ok(conn) => conn, - Err(_) => { - return Ok(false); - } - }; let (query_str, params) = check_node_muted_query(node_type, name, file); - let is_muted = execute_boolean_query(&conn, query_str, params).await; + let is_muted = execute_boolean_query(self, query_str, params).await; Ok(is_muted) } }