Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ public class QueryService {
@Getter(lazy = true)
private final CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor(dataSourceService);

/** Helper: depending on the type of error, either re-raise or propagate to the listener. */
private void propagateCalciteError(Throwable t, ResponseListener<?> listener)
throws VirtualMachineError {
if (t instanceof VirtualMachineError) {
// throw and fast fail the VM errors such as OOM (same with v2).
throw (VirtualMachineError) t;
}
if (t instanceof Exception) {
listener.onFailure((Exception) t);
} else if (t instanceof ExceptionInInitializerError
&& ((ExceptionInInitializerError) t).getException() instanceof Exception) {
listener.onFailure((Exception) ((ExceptionInInitializerError) t).getException());
} else {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
}
}

/** Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.<br> */
public void execute(
UnresolvedPlan plan,
Expand Down Expand Up @@ -112,18 +130,7 @@ public void executeWithCalcite(
log.warn("Fallback to V2 query engine since got exception", t);
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
if (t instanceof Exception) {
listener.onFailure((Exception) t);
} else if (t instanceof ExceptionInInitializerError
&& ((ExceptionInInitializerError) t).getException() instanceof Exception) {
listener.onFailure((Exception) ((ExceptionInInitializerError) t).getException());
} else if (t instanceof VirtualMachineError) {
// throw and fast fail the VM errors such as OOM (same with v2).
throw t;
} else {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
}
propagateCalciteError(t, listener);
}
}
},
Expand Down Expand Up @@ -154,12 +161,7 @@ public void explainWithCalcite(
log.warn("Fallback to V2 query engine since got exception", t);
explainWithLegacy(plan, queryType, listener, mode, Optional.of(t));
} else {
if (t instanceof Error) {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
} else {
listener.onFailure((Exception) t);
}
propagateCalciteError(t, listener);
}
}
},
Expand All @@ -174,11 +176,11 @@ public void executeWithLegacy(
try {
executePlan(analyze(plan, queryType), PlanContext.emptyPlanContext(), listener);
} catch (Exception e) {
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed(null)) {
// if there is a failure thrown from Calcite and execution after fallback V2
// keeps failure, we should throw the failure from Calcite.
calciteFailure.ifPresentOrElse(
t -> listener.onFailure(new RuntimeException(t)), () -> listener.onFailure(e));
if (calciteFailure.isPresent()) {
// This happens if Calcite fell back to V2 due to some issue, and then V2 also failed.
// Prefer the Calcite error.
// https://github.com/opensearch-project/sql/issues/5060
propagateCalciteError(calciteFailure.get(), listener);
} else {
listener.onFailure(e);
}
Expand Down Expand Up @@ -207,11 +209,11 @@ public void explainWithLegacy(
}
executionEngine.explain(plan(analyze(plan, queryType)), listener);
} catch (Exception e) {
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed(null)) {
// if there is a failure thrown from Calcite and execution after fallback V2
// keeps failure, we should throw the failure from Calcite.
calciteFailure.ifPresentOrElse(
t -> listener.onFailure(new RuntimeException(t)), () -> listener.onFailure(e));
if (calciteFailure.isPresent()) {
// This happens if Calcite fell back to V2 due to some issue, and then V2 also failed.
// Prefer the Calcite error.
// https://github.com/opensearch-project/sql/issues/5060
propagateCalciteError(calciteFailure.get(), listener);
} else {
listener.onFailure(e);
}
Expand Down
118 changes: 118 additions & 0 deletions core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,124 @@ public void analyzeExceptionShouldBeCached() {
queryService().analyzeFail().handledByOnFailure();
}

@Test
public void testExecuteWithLegacyShouldReturnCalciteErrorWhenBothFail() {
UnsupportedOperationException calciteException =
new UnsupportedOperationException("Calcite error");
IllegalStateException v2Exception = new IllegalStateException("V2 error");

ResponseListener<ExecutionEngine.QueryResponse> responseListener =
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
fail("Expected onFailure to be called");
}

@Override
public void onFailure(Exception e) {
// Should get the Calcite error directly (not wrapped), not the V2 error
assertNotNull(e);
assertTrue(e instanceof UnsupportedOperationException);
assertTrue(e.getMessage().contains("Calcite error"));
}
};

lenient().when(settings.getSettingValue(Key.CALCITE_ENGINE_ENABLED)).thenReturn(false);
lenient().when(analyzer.analyze(any(), any())).thenThrow(v2Exception);

QueryService service = new QueryService(analyzer, executionEngine, planner, null, settings);
service.executeWithLegacy(ast, queryType, responseListener, Optional.of(calciteException));
}

@Test
public void testExplainWithLegacyShouldReturnCalciteErrorWhenBothFail() {
UnsupportedOperationException calciteException =
new UnsupportedOperationException("Calcite error");
IllegalStateException v2Exception = new IllegalStateException("V2 error");

ResponseListener<ExecutionEngine.ExplainResponse> responseListener =
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.ExplainResponse explainResponse) {
fail("Expected onFailure to be called");
}

