diff --git a/core/src/main/java/org/opensearch/sql/monitor/AlwaysHealthyMonitor.java b/core/src/main/java/org/opensearch/sql/monitor/AlwaysHealthyMonitor.java index 84cec4c9c79..1c3716c07dc 100644 --- a/core/src/main/java/org/opensearch/sql/monitor/AlwaysHealthyMonitor.java +++ b/core/src/main/java/org/opensearch/sql/monitor/AlwaysHealthyMonitor.java @@ -11,7 +11,12 @@ public class AlwaysHealthyMonitor extends ResourceMonitor { /** always healthy. */ @Override - public boolean isHealthy() { + protected boolean isHealthyImpl() { return true; } + + @Override + public ResourceStatus getStatus() { + return ResourceStatus.healthy(ResourceStatus.ResourceType.OTHER); + } } diff --git a/core/src/main/java/org/opensearch/sql/monitor/ResourceMonitor.java b/core/src/main/java/org/opensearch/sql/monitor/ResourceMonitor.java index bbd1c67a62d..3a7db905b58 100644 --- a/core/src/main/java/org/opensearch/sql/monitor/ResourceMonitor.java +++ b/core/src/main/java/org/opensearch/sql/monitor/ResourceMonitor.java @@ -14,6 +14,43 @@ public abstract class ResourceMonitor { * Is the resource healthy. * * @return true for healthy, otherwise false. + * @throws UnsupportedOperationException if the subclass doesn't override getStatus() or + * isHealthyImpl() */ - public abstract boolean isHealthy(); + public boolean isHealthy() { + return getStatus().isHealthy(); + } + + /** + * Get detailed resource status including context for error messages. Subclasses should override + * this method to provide rich status information. + * + * @return ResourceStatus with health state and detailed context + * @throws UnsupportedOperationException if the subclass doesn't override getStatus() or + * isHealthyImpl() + */ + public ResourceStatus getStatus() { + // Default implementation for backwards compatibility + // Subclasses should override this to provide detailed status + boolean healthy = isHealthyImpl(); + return healthy + ? ResourceStatus.healthy(ResourceStatus.ResourceType.OTHER) + : ResourceStatus.builder() + .healthy(false) + .type(ResourceStatus.ResourceType.OTHER) + .description("Resource monitor reports unhealthy status") + .build(); + } + + /** + * Internal implementation for health check. Subclasses that don't override getStatus() should + * override this instead. + * + * @return true for healthy, otherwise false. + */ + protected boolean isHealthyImpl() { + // Subclass must override either getStatus() or isHealthyImpl() + throw new UnsupportedOperationException( + "Subclass must override either getStatus() or isHealthyImpl()"); + } } diff --git a/core/src/main/java/org/opensearch/sql/monitor/ResourceStatus.java b/core/src/main/java/org/opensearch/sql/monitor/ResourceStatus.java new file mode 100644 index 00000000000..5e6c320c11d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/ResourceStatus.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor; + +import lombok.Builder; +import lombok.Getter; + +/** + * Represents the health status of a resource with detailed context information. This wrapper allows + * error messages to include actionable details about resource exhaustion instead of just boolean + * health checks. + */ +@Getter +@Builder +public class ResourceStatus { + /** Type of resource being monitored. */ + public enum ResourceType { + MEMORY, + CPU, + DISK, + OTHER + } + + /** Whether the resource is healthy (within limits). */ + private final boolean healthy; + + /** Type of resource (memory, CPU, etc.). */ + private final ResourceType type; + + /** Human-readable description of resource state. */ + private final String description; + + /** Current resource usage value (optional, for metrics). */ + private final Long currentUsage; + + /** Maximum allowed resource value (optional, for metrics). */ + private final Long maxLimit; + + /** Additional contextual information (optional). */ + private final String additionalContext; + + /** + * Creates a healthy status with minimal information. + * + * @param type Resource type + * @return Healthy ResourceStatus + */ + public static ResourceStatus healthy(ResourceType type) { + return ResourceStatus.builder() + .healthy(true) + .type(type) + .description(type + " resources are healthy") + .build(); + } + + /** + * Creates an unhealthy status with detailed context. + * + * @param type Resource type + * @param currentUsage Current usage value + * @param maxLimit Maximum allowed limit + * @param description Human-readable description + * @return Unhealthy ResourceStatus with context + */ + public static ResourceStatus unhealthy( + ResourceType type, long currentUsage, long maxLimit, String description) { + return ResourceStatus.builder() + .healthy(false) + .type(type) + .currentUsage(currentUsage) + .maxLimit(maxLimit) + .description(description) + .build(); + } + + /** + * Gets a formatted description including usage metrics if available. + * + * @return Formatted description string + */ + public String getFormattedDescription() { + if (currentUsage != null && maxLimit != null) { + if (maxLimit <= 0) { + // Treat invalid limit as 0, don't compute percentage + return String.format( + "%s (current: %s, limit: %s)", description, formatBytes(currentUsage), formatBytes(0)); + } + double percentage = (double) currentUsage / maxLimit * 100; + return String.format( + "%s (current: %s, limit: %s, usage: %.1f%%)", + description, formatBytes(currentUsage), formatBytes(maxLimit), percentage); + } + return description; + } + + /** + * Formats byte values into human-readable format (KB, MB, GB). + * + * @param bytes Byte value + * @return Formatted string + */ + private String formatBytes(long bytes) { + if (bytes < 1024) { + return bytes + "B"; + } else if (bytes < 1024 * 1024) { + return String.format("%.1fKB", bytes / 1024.0); + } else if (bytes < 1024 * 1024 * 1024) { + return String.format("%.1fMB", bytes / (1024.0 * 1024)); + } else { + return String.format("%.1fGB", bytes / (1024.0 * 1024 * 1024)); + } + } +} diff --git a/core/src/test/java/org/opensearch/sql/monitor/ResourceMonitorTest.java b/core/src/test/java/org/opensearch/sql/monitor/ResourceMonitorTest.java new file mode 100644 index 00000000000..aea4b3af213 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/monitor/ResourceMonitorTest.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +class ResourceMonitorTest { + + @Test + void testDefaultImplementationThrowsException() { + // Create a minimal subclass that doesn't override getStatus() or isHealthyImpl() + ResourceMonitor monitor = new ResourceMonitor() { + // Intentionally empty - doesn't override anything + }; + + // Attempting to use the default path should throw UnsupportedOperationException + assertThrows(UnsupportedOperationException.class, monitor::isHealthy); + } +} diff --git a/core/src/test/java/org/opensearch/sql/monitor/ResourceStatusTest.java b/core/src/test/java/org/opensearch/sql/monitor/ResourceStatusTest.java new file mode 100644 index 00000000000..84e146b0142 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/monitor/ResourceStatusTest.java @@ -0,0 +1,252 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; +import org.opensearch.sql.monitor.ResourceStatus.ResourceType; + +class ResourceStatusTest { + + @Test + void testHealthyFactoryMethod() { + // Test healthy() for each ResourceType + ResourceStatus memoryStatus = ResourceStatus.healthy(ResourceType.MEMORY); + assertTrue(memoryStatus.isHealthy()); + assertEquals(ResourceType.MEMORY, memoryStatus.getType()); + assertEquals("MEMORY resources are healthy", memoryStatus.getDescription()); + + ResourceStatus cpuStatus = ResourceStatus.healthy(ResourceType.CPU); + assertTrue(cpuStatus.isHealthy()); + assertEquals(ResourceType.CPU, cpuStatus.getType()); + assertEquals("CPU resources are healthy", cpuStatus.getDescription()); + + ResourceStatus diskStatus = ResourceStatus.healthy(ResourceType.DISK); + assertTrue(diskStatus.isHealthy()); + assertEquals(ResourceType.DISK, diskStatus.getType()); + assertEquals("DISK resources are healthy", diskStatus.getDescription()); + + ResourceStatus otherStatus = ResourceStatus.healthy(ResourceType.OTHER); + assertTrue(otherStatus.isHealthy()); + assertEquals(ResourceType.OTHER, otherStatus.getType()); + assertEquals("OTHER resources are healthy", otherStatus.getDescription()); + } + + @Test + void testUnhealthyFactoryMethod() { + // Test unhealthy() for each ResourceType + ResourceStatus memoryStatus = + ResourceStatus.unhealthy(ResourceType.MEMORY, 1024, 2048, "Memory limit exceeded"); + assertFalse(memoryStatus.isHealthy()); + assertEquals(ResourceType.MEMORY, memoryStatus.getType()); + assertEquals("Memory limit exceeded", memoryStatus.getDescription()); + assertEquals(1024L, memoryStatus.getCurrentUsage()); + assertEquals(2048L, memoryStatus.getMaxLimit()); + + ResourceStatus cpuStatus = + ResourceStatus.unhealthy(ResourceType.CPU, 95, 100, "CPU usage too high"); + assertFalse(cpuStatus.isHealthy()); + assertEquals(ResourceType.CPU, cpuStatus.getType()); + assertEquals("CPU usage too high", cpuStatus.getDescription()); + + ResourceStatus diskStatus = + ResourceStatus.unhealthy(ResourceType.DISK, 900, 1000, "Disk space low"); + assertFalse(diskStatus.isHealthy()); + assertEquals(ResourceType.DISK, diskStatus.getType()); + assertEquals("Disk space low", diskStatus.getDescription()); + + ResourceStatus otherStatus = + ResourceStatus.unhealthy(ResourceType.OTHER, 50, 100, "Other resource issue"); + assertFalse(otherStatus.isHealthy()); + assertEquals(ResourceType.OTHER, otherStatus.getType()); + assertEquals("Other resource issue", otherStatus.getDescription()); + } + + @Test + void testGetFormattedDescriptionWithMetrics() { + // Test with metrics set (using values that will format as GB) + ResourceStatus status = + ResourceStatus.unhealthy( + ResourceType.MEMORY, 1536L * 1024 * 1024, 2048L * 1024 * 1024, "Memory usage high"); + + String formatted = status.getFormattedDescription(); + assertNotNull(formatted); + assertTrue(formatted.contains("Memory usage high")); + assertTrue(formatted.contains("current:")); + assertTrue(formatted.contains("limit:")); + assertTrue(formatted.contains("usage:")); + assertTrue(formatted.contains("75.0%")); // 1536/2048 = 0.75 + assertTrue(formatted.contains("1.5GB")); // 1536 MB = 1.5 GB + assertTrue(formatted.contains("2.0GB")); // 2048 MB = 2.0 GB + } + + @Test + void testGetFormattedDescriptionWithoutMetrics() { + // Test with null metrics + ResourceStatus status = + ResourceStatus.builder() + .healthy(false) + .type(ResourceType.MEMORY) + .description("Memory issue detected") + .build(); + + String formatted = status.getFormattedDescription(); + assertEquals("Memory issue detected", formatted); + } + + @Test + void testByteFormattingLessThan1KB() { + // Test < 1KB + ResourceStatus status = ResourceStatus.unhealthy(ResourceType.MEMORY, 512, 1024, "Low memory"); + + String formatted = status.getFormattedDescription(); + assertTrue(formatted.contains("512B")); + assertTrue(formatted.contains("1.0KB")); + } + + @Test + void testByteFormattingExactly1KB() { + // Test exactly 1KB + ResourceStatus status = + ResourceStatus.unhealthy(ResourceType.MEMORY, 1024, 2048, "Memory warning"); + + String formatted = status.getFormattedDescription(); + assertTrue(formatted.contains("1.0KB")); + assertTrue(formatted.contains("2.0KB")); + } + + @Test + void testByteFormattingExactly1MB() { + // Test exactly 1MB + long oneMB = 1024 * 1024; + ResourceStatus status = + ResourceStatus.unhealthy(ResourceType.MEMORY, oneMB, oneMB * 2, "Memory warning"); + + String formatted = status.getFormattedDescription(); + assertTrue(formatted.contains("1.0MB")); + assertTrue(formatted.contains("2.0MB")); + } + + @Test + void testByteFormattingExactly1GB() { + // Test exactly 1GB + long oneGB = 1024L * 1024 * 1024; + ResourceStatus status = + ResourceStatus.unhealthy(ResourceType.MEMORY, oneGB, oneGB * 2, "Memory warning"); + + String formatted = status.getFormattedDescription(); + assertTrue(formatted.contains("1.0GB")); + assertTrue(formatted.contains("2.0GB")); + } + + @Test + void testByteFormattingZeroUsage() { + // Test with 0 usage + ResourceStatus status = ResourceStatus.unhealthy(ResourceType.MEMORY, 0, 1024, "Zero usage"); + + String formatted = status.getFormattedDescription(); + assertTrue(formatted.contains("0B")); + assertTrue(formatted.contains("0.0%")); + } + + @Test + void testByteFormattingUsageEqualsLimit() { + // Test when currentUsage == maxLimit + ResourceStatus status = ResourceStatus.unhealthy(ResourceType.MEMORY, 2048, 2048, "At limit"); + + String formatted = status.getFormattedDescription(); + assertTrue(formatted.contains("2.0KB")); + assertTrue(formatted.contains("100.0%")); + } + + @Test + void testByteFormattingMultipleSizes() { + // Test various sizes to ensure proper formatting + // Small bytes + ResourceStatus status1 = ResourceStatus.unhealthy(ResourceType.MEMORY, 100, 1000, "Test"); + assertTrue(status1.getFormattedDescription().contains("100B")); + + // KB range + ResourceStatus status2 = + ResourceStatus.unhealthy(ResourceType.MEMORY, 500 * 1024, 1000 * 1024, "Test"); + assertTrue(status2.getFormattedDescription().contains("500.0KB")); + + // MB range + ResourceStatus status3 = + ResourceStatus.unhealthy( + ResourceType.MEMORY, 500L * 1024 * 1024, 1000L * 1024 * 1024, "Test"); + assertTrue(status3.getFormattedDescription().contains("500.0MB")); + + // GB range + ResourceStatus status4 = + ResourceStatus.unhealthy( + ResourceType.MEMORY, 5L * 1024 * 1024 * 1024, 10L * 1024 * 1024 * 1024, "Test"); + assertTrue(status4.getFormattedDescription().contains("5.0GB")); + } + + @Test + void testMaxLimitZeroOrNegative() { + // Test with maxLimit = 0 (should not compute percentage) + ResourceStatus status1 = + ResourceStatus.unhealthy(ResourceType.MEMORY, 1024, 0, "Invalid limit"); + String formatted1 = status1.getFormattedDescription(); + assertTrue(formatted1.contains("Invalid limit")); + assertTrue(formatted1.contains("current:")); + assertTrue(formatted1.contains("limit:")); + assertFalse(formatted1.contains("%")); // Should not contain percentage + + // Test with maxLimit < 0 (should not compute percentage) + ResourceStatus status2 = + ResourceStatus.unhealthy(ResourceType.MEMORY, 1024, -100, "Negative limit"); + String formatted2 = status2.getFormattedDescription(); + assertTrue(formatted2.contains("Negative limit")); + assertTrue(formatted2.contains("current:")); + assertTrue(formatted2.contains("limit:")); + assertFalse(formatted2.contains("%")); // Should not contain percentage + } + + @Test + void testHealthyWithNullResourceType() { + // Test healthy() with null ResourceType + ResourceStatus status = ResourceStatus.healthy(null); + assertTrue(status.isHealthy()); + assertNull(status.getType()); + assertEquals("null resources are healthy", status.getDescription()); + } + + @Test + void testUnhealthyWithNullResourceType() { + // Test unhealthy() with null ResourceType + ResourceStatus status = ResourceStatus.unhealthy(null, 1024, 2048, "Resource limit exceeded"); + assertFalse(status.isHealthy()); + assertNull(status.getType()); + assertEquals("Resource limit exceeded", status.getDescription()); + assertEquals(1024L, status.getCurrentUsage()); + assertEquals(2048L, status.getMaxLimit()); + } + + @Test + void testUnhealthyWithNullDescription() { + // Test unhealthy() with null description + ResourceStatus status = ResourceStatus.unhealthy(ResourceType.MEMORY, 1024, 2048, null); + assertFalse(status.isHealthy()); + assertEquals(ResourceType.MEMORY, status.getType()); + assertNull(status.getDescription()); + assertEquals(1024L, status.getCurrentUsage()); + assertEquals(2048L, status.getMaxLimit()); + + // Formatted description should handle null description gracefully + String formatted = status.getFormattedDescription(); + assertNotNull(formatted); + assertTrue(formatted.contains("current:")); + assertTrue(formatted.contains("limit:")); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ResourceMonitorIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ResourceMonitorIT.java index ea1fde77493..5da796a5e41 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ResourceMonitorIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ResourceMonitorIT.java @@ -30,9 +30,8 @@ public void queryExceedResourceLimitShouldFail() throws IOException { new ClusterSetting("persistent", Settings.Key.QUERY_MEMORY_LIMIT.getKeyValue(), "1%")); ResponseException exception = expectThrows(ResponseException.class, this::executeQuery); assertEquals(500, exception.getResponse().getStatusLine().getStatusCode()); - assertThat( - exception.getMessage(), - Matchers.containsString("insufficient resources to run the query, quit.")); + assertThat(exception.getMessage(), Matchers.containsString("Insufficient resources to")); + assertThat(exception.getMessage(), Matchers.containsString("plugins.query.memory_limit")); // update plugins.ppl.query.memory_limit to default value 85% updateClusterSettings( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java index 150a7493580..7ad87409ca6 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java @@ -5,6 +5,8 @@ package org.opensearch.sql.opensearch.executor.protector; +import static org.opensearch.common.settings.Settings.EMPTY; + import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -12,8 +14,11 @@ import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; import lombok.ToString; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.monitor.ResourceMonitor; +import org.opensearch.sql.monitor.ResourceStatus; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.planner.SerializablePlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; @@ -36,6 +41,15 @@ public class ResourceMonitorPlan extends PhysicalPlan implements SerializablePla /** Count how many calls to delegate's next() already. */ @EqualsAndHashCode.Exclude private long nextCallCount = 0L; + /** + * Helper method to get the default memory limit string from the Setting constant. + * + * @return Default memory limit string (e.g., "85%") + */ + private static String getDefaultMemoryLimit() { + return OpenSearchSettings.QUERY_MEMORY_LIMIT_SETTING.getDefault(EMPTY).toString(); + } + @Override public R accept(PhysicalPlanNodeVisitor visitor, C context) { return delegate.accept(visitor, context); @@ -43,8 +57,15 @@ public R accept(PhysicalPlanNodeVisitor visitor, C context) { @Override public void open() { - if (!this.monitor.isHealthy()) { - throw new IllegalStateException("insufficient resources to run the query, quit."); + ResourceStatus status = this.monitor.getStatus(); + if (!status.isHealthy()) { + throw new IllegalStateException( + String.format( + "Insufficient resources to start query: %s. " + + "To increase the limit, adjust the '%s' setting (default: %s).", + status.getFormattedDescription(), + Settings.Key.QUERY_MEMORY_LIMIT.getKeyValue(), + getDefaultMemoryLimit())); } delegate.open(); } @@ -67,8 +88,19 @@ public boolean hasNext() { @Override public ExprValue next() { boolean shouldCheck = (++nextCallCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0); - if (shouldCheck && !this.monitor.isHealthy()) { - throw new IllegalStateException("insufficient resources to load next row, quit."); + if (shouldCheck) { + ResourceStatus status = this.monitor.getStatus(); + if (!status.isHealthy()) { + throw new IllegalStateException( + String.format( + "Insufficient resources to continue processing query: %s. " + + "Rows processed: %d. " + + "To increase the limit, adjust the '%s' setting (default: %s).", + status.getFormattedDescription(), + nextCallCount, + Settings.Key.QUERY_MEMORY_LIMIT.getKeyValue(), + getDefaultMemoryLimit())); + } } return delegate.next(); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java index 2d462f07cc4..cefb1d6cdca 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchMemoryHealthy.java @@ -10,6 +10,7 @@ import lombok.NoArgsConstructor; import lombok.extern.log4j.Log4j2; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.monitor.ResourceStatus; /** OpenSearch Memory Monitor. */ @Log4j2 @@ -63,6 +64,41 @@ public boolean isMemoryHealthy(long limitBytes) { } } + /** + * Get detailed memory health status with usage metrics. + * + * @param limitBytes Memory limit in bytes + * @return ResourceStatus with detailed memory information + */ + public ResourceStatus getMemoryStatus(long limitBytes) { + final long currentMemoryUsage = this.memoryUsage.usage(); + log.debug("Memory usage:{}, limit:{}", currentMemoryUsage, limitBytes); + + if (currentMemoryUsage < limitBytes) { + return ResourceStatus.builder() + .healthy(true) + .type(ResourceStatus.ResourceType.MEMORY) + .currentUsage(currentMemoryUsage) + .maxLimit(limitBytes) + .description("Memory usage is within limits") + .build(); + } else { + log.warn("Memory usage:{} exceed limit:{}", currentMemoryUsage, limitBytes); + String description = + String.format( + "Memory usage exceeds limit: %d bytes used, %d bytes limit", + currentMemoryUsage, limitBytes); + + return ResourceStatus.builder() + .healthy(false) + .type(ResourceStatus.ResourceType.MEMORY) + .currentUsage(currentMemoryUsage) + .maxLimit(limitBytes) + .description(description) + .build(); + } + } + static class RandomFail { public boolean shouldFail() { return ThreadLocalRandom.current().nextBoolean(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitor.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitor.java index 97bc4771091..6140c0de41e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitor.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitor.java @@ -13,6 +13,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.monitor.ResourceMonitor; +import org.opensearch.sql.monitor.ResourceStatus; /** * {@link ResourceMonitor} implementation on Elasticsearch. When the heap memory usage exceeds @@ -23,19 +24,27 @@ public class OpenSearchResourceMonitor extends ResourceMonitor { private final Settings settings; private final Retry retry; + private final Retry statusRetry; private final OpenSearchMemoryHealthy memoryMonitor; /** Constructor. */ public OpenSearchResourceMonitor(Settings settings, OpenSearchMemoryHealthy memoryMonitor) { this.settings = settings; - RetryConfig config = + RetryConfig booleanRetryConfig = RetryConfig.custom() .maxAttempts(3) .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(1000)) .retryExceptions(OpenSearchMemoryHealthy.MemoryUsageExceedException.class) .ignoreExceptions(OpenSearchMemoryHealthy.MemoryUsageExceedFastFailureException.class) .build(); - retry = Retry.of("mem", config); + RetryConfig statusRetryConfig = + RetryConfig.custom() + .maxAttempts(3) + .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(1000)) + .retryOnResult(status -> status != null && !status.isHealthy()) + .build(); + retry = Retry.of("mem", booleanRetryConfig); + statusRetry = Retry.of("memStatus", statusRetryConfig); this.memoryMonitor = memoryMonitor; } @@ -60,4 +69,31 @@ public boolean isHealthy() { return false; } } + + /** + * Get detailed resource status with memory usage metrics. + * + * @return ResourceStatus with health state and detailed context + */ + @Override + public ResourceStatus getStatus() { + try { + ByteSizeValue limit = settings.getSettingValue(Settings.Key.QUERY_MEMORY_LIMIT); + if (limit == null) { + // undefined, be always healthy + return ResourceStatus.healthy(ResourceStatus.ResourceType.MEMORY); + } + Supplier statusSupplier = + Retry.decorateSupplier( + statusRetry, () -> memoryMonitor.getMemoryStatus(limit.getBytes())); + return statusSupplier.get(); + } catch (Exception e) { + // If we can't determine status, report as unhealthy with error context + return ResourceStatus.builder() + .healthy(false) + .type(ResourceStatus.ResourceType.MEMORY) + .description("Failed to determine memory status: " + e.getMessage()) + .build(); + } + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 05bd00dcf2c..a679261e548 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -62,8 +62,14 @@ public OpenSearchIndexEnumerator( int queryBucketSize, OpenSearchRequest request, ResourceMonitor monitor) { - if (!monitor.isHealthy()) { - throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); + org.opensearch.sql.monitor.ResourceStatus status = monitor.getStatus(); + if (!status.isHealthy()) { + throw new NonFallbackCalciteException( + String.format( + "Insufficient resources to start query: %s. " + + "To increase the limit, adjust the 'plugins.query.memory_limit' setting " + + "(default: 85%%).", + status.getFormattedDescription())); } this.fields = fields; @@ -102,8 +108,17 @@ public boolean moveNext() { } boolean shouldCheck = (queryCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0); - if (shouldCheck && !this.monitor.isHealthy()) { - throw new NonFallbackCalciteException("insufficient resources to load next row, quit."); + if (shouldCheck) { + org.opensearch.sql.monitor.ResourceStatus status = this.monitor.getStatus(); + if (!status.isHealthy()) { + throw new NonFallbackCalciteException( + String.format( + "Insufficient resources to continue processing query: %s. " + + "Rows processed: %d. " + + "To increase the limit, adjust the 'plugins.query.memory_limit' setting " + + "(default: 85%%).", + status.getFormattedDescription(), queryCount)); + } } if (iterator == null || (!iterator.hasNext() && !this.bgScanner.isScanDone())) { diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/ResourceMonitorPlanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/ResourceMonitorPlanTest.java index 82062bee76d..310930ffe70 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/ResourceMonitorPlanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/ResourceMonitorPlanTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -43,16 +44,32 @@ public void setup() { @Test void openExceedResourceLimit() { - when(resourceMonitor.isHealthy()).thenReturn(false); + when(resourceMonitor.getStatus()) + .thenReturn( + org.opensearch.sql.monitor.ResourceStatus.builder() + .healthy(false) + .type(org.opensearch.sql.monitor.ResourceStatus.ResourceType.MEMORY) + .currentUsage(900L * 1024 * 1024) // 900MB + .maxLimit(850L * 1024 * 1024) // 850MB + .description("Memory usage exceeds limit") + .build()); IllegalStateException exception = assertThrows(IllegalStateException.class, () -> monitorPlan.open()); - assertEquals("insufficient resources to run the query, quit.", exception.getMessage()); + assertTrue( + exception.getMessage().contains("Insufficient resources to start query"), + "Expected enriched error message, got: " + exception.getMessage()); + assertTrue( + exception.getMessage().contains("plugins.query.memory_limit"), + "Expected config suggestion in message"); } @Test void openSuccess() { - when(resourceMonitor.isHealthy()).thenReturn(true); + when(resourceMonitor.getStatus()) + .thenReturn( + org.opensearch.sql.monitor.ResourceStatus.healthy( + org.opensearch.sql.monitor.ResourceStatus.ResourceType.MEMORY)); monitorPlan.open(); verify(plan, times(1)).open(); @@ -60,18 +77,29 @@ void openSuccess() { @Test void nextSuccess() { - when(resourceMonitor.isHealthy()).thenReturn(true); + when(resourceMonitor.getStatus()) + .thenReturn( + org.opensearch.sql.monitor.ResourceStatus.healthy( + org.opensearch.sql.monitor.ResourceStatus.ResourceType.MEMORY)); for (int i = 1; i <= 1000; i++) { monitorPlan.next(); } - verify(resourceMonitor, times(1)).isHealthy(); + verify(resourceMonitor, times(1)).getStatus(); verify(plan, times(1000)).next(); } @Test void nextExceedResourceLimit() { - when(resourceMonitor.isHealthy()).thenReturn(false); + when(resourceMonitor.getStatus()) + .thenReturn( + org.opensearch.sql.monitor.ResourceStatus.builder() + .healthy(false) + .type(org.opensearch.sql.monitor.ResourceStatus.ResourceType.MEMORY) + .currentUsage(900L * 1024 * 1024) // 900MB + .maxLimit(850L * 1024 * 1024) // 850MB + .description("Memory usage exceeds limit") + .build()); for (int i = 1; i < 1000; i++) { monitorPlan.next(); @@ -79,7 +107,14 @@ void nextExceedResourceLimit() { IllegalStateException exception = assertThrows(IllegalStateException.class, () -> monitorPlan.next()); - assertEquals("insufficient resources to load next row, quit.", exception.getMessage()); + assertTrue( + exception.getMessage().contains("Insufficient resources to continue processing"), + "Expected enriched error message, got: " + exception.getMessage()); + assertTrue( + exception.getMessage().contains("Rows processed: 1000"), "Expected row count in message"); + assertTrue( + exception.getMessage().contains("plugins.query.memory_limit"), + "Expected config suggestion in message"); } @Test diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitorTest.java index f56d8cb81b1..6b396692a8b 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitorTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.opensearch.monitor; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyLong; @@ -19,6 +20,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.monitor.ResourceStatus; @ExtendWith(MockitoExtension.class) class OpenSearchResourceMonitorTest { @@ -34,7 +36,7 @@ public void setup() { } @Test - void isHealthy() { + void testIsHealthy() { when(memoryMonitor.isMemoryHealthy(anyLong())).thenReturn(true); OpenSearchResourceMonitor resourceMonitor = @@ -43,7 +45,7 @@ void isHealthy() { } @Test - void notHealthyFastFailure() { + void testNotHealthyFastFailure() { when(memoryMonitor.isMemoryHealthy(anyLong())) .thenThrow(OpenSearchMemoryHealthy.MemoryUsageExceedFastFailureException.class); @@ -54,7 +56,7 @@ void notHealthyFastFailure() { } @Test - void notHealthyWithRetry() { + void testNotHealthyWithRetry() { when(memoryMonitor.isMemoryHealthy(anyLong())) .thenThrow(OpenSearchMemoryHealthy.MemoryUsageExceedException.class); @@ -65,7 +67,7 @@ void notHealthyWithRetry() { } @Test - void healthyWithRetry() { + void testHealthyWithRetry() { when(memoryMonitor.isMemoryHealthy(anyLong())) .thenThrow(OpenSearchMemoryHealthy.MemoryUsageExceedException.class) @@ -76,4 +78,68 @@ void healthyWithRetry() { assertTrue(resourceMonitor.isHealthy()); verify(memoryMonitor, times(2)).isMemoryHealthy(anyLong()); } + + @Test + void testGetStatusHealthy() { + ResourceStatus healthyStatus = + ResourceStatus.builder() + .healthy(true) + .type(ResourceStatus.ResourceType.MEMORY) + .description("Memory is healthy") + .build(); + when(memoryMonitor.getMemoryStatus(anyLong())).thenReturn(healthyStatus); + + OpenSearchResourceMonitor resourceMonitor = + new OpenSearchResourceMonitor(settings, memoryMonitor); + ResourceStatus status = resourceMonitor.getStatus(); + assertTrue(status.isHealthy()); + assertEquals("Memory is healthy", status.getDescription()); + verify(memoryMonitor, times(1)).getMemoryStatus(anyLong()); + } + + @Test + void testGetStatusUnhealthyWithRetry() { + ResourceStatus unhealthyStatus = + ResourceStatus.builder() + .healthy(false) + .type(ResourceStatus.ResourceType.MEMORY) + .description("Memory usage exceeds limit") + .build(); + when(memoryMonitor.getMemoryStatus(anyLong())).thenReturn(unhealthyStatus); + + OpenSearchResourceMonitor resourceMonitor = + new OpenSearchResourceMonitor(settings, memoryMonitor); + ResourceStatus status = resourceMonitor.getStatus(); + assertFalse(status.isHealthy()); + assertEquals("Memory usage exceeds limit", status.getDescription()); + // Should retry 3 times when status is unhealthy + verify(memoryMonitor, times(3)).getMemoryStatus(anyLong()); + } + + @Test + void testGetStatusBecomesHealthyAfterRetry() { + ResourceStatus unhealthyStatus = + ResourceStatus.builder() + .healthy(false) + .type(ResourceStatus.ResourceType.MEMORY) + .description("Memory usage exceeds limit") + .build(); + ResourceStatus healthyStatus = + ResourceStatus.builder() + .healthy(true) + .type(ResourceStatus.ResourceType.MEMORY) + .description("Memory is healthy") + .build(); + when(memoryMonitor.getMemoryStatus(anyLong())) + .thenReturn(unhealthyStatus) + .thenReturn(healthyStatus); + + OpenSearchResourceMonitor resourceMonitor = + new OpenSearchResourceMonitor(settings, memoryMonitor); + ResourceStatus status = resourceMonitor.getStatus(); + assertTrue(status.isHealthy()); + assertEquals("Memory is healthy", status.getDescription()); + // Should call twice: first unhealthy, then healthy + verify(memoryMonitor, times(2)).getMemoryStatus(anyLong()); + } }