@Override
public void onFailure(Exception e) {
// Should get the Calcite error directly (not wrapped), not the V2 error
assertNotNull(e);
assertTrue(e instanceof UnsupportedOperationException);
assertTrue(e.getMessage().contains("Calcite error"));
}
};

lenient().when(settings.getSettingValue(Key.CALCITE_ENGINE_ENABLED)).thenReturn(false);
lenient().when(analyzer.analyze(any(), any())).thenThrow(v2Exception);

QueryService service = new QueryService(analyzer, executionEngine, planner, null, settings);
service.explainWithLegacy(
ast, queryType, responseListener, ExplainMode.STANDARD, Optional.of(calciteException));
}

@Test
public void testExecuteWithLegacyShouldWrapCalciteErrorInCalciteUnsupportedException() {
AssertionError calciteError = new AssertionError("Calcite assertion failed");
IllegalStateException v2Exception = new IllegalStateException("V2 error");

ResponseListener<ExecutionEngine.QueryResponse> responseListener =
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
fail("Expected onFailure to be called");
}

@Override
public void onFailure(Exception e) {
// Errors should be wrapped in CalciteUnsupportedException
assertNotNull(e);
assertTrue(e instanceof org.opensearch.sql.exception.CalciteUnsupportedException);
assertTrue(e.getCause() instanceof AssertionError);
assertTrue(e.getMessage().contains("Calcite assertion failed"));
}
};

lenient().when(settings.getSettingValue(Key.CALCITE_ENGINE_ENABLED)).thenReturn(false);
lenient().when(analyzer.analyze(any(), any())).thenThrow(v2Exception);

QueryService service = new QueryService(analyzer, executionEngine, planner, null, settings);
service.executeWithLegacy(ast, queryType, responseListener, Optional.of(calciteError));
}

@Test
public void testExplainWithLegacyShouldWrapCalciteErrorInCalciteUnsupportedException() {
AssertionError calciteError = new AssertionError("Calcite assertion failed");
IllegalStateException v2Exception = new IllegalStateException("V2 error");

ResponseListener<ExecutionEngine.ExplainResponse> responseListener =
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.ExplainResponse explainResponse) {
fail("Expected onFailure to be called");
}

@Override
public void onFailure(Exception e) {
// Errors should be wrapped in CalciteUnsupportedException
assertNotNull(e);
assertTrue(e instanceof org.opensearch.sql.exception.CalciteUnsupportedException);
assertTrue(e.getCause() instanceof AssertionError);
assertTrue(e.getMessage().contains("Calcite assertion failed"));
}
};

lenient().when(settings.getSettingValue(Key.CALCITE_ENGINE_ENABLED)).thenReturn(false);
lenient().when(analyzer.analyze(any(), any())).thenThrow(v2Exception);

QueryService service = new QueryService(analyzer, executionEngine, planner, null, settings);
service.explainWithLegacy(
ast, queryType, responseListener, ExplainMode.STANDARD, Optional.of(calciteError));
}

Helper queryService() {
return new Helper();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Issue: https://github.com/opensearch-project/sql/issues/5060
# PR: https://github.com/opensearch-project/sql/pull/5133
# When Calcite falls back to V2 and V2 also fails, return the original Calcite error instead of V2's.
#
# The AD command forces a V2 fallback, then join is only supported in V3 (Calcite).
# This test verifies that when both Calcite and V2 fail, the error message correctly shows
# the Calcite error (CalciteUnsupportedException) rather than the V2 error.

setup:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: true
- do:
indices.create:
index: test_join_ad_error_5133
body:
mappings:
properties:
"event.id":
type: keyword
"@timestamp":
type: date
message:
type: text
- do:
bulk:
index: test_join_ad_error_5133
refresh: true
body:
- '{"index": {}}'
- '{"event.id": "evt1", "@timestamp": "2025-01-15T10:30:00Z", "message": "test message 1"}'
- '{"index": {}}'
- '{"event.id": "evt2", "@timestamp": "2025-01-15T10:31:00Z", "message": "test message 2"}'

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: false
- do:
indices.delete:
index: test_join_ad_error_5133

---
"Join with AD command should return Calcite error when both Calcite and V2 fail":
- skip:
features:
- headers
- allowed_warnings
# Before the fix: Returns V2 error "Join is supported only when plugins.calcite.enabled=true" (status 500)
# After the fix: Returns Calcite error "AD command is unsupported in Calcite" (status 400)
- do:
allowed_warnings:
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
catch: bad_request
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_join_ad_error_5133 | join `event.id` [source = test_join_ad_error_5133] | ad time_field='@timestamp'
- match: { "$body": "/CalciteUnsupportedException/" }
- match: { "$body": "/AD\\s+command\\s+is\\s+unsupported\\s+in\\s+Calcite/" }
Loading