diff --git a/cwms-data-api/build.gradle b/cwms-data-api/build.gradle index f89f388f14..130f69ac8f 100644 --- a/cwms-data-api/build.gradle +++ b/cwms-data-api/build.gradle @@ -304,6 +304,34 @@ task integrationTests(type: Test) { jvmArgs += "-Dcatalina.base=$buildDir/tomcat" } +task timeseriesReadBenchmark(type: JavaExec) { + group "verification" + description = "Run the local time-series read benchmark harness" + dependsOn generateConfig + dependsOn war + dependsOn testClasses + + workingDir = projectDir + classpath = sourceSets.test.runtimeClasspath + classpath += configurations.baseLibs + classpath += configurations.tomcatLibs + + mainClass = "helpers.TimeSeriesReadBenchmark" + + systemProperties += project.properties.findAll { k, v -> k.startsWith("CDA") && !k.startsWith("CDA_JDBC") } + systemProperties += project.properties.findAll { k, v -> k.startsWith("testcontainer") } + systemProperties += project.properties.findAll { k, v -> k.startsWith("benchmark.") } + + jvmArgs += "-DwarFile=$buildDir/libs/${project.name}-${project.version}.war" + jvmArgs += "-DwarContext=/cwms-data" + jvmArgs += "-Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager" + jvmArgs += "-Djava.util.logging.config.file=$projectDir/logging.properties" + jvmArgs += "-Dorg.apache.tomcat.util.digester.PROPERTY_SOURCE=org.apache.tomcat.util.digester.EnvironmentPropertySource" + jvmArgs += "-Dcwms.dataapi.access.provider=MultipleAccessManager" + jvmArgs += "-Dcwms.dataapi.access.providers=KeyAccessManager,CwmsAccessManager" + jvmArgs += "-Dcatalina.base=$buildDir/tomcat" +} + task prepareDockerBuild(type: Copy, dependsOn: war) { doFirst { project.mkdir("$buildDir/docker") diff --git a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java index 04bcf6eb82..fd9d40553b 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java @@ -257,6 +257,13 @@ 
private Controllers() { } + public static int validateTimeSeriesPageSize(int pageSize) { + if (pageSize < -1) { + throw new IllegalArgumentException(PAGE_SIZE + " must be -1, 0, or a positive integer"); + } + return pageSize; + } + /** * Marks a meter and starts a timer. * diff --git a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java index d729568380..80594f448d 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java @@ -354,7 +354,8 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) { + "offset and timezone."), @OpenApiParam(name = Controllers.TRIM, type = Boolean.class, description = "Specifies " + "whether to trim missing values from the beginning and end of the " - + "retrieved values. " + + "retrieved values. When true and values are returned, the response " + + BEGIN + " and " + END + " fields reflect the returned data window. " + "Only supported for:" + Formats.JSONV2 + " and " + Formats.XMLV2 + ". " + "Default is true."), @OpenApiParam(name = FORMAT, description = "Specifies the" @@ -383,7 +384,9 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) { @OpenApiParam(name = PAGE_SIZE, type = Integer.class, description = "How many entries per page returned. " - + "Default " + DEFAULT_PAGE_SIZE + ".") + + "Default " + DEFAULT_PAGE_SIZE + ". Use 0 to return an empty values array, " + + "or -1 to return the entire window in one response without a next-page cursor. 
" + + "Values less than -1 are invalid.") }, responses = { @OpenApiResponse(status = STATUS_200, @@ -435,9 +438,10 @@ public void getAll(@NotNull Context ctx) { String.class, "", metrics, name(TimeSeriesController.class.getName(), GET_ALL)); - int pageSize = queryParamAsClass(ctx, new String[]{PAGE_SIZE }, + final int pageSize = Controllers.validateTimeSeriesPageSize(queryParamAsClass(ctx, + new String[]{PAGE_SIZE}, Integer.class, DEFAULT_PAGE_SIZE, metrics, - name(TimeSeriesController.class.getName(), GET_ALL)); + name(TimeSeriesController.class.getName(), GET_ALL))); String acceptHeader = ctx.header(Header.ACCEPT); ContentType contentType = Formats.parseHeaderAndQueryParm(acceptHeader, format, TimeSeries.class); @@ -468,7 +472,8 @@ public void getAll(@NotNull Context ctx) { .build(); // Execute DAO call with a timeout so we can return a clearer message instead of a generic 500 int apiTimeoutMs = Integer.getInteger("cwms.cda.api.apiTimeoutMs", 45000); - CompletableFuture daoFuture = CompletableFuture.supplyAsync(() -> dao.getTimeseries(cursor, pageSize, requestParameters)); + CompletableFuture daoFuture = CompletableFuture.supplyAsync( + () -> dao.getTimeseries(cursor, pageSize, requestParameters)); TimeSeries ts; try { ts = daoFuture.get(apiTimeoutMs, TimeUnit.MILLISECONDS); diff --git a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesFilteredController.java b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesFilteredController.java index 3393c23898..9dc34ab180 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesFilteredController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesFilteredController.java @@ -117,7 +117,9 @@ private TimeSeriesDao getTimeSeriesDao(DSLContext dsl) { + "offset and timezone."), @OpenApiParam(name = Controllers.TRIM, type = Boolean.class, description = "Specifies " + "whether to trim missing values from the beginning and end of the " - + "retrieved values. " + + "retrieved values. 
When true and values are returned, the contained time-series " + + Controllers.BEGIN + " and " + Controllers.END + + " fields reflect the returned data window. " + "Only supported for:" + Formats.JSONV2 + " and " + Formats.XMLV2 + ". " + "Default is true."), @OpenApiParam(name = INCLUDE_ENTRY_DATE, type = Boolean.class, description = "Specifies " @@ -149,7 +151,9 @@ private TimeSeriesDao getTimeSeriesDao(DSLContext dsl) { @OpenApiParam(name = PAGE_SIZE, type = Integer.class, description = "How many entries per page returned. " - + "Default " + DEFAULT_PAGE_SIZE + ".") + + "Default " + DEFAULT_PAGE_SIZE + + ". Use 0 to return an empty values array, or -1 to return the entire window " + + "in one response without a next-page cursor. Values less than -1 are invalid.") }, responses = { @OpenApiResponse(status = STATUS_200, @@ -202,6 +206,7 @@ public void handle(@NotNull Context ctx) { int pageSize = queryParamAsClass(ctx, new String[]{PAGE_SIZE}, Integer.class, DEFAULT_PAGE_SIZE, metrics, name(TimeSeriesController.class.getName(), GET_ALL)); + pageSize = Controllers.validateTimeSeriesPageSize(pageSize); String acceptHeader = ctx.header(Header.ACCEPT); ContentType contentType = Formats.parseHeaderAndQueryParm(acceptHeader, format, TimeSeries.class); diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java index 2102f58eb5..02de50ec45 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java @@ -43,6 +43,7 @@ import cwms.cda.data.dto.filteredtimeseries.FilteredTimeSeries; import cwms.cda.formatters.xml.XMLv1; import cwms.cda.helpers.DateUtils; +import cwms.cda.helpers.ZoneIdHelper; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; @@ -51,7 +52,9 @@ import java.time.Duration; import java.time.Instant; import java.time.LocalDate; +import 
java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -72,6 +75,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import mil.army.usace.hec.metadata.Interval; +import mil.army.usace.hec.metadata.IntervalFactory; +import mil.army.usace.hec.metadata.IntervalOffset; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jooq.*; @@ -88,6 +94,7 @@ import usace.cwms.db.jooq.codegen.tables.AV_TSV_DQU; import usace.cwms.db.jooq.codegen.tables.AV_TS_GRP_ASSGN; import usace.cwms.db.jooq.codegen.udt.records.DATE_TABLE_TYPE; +import usace.cwms.db.jooq.codegen.udt.records.DATE_RANGE_T; import usace.cwms.db.jooq.codegen.udt.records.ZTSV_ARRAY; import usace.cwms.db.jooq.codegen.udt.records.ZTSV_TYPE; @@ -127,6 +134,9 @@ public class TimeSeriesDaoImpl extends JooqDao implements TimeSeries ); public static final String VERSIONED_NAME = "isVersioned"; + private static final long UTC_OFFSET_IRREGULAR = -2147483648L; + private static final long UTC_OFFSET_UNDEFINED = 2147483647L; + private static final String UTC = "UTC"; /** To be able to use a named inner table (otherwise JOOQ creates a random alias which messes * with the planner) we need to use fixed names to be able to reference the required columns. 
@@ -154,6 +164,29 @@ public class TimeSeriesDaoImpl extends JooqDao implements TimeSeries private static final FieldMapping AV_CWMS_TS_ID2_FIELD_MAP = new CwmsTsId2FieldMapping(); private static final FieldMapping AV_CWMS_TS_ID_FIELD_MAP = new CwmsTsIdFieldMapping(); + private static final class DirectReadMetadata { + private final long tsCode; + private final String tsId; + private final String officeId; + private final String units; + private final long intervalMinutes; + private final long intervalUtcOffset; + private final String timeZoneId; + private final String versionFlag; + + private DirectReadMetadata(long tsCode, String tsId, String officeId, String units, + long intervalMinutes, long intervalUtcOffset, + String timeZoneId, String versionFlag) { + this.tsCode = tsCode; + this.tsId = tsId; + this.officeId = officeId; + this.units = units; + this.intervalMinutes = intervalMinutes; + this.intervalUtcOffset = intervalUtcOffset; + this.timeZoneId = timeZoneId; + this.versionFlag = versionFlag; + } + } @NotNull private final Timer getRequestedTimeSeriesTotalQueryTimer; @@ -259,8 +292,18 @@ public FilteredTimeSeries getTimeseries(String page, int pageSize, TimeSeriesReq return fts; } - protected TimeSeries getRequestedTimeSeries(String page, int pageSize, @NotNull TimeSeriesRequestParameters requestParameters, - @Nullable FilteredTimeSeriesParameters fp) { + protected TimeSeries getRequestedTimeSeries(String page, int pageSize, + @NotNull TimeSeriesRequestParameters requestParameters, + @Nullable FilteredTimeSeriesParameters fp) { + if (fp != null) { + return getRequestedTimeSeriesLegacy(page, pageSize, requestParameters, fp); + } + return getRequestedTimeSeriesDirect(page, pageSize, requestParameters); + } + + protected TimeSeries getRequestedTimeSeriesLegacy(String page, int pageSize, + @NotNull TimeSeriesRequestParameters requestParameters, + @Nullable FilteredTimeSeriesParameters fp) { String names = requestParameters.getNames(); String office = 
requestParameters.getOffice(); @@ -455,6 +498,13 @@ protected TimeSeries getRequestedTimeSeries(String page, int pageSize, @NotNull Record tsMetadata = metadataQuery.fetchOne(); + if (pageSize == 0) { + Integer resolvedTotal = resolveTotalQueryFuture(totalQueryFuture, totalQueryDeadlineNanos, + names, office, beginTime, endTime); + return buildTimeSeriesFromMetadata(tsMetadata, resolvedTotal, names, office, + beginTime, endTime, units, versionDate, recordCursor, recordPageSize, tzName); + } + String retrievalMethod; if (includeEntryDate) { retrievalMethod = "cwms_20.cwms_ts.retrieve_ts_entry_out_tab"; // New method that supports entry date @@ -556,6 +606,10 @@ protected TimeSeries getRequestedTimeSeries(String page, int pageSize, @NotNull getRequestedTimeSeriesResultsReturnedHistogram.update(timeseries.getValues().size()); } + if (retVal != null) { + retVal.alignWindowToReturnedValues(shouldTrim); + } + return retVal; } @@ -636,6 +690,540 @@ private TimeSeries buildTimeSeriesFromMetadata(Record tsMetadata, @Nullable Inte ); } + private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize, + @NotNull TimeSeriesRequestParameters requestParameters) { + String names = requestParameters.getNames(); + String office = requestParameters.getOffice(); + String requestedUnits = requestParameters.getUnits(); + ZonedDateTime beginTime = requestParameters.getBeginTime(); + ZonedDateTime endTime = requestParameters.getEndTime(); + ZonedDateTime versionDate = requestParameters.getVersionDate(); + boolean includeEntryDate = requestParameters.isIncludeEntryDate(); + String cursor = null; + Timestamp tsCursor = null; + + if (page != null && !page.isEmpty()) { + final String[] parts = CwmsDTOPaginated.decodeCursor(page); + + logger.atFine().log("Decoded cursor"); + logger.atFinest().log("%s", lazy(() -> { + StringBuilder sb = new StringBuilder(); + for (String p : parts) { + sb.append(p).append("\n"); + } + return sb.toString(); + })); + + if (parts.length > 1) { + cursor 
= parts[0]; + tsCursor = Timestamp.from(Instant.ofEpochMilli(Long.parseLong(parts[0]))); + pageSize = Integer.parseInt(parts[parts.length - 1]); + } + } + + DirectReadMetadata metadata = fetchRequestedTimeSeriesMetadataRecord(requestParameters); + if (metadata == null) { + throw new DataAccessException("Unable to resolve time series metadata for " + names); + } + + long tsCode = metadata.tsCode; + String tsId = metadata.tsId; + String[] tsIdParts = splitTimeSeriesId(tsId); + String metadataOfficeId = metadata.officeId; + String metadataUnits = metadata.units; + String locPart = getTimeSeriesIdPart(tsIdParts, 0); + String parmPart = getTimeSeriesIdPart(tsIdParts, 1); + String intervalPart = getTimeSeriesIdPart(tsIdParts, 3); + long intervalMinutes = metadata.intervalMinutes; + long intervalOffset = metadata.intervalUtcOffset; + String timeZoneId = metadata.timeZoneId; + boolean isLrts = parseBool(CWMS_TS_PACKAGE.call_IS_LRTS__2(dsl.configuration(), tsCode)); + + VerticalDatumInfo verticalDatumInfo = null; + if (shouldFetchVerticalDatum(parmPart)) { + verticalDatumInfo = fetchVerticalDatumInfoSeparately(locPart, requestedUnits, office); + } + + VersionType finalDateVersionType = getDirectReadVersionType( + metadata.versionFlag, versionDate != null); + + // Pagination happens after regular-interval gap rows are merged + // fetch the full raw window first + List rawRows = fetchRequestedTimeSeriesRows(tsCode, metadataOfficeId, metadataUnits, + requestParameters, includeEntryDate); + long effectiveIntervalOffset = intervalOffset; + if (isRegularSeries(intervalMinutes, intervalPart)) { + effectiveIntervalOffset = resolveIntervalOffset(intervalOffset, timeZoneId, intervalPart, isLrts, rawRows); + } + + List expectedTimes = fetchExpectedRegularTimes(intervalMinutes, effectiveIntervalOffset, timeZoneId, + intervalPart, isLrts, requestParameters, rawRows); + int total = countMergedRows(rawRows, expectedTimes); + + TimeSeries timeseries = new TimeSeries( + cursor, + pageSize, 
+ total, + tsId, + metadataOfficeId, + beginTime, + endTime, + metadataUnits, + resolveIntervalDuration(intervalMinutes, intervalPart), + verticalDatumInfo, + effectiveIntervalOffset, + timeZoneId, + versionDate, + finalDateVersionType + ); + + if (pageSize == 0) { + return timeseries; + } + + populateTimeSeriesValues(timeseries, rawRows, expectedTimes, tsCursor, includeEntryDate); + return timeseries.alignWindowToReturnedValues(requestParameters.isShouldTrim()); + } + + private DirectReadMetadata fetchRequestedTimeSeriesMetadataRecord( + TimeSeriesRequestParameters requestParameters) { + String names = requestParameters.getNames(); + String office = requestParameters.getOffice(); + String units = requestParameters.getUnits(); + + final Field officeId = CWMS_UTIL_PACKAGE.call_GET_DB_OFFICE_ID( + office != null ? DSL.val(office) : CWMS_UTIL_PACKAGE.call_USER_OFFICE_ID()); + final Field tsId = CWMS_TS_PACKAGE.call_GET_TS_ID__2(DSL.val(names), officeId); + final Field tsCode = CWMS_TS_PACKAGE.call_GET_TS_CODE__2(DSL.val(names), officeId); + + Table> validTs = + select(tsCode.as("tscode"), + tsId.as("tsid"), + officeId.as("office_id")) + .asTable("validts"); + + Field unit = units.compareToIgnoreCase("SI") == 0 + || units.compareToIgnoreCase("EN") == 0 + ? 
CWMS_UTIL_PACKAGE.call_GET_DEFAULT_UNITS( + CWMS_TS_PACKAGE.call_GET_BASE_PARAMETER_ID(tsCode), + DSL.val(units, String.class)) + : DSL.val(units, String.class); + + Field interval = CWMS_TS_PACKAGE.call_GET_TS_INTERVAL__2(validTs.field("tsid", String.class)); + + CommonTableExpression valid = + name("valid").fields("tscode", "tsid", "office_id", "units", "interval") + .as( + select( + validTs.field("tscode", BigDecimal.class).as("tscode"), + validTs.field("tsid", String.class).as("tsid"), + validTs.field("office_id", String.class).as("office_id"), + unit.as("units"), + interval.as("interval")) + .from(validTs)); + + var tsIdView = AV_CWMS_TS_ID.AV_CWMS_TS_ID; + + SelectJoinStep metadataQuery = + dsl.with(valid) + .select( + valid.field("tscode", BigDecimal.class).as("tscode"), + valid.field("tsid", String.class).as("tsid"), + valid.field("office_id", String.class).as("office_id"), + valid.field("units", String.class).as("units"), + valid.field("interval", BigDecimal.class).as("interval"), + tsIdView.INTERVAL_UTC_OFFSET.as("interval_utc_offset"), + tsIdView.TIME_ZONE_ID.as("time_zone_id"), + tsIdView.field("VERSION_FLAG", String.class).as("version_flag")) + .from(valid) + .leftOuterJoin(tsIdView) + .on(tsIdView.DB_OFFICE_ID.eq(valid.field("office_id", String.class)) + .and(tsIdView.TS_CODE.eq(valid.field("tscode", BigDecimal.class)))); + + logger.atFine().log("%s", lazy(() -> metadataQuery.getSQL(ParamType.INLINED))); + + return metadataQuery.fetchOne(record -> new DirectReadMetadata( + record.getValue("tscode", BigDecimal.class).longValue(), + record.getValue("tsid", String.class), + record.getValue("office_id", String.class), + record.getValue("units", String.class), + record.getValue("interval", BigDecimal.class) == null + ? 0L + : record.getValue("interval", BigDecimal.class).longValue(), + record.getValue("interval_utc_offset", Number.class) == null + ? 
UTC_OFFSET_IRREGULAR + : record.getValue("interval_utc_offset", Number.class).longValue(), + record.getValue("time_zone_id", String.class) == null + ? UTC + : record.getValue("time_zone_id", String.class), + record.getValue("version_flag", String.class))); + } + + private List fetchRequestedTimeSeriesRows(long tsCode, String officeId, String units, + TimeSeriesRequestParameters requestParameters, + boolean includeEntryDate) { + ZonedDateTime beginTime = requestParameters.getBeginTime(); + ZonedDateTime endTime = requestParameters.getEndTime(); + ZonedDateTime versionDate = requestParameters.getVersionDate(); + Timestamp beginTimestamp = Timestamp.from(beginTime.toInstant()); + Timestamp endTimestamp = Timestamp.from(endTime.toInstant()); + + AV_TSV_DQU view = AV_TSV_DQU.AV_TSV_DQU; + Field qualityForNormalization = DSL.nvl( + view.QUALITY_CODE.cast(BigDecimal.class), + DSL.val(BigDecimal.valueOf(5)) + ); + Field normalizedQuality = CWMS_TS_PACKAGE.call_NORMALIZE_QUALITY( + qualityForNormalization).as("quality_norm"); + + Condition baseCondition = view.ALIASED_ITEM.isNull() + .and(view.TS_CODE.eq(tsCode)) + .and(view.OFFICE_ID.eq(officeId)) + // Invalid unit requests surface as a database error rather than an empty result set. 
+ .and(view.UNIT_ID.equalIgnoreCase(units)) + .and(view.DATE_TIME.ge(beginTimestamp)) + .and(view.DATE_TIME.le(endTimestamp)) + .and(view.START_DATE.le(endTimestamp)) + .and(view.END_DATE.gt(beginTimestamp)); + + ResultQuery> query; + if (versionDate != null) { + query = buildVersionedRowsQuery(view, normalizedQuality, baseCondition, versionDate, includeEntryDate); + } else { + query = buildMaxVersionRowsQuery(view, normalizedQuality, baseCondition, includeEntryDate); + } + + logger.atFine().log("%s", lazy(() -> query.getSQL(ParamType.INLINED))); + + return query.fetch(record -> { + Timestamp dateTime = record.getValue(0, Timestamp.class); + Double value = record.getValue(1, Double.class); + int qualityCode = record.getValue(2, BigDecimal.class).intValue(); + Timestamp dataEntryDate = record.getValue(3, Timestamp.class); + if (dataEntryDate != null) { + return new TimeSeries.Record(dateTime, value, qualityCode, dataEntryDate); + } + return new TimeSeries.Record(dateTime, value, qualityCode); + }); + } + + private ResultQuery> buildVersionedRowsQuery( + AV_TSV_DQU view, + Field normalizedQuality, + Condition baseCondition, + ZonedDateTime versionDate, + boolean includeEntryDate) { + Field versionTimestamp = CWMS_UTIL_PACKAGE.call_TO_TIMESTAMP__2( + DSL.val(versionDate.toInstant().toEpochMilli())); + Field dataEntryDateField = includeEntryDate + ? view.DATA_ENTRY_DATE + : DSL.castNull(Timestamp.class).as(DATA_ENTRY_DATE); + + return dsl.select( + view.DATE_TIME, + view.VALUE, + normalizedQuality, + dataEntryDateField) + .from(view) + .where(baseCondition.and(view.VERSION_DATE.eq(versionTimestamp))) + .orderBy(view.DATE_TIME.asc()); + } + + private ResultQuery> buildMaxVersionRowsQuery( + AV_TSV_DQU view, + Field normalizedQuality, + Condition baseCondition, + boolean includeEntryDate) { + var rankedRows = dsl.select( + view.DATE_TIME.as(DATE_TIME), + view.VALUE.as(VALUE), + normalizedQuality, + includeEntryDate + ? 
view.DATA_ENTRY_DATE.as(DATA_ENTRY_DATE) + : DSL.castNull(Timestamp.class).as(DATA_ENTRY_DATE), + DSL.rowNumber() + .over(partitionBy(view.DATE_TIME) + .orderBy(view.VERSION_DATE.desc(), view.DATA_ENTRY_DATE.desc())) + .as("version_rank")) + .from(view) + .where(baseCondition) + .asTable("ranked_rows"); + + Field dateTimeCol = rankedRows.field(DATE_TIME, Timestamp.class); + Field valueCol = rankedRows.field(VALUE, Double.class); + Field qualityCol = rankedRows.field("quality_norm", BigDecimal.class); + Field dataEntryDateCol = rankedRows.field(DATA_ENTRY_DATE, Timestamp.class); + Field versionRankCol = rankedRows.field("version_rank", Integer.class); + + return dsl.select(dateTimeCol, valueCol, qualityCol, dataEntryDateCol) + .from(rankedRows) + .where(versionRankCol.eq(1)) + .orderBy(dateTimeCol.asc()); + } + + private List fetchExpectedRegularTimes(long intervalMinutes, long intervalOffset, String timeZoneId, + String intervalPart, boolean isLrts, + TimeSeriesRequestParameters requestParameters, + List rawRows) { + boolean shouldTrim = requestParameters.isShouldTrim(); + if (!isRegularSeries(intervalMinutes, intervalPart)) { + return Collections.emptyList(); + } + // Trimmed requests collapse to the observed data window + // there is nothing to expand if no rows matched + if (rawRows.isEmpty() && shouldTrim) { + return Collections.emptyList(); + } + + Timestamp rangeStart = shouldTrim + ? rawRows.get(0).getDateTime() + : Timestamp.from(requestParameters.getBeginTime().toInstant()); + Timestamp rangeEnd = shouldTrim + ? rawRows.get(rawRows.size() - 1).getDateTime() + : Timestamp.from(requestParameters.getEndTime().toInstant()); + + Interval expectedInterval = resolveExpectedInterval(intervalPart); + if (expectedInterval != null) { + return buildExpectedRegularTimes(rangeStart, rangeEnd, intervalOffset, expectedInterval, + getExpectedTimeZone(timeZoneId, isLrts)); + } + + String intervalTimeZone = isLrts ? 
timeZoneId : UTC; + DATE_RANGE_T dateRange = new DATE_RANGE_T(rangeStart, rangeEnd, UTC, "T", "T", null); + DATE_TABLE_TYPE expectedTimeTable = CWMS_TS_PACKAGE.call_GET_REG_TS_TIMES_UTC_F( + dsl.configuration(), + dateRange, + intervalPart, + String.valueOf(intervalOffset), + intervalTimeZone + ); + + List retVal = new ArrayList<>(); + if (expectedTimeTable != null) { + expectedTimeTable.forEach(timestamp -> { + if (timestamp != null) { + retVal.add(normalizeOracleUtcTimestamp(timestamp)); + } + }); + } + return retVal; + } + + private long resolveIntervalOffset(long intervalOffset, String timeZoneId, + String intervalPart, boolean isLrts, List rawRows) { + if (intervalOffset != UTC_OFFSET_UNDEFINED && intervalOffset != UTC_OFFSET_IRREGULAR) { + return intervalOffset; + } + if (rawRows.isEmpty()) { + return 0L; + } + + Interval expectedInterval = resolveExpectedInterval(intervalPart); + if (expectedInterval != null) { + try { + Instant firstTime = rawRows.get(0).getDateTime().toInstant(); + Instant topOfInterval = expectedInterval.getTimeOnPreviousOrCurrentInterval( + firstTime, + IntervalOffset.zeroOffset(), + getExpectedTimeZone(timeZoneId, isLrts) + ); + return TimeUnit.MILLISECONDS.toMinutes(firstTime.toEpochMilli() - topOfInterval.toEpochMilli()); + } catch (mil.army.usace.hec.metadata.DataSetIllegalArgumentException ex) { + throw new IllegalArgumentException("Unable to resolve interval offset for " + intervalPart, ex); + } + } + + String intervalTimeZone = isLrts ? 
timeZoneId : UTC; + Timestamp topOfInterval = normalizeOracleUtcTimestamp(CWMS_TS_PACKAGE.call_TOP_OF_INTERVAL_UTC( + dsl.configuration(), + rawRows.get(0).getDateTime(), + intervalPart, + intervalTimeZone, + "F" + )); + return (rawRows.get(0).getDateTime().getTime() - topOfInterval.getTime()) / TimeUnit.MINUTES.toMillis(1); + } + + private boolean isRegularSeries(long intervalMinutes, String intervalPart) { + return intervalMinutes != 0L || isLocalRegularInterval(intervalPart); + } + + private Duration resolveIntervalDuration(long intervalMinutes, String intervalPart) { + if (intervalMinutes != 0L) { + return Duration.ofMinutes(intervalMinutes); + } + + Interval interval = resolveExpectedInterval(intervalPart); + if (interval != null) { + return Duration.ofSeconds(interval.getSeconds()); + } + + return Duration.ZERO; + } + + private int countMergedRows(List rawRows, List expectedTimes) { + if (expectedTimes.isEmpty()) { + return rawRows.size(); + } + + int total = 0; + int rawIndex = 0; + int expectedIndex = 0; + while (rawIndex < rawRows.size() || expectedIndex < expectedTimes.size()) { + Timestamp rawTime = rawIndex < rawRows.size() ? rawRows.get(rawIndex).getDateTime() : null; + Timestamp expectedTime = expectedIndex < expectedTimes.size() ? expectedTimes.get(expectedIndex) : null; + + if (rawTime == null) { + expectedIndex++; + } else if (expectedTime == null) { + rawIndex++; + } else { + int compare = compareTimestampOrder(expectedTime, rawTime); + if (compare < 0) { + expectedIndex++; + } else if (compare > 0) { + rawIndex++; + } else { + expectedIndex++; + rawIndex++; + } + } + total++; + } + return total; + } + + private void populateTimeSeriesValues(TimeSeries timeseries, + List rawRows, + List expectedTimes, + Timestamp tsCursor, + boolean includeEntryDate) { + int rawIndex = 0; + int expectedIndex = 0; + int collected = 0; + int maxRecords = timeseries.getPageSize() > 0 ? 
timeseries.getPageSize() + 1 : Integer.MAX_VALUE; + + while ((rawIndex < rawRows.size() || expectedIndex < expectedTimes.size()) && collected < maxRecords) { + TimeSeries.Record rawRow = rawIndex < rawRows.size() ? rawRows.get(rawIndex) : null; + Timestamp expectedTime = expectedIndex < expectedTimes.size() ? expectedTimes.get(expectedIndex) : null; + + Timestamp candidateTime; + TimeSeries.Record candidateRow = null; + boolean syntheticRow = false; + + if (rawRow == null) { + candidateTime = expectedTime; + syntheticRow = true; + expectedIndex++; + } else if (expectedTime == null) { + candidateTime = rawRow.getDateTime(); + candidateRow = rawRow; + rawIndex++; + } else { + int compare = compareTimestampOrder(expectedTime, rawRow.getDateTime()); + if (compare < 0) { + candidateTime = expectedTime; + syntheticRow = true; + expectedIndex++; + } else if (compare > 0) { + candidateTime = rawRow.getDateTime(); + candidateRow = rawRow; + rawIndex++; + } else { + candidateTime = rawRow.getDateTime(); + candidateRow = rawRow; + rawIndex++; + expectedIndex++; + } + } + + if (tsCursor != null && compareTimestampOrder(candidateTime, tsCursor) < 0) { + continue; + } + + if (syntheticRow) { + if (includeEntryDate) { + timeseries.addValue(candidateTime, null, 0, null); + } else { + timeseries.addValue(candidateTime, null, 0); + } + } else if (includeEntryDate) { + timeseries.addValue(candidateRow.getDateTime(), candidateRow.getValue(), + candidateRow.getQualityCode(), candidateRow.getDataEntryDate()); + } else { + timeseries.addValue(candidateRow.getDateTime(), candidateRow.getValue(), + candidateRow.getQualityCode()); + } + collected++; + } + } + + private int compareTimestampOrder(Timestamp left, Timestamp right) { + return Long.compare(left.getTime(), right.getTime()); + } + + private Timestamp normalizeOracleUtcTimestamp(Timestamp timestamp) { + LocalDateTime utcWallTime = timestamp.toLocalDateTime(); + return Timestamp.from(utcWallTime.toInstant(ZoneOffset.UTC)); + } + + 
@Nullable + private Interval resolveExpectedInterval(String intervalPart) { + if (intervalPart == null) { + return null; + } + + return IntervalFactory.findAny(IntervalFactory.equalsName(normalizeIntervalNameForNucleus(intervalPart))) + .orElse(null); + } + + private List buildExpectedRegularTimes(Timestamp rangeStart, + Timestamp rangeEnd, + long offsetMinutes, + Interval interval, + ZoneId intervalTimeZone) { + List expectedTimes = new ArrayList<>(); + IntervalOffset intervalOffset = IntervalOffset.fromSeconds(Math.toIntExact( + TimeUnit.MINUTES.toSeconds(offsetMinutes))); + Instant endTime = rangeEnd.toInstant(); + + try { + Instant nextTime = interval.getTimeOnNextOrCurrentInterval(rangeStart.toInstant(), intervalOffset, + intervalTimeZone); + while (!nextTime.isAfter(endTime)) { + expectedTimes.add(Timestamp.from(nextTime)); + nextTime = interval.getNextIntervalTime(nextTime, intervalTimeZone); + } + } catch (mil.army.usace.hec.metadata.DataSetIllegalArgumentException ex) { + throw new IllegalArgumentException("Unable to build expected times for " + interval.getInterval(), ex); + } + return expectedTimes; + } + + private ZoneId getExpectedTimeZone(String timeZoneId, boolean isLrts) { + if (!isLrts) { + return ZoneOffset.UTC; + } + return ZoneIdHelper.parseZoneIdWithAliases(timeZoneId); + } + + private String normalizeIntervalNameForNucleus(String intervalPart) { + if (intervalPart.startsWith("~")) { + return intervalPart; + } + if (intervalPart.length() > 5 + && intervalPart.regionMatches(true, intervalPart.length() - 5, "Local", 0, 5)) { + return "~" + intervalPart.substring(0, intervalPart.length() - 5); + } + return intervalPart; + } + + private boolean isLocalRegularInterval(String intervalPart) { + if (intervalPart == null) { + return false; + } + return normalizeIntervalNameForNucleus(intervalPart).startsWith("~"); + } private boolean shouldFetchVerticalDatum(String parmPart) { // Check if parameter requires vertical datum (e.g., "ELEV") @@ -681,9 
+1269,23 @@ private static String getVersionPart(ZonedDateTime versionDate) { return "?"; } + private static VersionType getDirectReadVersionType(String versionFlag, boolean versionDateProvided) { + if (versionDateProvided) { + return VersionType.SINGLE_VERSION; + } + return parseBool(versionFlag) ? VersionType.MAX_AGGREGATE : VersionType.UNVERSIONED; + } + + private static String[] splitTimeSeriesId(String tsId) { + return tsId.split("\\.", 6); + } + + private static String getTimeSeriesIdPart(String[] tsIdParts, int index) { + return tsIdParts.length > index ? tsIdParts[index] : null; + } + public static String parseLocFromTimeSeriesId(String tsId) { - String[] parts = tsId.split("\\."); - return parts[0]; + return getTimeSeriesIdPart(splitTimeSeriesId(tsId), 0); } public static String getTimeZoneId(DSLContext dsl, String tsId, String officeId) { diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/TimeSeries.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/TimeSeries.java index 05ee21c8f7..ef6856ce07 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dto/TimeSeries.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/TimeSeries.java @@ -24,6 +24,7 @@ import java.lang.reflect.Field; import java.sql.Timestamp; import java.time.Duration; +import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; @@ -68,14 +69,16 @@ public class TimeSeries extends CwmsDTOPaginated { @JsonFormat(shape = Shape.STRING) @Schema( accessMode = AccessMode.READ_ONLY, - description = "The requested start time of the data, in ISO-8601 format with offset and timezone ('" + ZONED_DATE_TIME_FORMAT + "')" + description = "The start time represented by the values in this response, in ISO-8601 format with offset and timezone ('" + + ZONED_DATE_TIME_FORMAT + "'). When trim=true and values are returned, this reflects the first returned value." 
) ZonedDateTime begin; @JsonFormat(shape = Shape.STRING) @Schema( accessMode = AccessMode.READ_ONLY, - description = "The requested end time of the data, in ISO-8601 format with offset and timezone ('" + ZONED_DATE_TIME_FORMAT + "')" + description = "The end time represented by the values in this response, in ISO-8601 format with offset and timezone ('" + + ZONED_DATE_TIME_FORMAT + "'). When trim=true and values are returned, this reflects the last returned value." ) ZonedDateTime end; @@ -229,8 +232,8 @@ public void addValue(Timestamp dateTime, Double value, int qualityCode, Timestam } public void addValue(Record record) { - // Set the current page, if not set - if ((page == null || page.isEmpty()) && (values == null || values.isEmpty())) { + // Only paged responses expose cursors. page-size=-1 requests the entire window. + if (pageSize > 0 && (page == null || page.isEmpty()) && (values == null || values.isEmpty())) { page = encodeCursor(String.format("%d", record.dateTime.getTime()), pageSize, total); } if (pageSize > 0 && values.size() == pageSize) { @@ -246,6 +249,16 @@ public TimeSeries withValues(List values) { return this; } + public TimeSeries alignWindowToReturnedValues(boolean trim) { + if (!trim || values == null || values.isEmpty()) { + return this; + } + + begin = values.get(0).getDateTime().toInstant().atZone(ZoneOffset.UTC); + end = values.get(values.size() - 1).getDateTime().toInstant().atZone(ZoneOffset.UTC); + return this; + } + public static List getColumnDescriptor() { List columns = new ArrayList<>(); for (Field f: Record.class.getDeclaredFields()) { diff --git a/cwms-data-api/src/test/java/cwms/cda/api/TimeSeriesDirectReadParityIT.java b/cwms-data-api/src/test/java/cwms/cda/api/TimeSeriesDirectReadParityIT.java new file mode 100644 index 0000000000..f30b152b21 --- /dev/null +++ b/cwms-data-api/src/test/java/cwms/cda/api/TimeSeriesDirectReadParityIT.java @@ -0,0 +1,722 @@ +package cwms.cda.api; + +import static io.restassured.RestAssured.given; 
+import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import cwms.cda.api.enums.VersionType; +import cwms.cda.data.dto.TimeSeries; +import cwms.cda.formatters.Formats; +import cwms.cda.formatters.json.JsonV2; +import fixtures.CwmsDataApiSetupCallback; +import io.restassured.filter.log.LogDetail; +import io.restassured.response.ExtractableResponse; +import io.restassured.response.Response; +import io.restassured.specification.RequestSpecification; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.servlet.http.HttpServletResponse; +import mil.army.usace.hec.test.database.CwmsDatabaseContainer; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import usace.cwms.db.jooq.codegen.packages.CWMS_TS_PACKAGE; + +@Tag("integration") +final class TimeSeriesDirectReadParityIT extends DataApiTestIT { + private static final ObjectMapper OBJECT_MAPPER = JsonV2.buildObjectMapper(); + private static final String OFFICE = "SPK"; + private static final double DOUBLE_TOLERANCE = 1e-9; + + @Test + void denseRegularReadMatchesRetrieveTs() throws Exception { + assertDirectReadMatchesOracle( + 
"ITPARREG", + "ITPARREG.Stage.Inst.1Minute.0.BENCH", + "ft", + Instant.parse("2024-01-01T00:00:00Z"), + Instant.parse("2024-01-01T00:05:00Z"), + denseRows(), + false, + false, + VersionType.UNVERSIONED, + Duration.ofMinutes(1), + 0L, + null + ); + } + + @Test + void denseRegularEntryDateReadMatchesRetrieveTs() throws Exception { + assertDirectReadMatchesOracle( + "ITPARREG", + "ITPARREG.Stage.Inst.1Minute.0.BENCH", + "ft", + Instant.parse("2024-01-01T00:00:00Z"), + Instant.parse("2024-01-01T00:05:00Z"), + denseRows(), + false, + true, + VersionType.UNVERSIONED, + Duration.ofMinutes(1), + 0L, + null + ); + } + + @Test + void gapFilledRegularReadMatchesRetrieveTs() throws Exception { + assertDirectReadMatchesOracle( + "ITPARGAP", + "ITPARGAP.Stage.Inst.1Minute.0.BENCH", + "ft", + Instant.parse("2024-01-01T00:00:00Z"), + Instant.parse("2024-01-01T00:09:00Z"), + gapRows(), + false, + false, + VersionType.UNVERSIONED, + Duration.ofMinutes(1), + 0L, + null + ); + } + + @Test + void maxVersionReadMatchesRetrieveTs() throws Exception { + assertDirectReadMatchesOracle( + "ITPARVER", + "ITPARVER.Flow.Inst.1Hour.0.BENCH", + "cfs", + Instant.parse("2024-05-01T15:00:00Z"), + Instant.parse("2024-05-01T18:00:00Z"), + versionedRows(), + true, + false, + VersionType.MAX_AGGREGATE, + Duration.ofHours(1), + 0L, + null + ); + } + + @Test + void specificVersionReadMatchesRetrieveTs() throws Exception { + Instant newerVersion = Instant.parse("2024-06-21T08:00:00Z"); + assertDirectReadMatchesOracle( + "ITPARVER", + "ITPARVER.Flow.Inst.1Hour.0.BENCH", + "cfs", + Instant.parse("2024-05-01T15:00:00Z"), + Instant.parse("2024-05-01T18:00:00Z"), + versionedRows(), + true, + false, + VersionType.SINGLE_VERSION, + Duration.ofHours(1), + 0L, + newerVersion + ); + } + + @Test + void irregularReadMatchesRetrieveTs() throws Exception { + assertDirectReadMatchesOracle( + "ITPARIRR", + "ITPARIRR.Flow.Inst.0.0.BENCH", + "cfs", + Instant.parse("2024-01-05T12:00:00Z"), + 
Instant.parse("2024-01-05T12:33:10Z"), + irregularRows(), + false, + false, + VersionType.UNVERSIONED, + Duration.ZERO, + (long) Integer.MIN_VALUE, + null + ); + } + + @Test + void dstWindowRegularReadMatchesRetrieveTs() throws Exception { + Instant dstStart = Instant.parse("2024-03-09T00:00:00Z"); + assertDirectReadMatchesOracle( + "ITPARDST", + "ITPARDST.Stage.Inst.1Minute.0.BENCH", + "ft", + dstStart, + dstStart.plus(Duration.ofMinutes(4999)), + regularRows(dstStart, 5000, 1.0, Duration.ofDays(1)), + false, + false, + VersionType.UNVERSIONED, + Duration.ofMinutes(1), + 0L, + null + ); + } + + @Test + void localRegularGapReadMatchesRetrieveTs() throws Exception { + assertDirectReadMatchesOracle( + "ITPARLCL", + "ITPARLCL.Flow.Inst.~1Day.0.BENCH", + "cfs", + Instant.parse("2024-01-01T00:00:00Z"), + Instant.parse("2024-01-05T00:00:00Z"), + List.of( + row("2024-01-01T00:00:00Z", 1.0, 0, "2024-01-06T00:00:00Z", null), + row("2024-01-02T00:00:00Z", 2.0, 0, "2024-01-06T00:00:00Z", null), + row("2024-01-04T00:00:00Z", 4.0, 0, "2024-01-06T00:00:00Z", null), + row("2024-01-05T00:00:00Z", 5.0, 0, "2024-01-06T00:00:00Z", null) + ), + false, + false, + VersionType.UNVERSIONED, + Duration.ofDays(1), + 0L, + null + ); + } + + @Test + void pageSizeZeroReturnsEmptyValuesArray() throws Exception { + List rows = denseRows(); + Instant beginTime = Instant.parse("2024-01-01T00:00:00Z"); + Instant endTime = Instant.parse("2024-01-01T00:05:00Z"); + seedTimeSeries("ITPARPZ0", "ITPARPZ0.Stage.Inst.1Minute.0.BENCH", rows, false); + + TimeSeries response = fetchCdaRowsWithPageSize( + "ITPARPZ0.Stage.Inst.1Minute.0.BENCH", + "ft", + beginTime, + endTime, + 0, + false, + null, + true + ); + + assertEquals(0, response.getPageSize(), "page-size"); + assertNotNull(response.getValues(), "values"); + assertEquals(0, response.getValues().size(), "values size"); + assertEquals(rows.size(), response.getTotal(), "total"); + assertNull(response.getPage(), "page"); + assertNull(response.getNextPage(), 
"next-page"); + } + + @Test + void pageSizeNegativeOneReturnsWholeWindowWithoutPagination() throws Exception { + List rows = denseRows(); + Instant beginTime = Instant.parse("2024-01-01T00:00:00Z"); + Instant endTime = Instant.parse("2024-01-01T00:05:00Z"); + seedTimeSeries("ITPARALL", "ITPARALL.Stage.Inst.1Minute.0.BENCH", rows, false); + + TimeSeries response = fetchCdaRowsWithPageSize( + "ITPARALL.Stage.Inst.1Minute.0.BENCH", + "ft", + beginTime, + endTime, + -1, + false, + null, + true + ); + + assertEquals(-1, response.getPageSize(), "page-size"); + assertEquals(rows.size(), response.getValues().size(), "values size"); + assertEquals(rows.size(), response.getTotal(), "total"); + assertNull(response.getPage(), "page"); + assertNull(response.getNextPage(), "next-page"); + } + + @Test + void trimmedResponseWindowMatchesReturnedValues() throws Exception { + List rows = gapRows(); + seedTimeSeries("ITPARTRM", "ITPARTRM.Stage.Inst.1Minute.0.BENCH", rows, false); + + TimeSeries response = fetchCdaRowsWithPageSize( + "ITPARTRM.Stage.Inst.1Minute.0.BENCH", + "ft", + Instant.parse("2023-12-31T23:59:00Z"), + Instant.parse("2024-01-01T00:10:00Z"), + 1000, + false, + null, + true + ); + + assertNotNull(response.getBegin(), "begin"); + assertNotNull(response.getEnd(), "end"); + assertEquals(Instant.parse("2024-01-01T00:00:00Z"), response.getBegin().toInstant(), "begin"); + assertEquals(Instant.parse("2024-01-01T00:09:00Z"), response.getEnd().toInstant(), "end"); + } + + private static void assertDirectReadMatchesOracle(String locationId, String seriesId, String units, + Instant beginTime, Instant endTime, List rows, + boolean versioned, boolean includeEntryDate, + VersionType expectedDateVersionType, + Duration expectedInterval, long expectedIntervalOffset, + Instant versionDate) throws Exception { + seedTimeSeries(locationId, seriesId, rows, versioned); + + List expectedRows = fetchOracleRows(seriesId, units, beginTime, endTime, + includeEntryDate, versionDate); + 
TimeSeries actualResponse = fetchCdaRows(seriesId, units, beginTime, endTime, rows.size(), + includeEntryDate, versionDate); + String mismatchSummary = buildMismatchSummary(expectedRows, actualResponse); + + assertNotNull(actualResponse.getTotal(), "Reported total " + mismatchSummary); + assertEquals(expectedRows.size(), actualResponse.getTotal(), "Reported total " + mismatchSummary); + assertEquals(expectedDateVersionType, actualResponse.getDateVersionType(), "Date version type"); + assertEquals(expectedInterval, actualResponse.getInterval(), "Interval"); + assertEquals(expectedIntervalOffset, actualResponse.getIntervalOffset(), "Interval offset"); + + if (versionDate != null) { + assertNotNull(actualResponse.getVersionDate(), "Version date"); + assertEquals(versionDate, actualResponse.getVersionDate().toInstant(), "Version date"); + } else { + assertNull(actualResponse.getVersionDate(), "Version date"); + } + + assertNotNull(actualResponse.getValues(), "Values " + mismatchSummary); + assertEquals(expectedRows.size(), actualResponse.getValues().size(), "Row count " + mismatchSummary); + for (int index = 0; index < expectedRows.size(); index++) { + assertRecordsEqual(expectedRows.get(index), actualResponse.getValues().get(index), index); + } + } + + private static List denseRows() { + return List.of( + row("2024-01-01T00:00:00Z", 1.0, 0, "2024-01-02T00:00:00Z", null), + row("2024-01-01T00:01:00Z", 2.0, 0, "2024-01-02T00:01:00Z", null), + row("2024-01-01T00:02:00Z", 3.0, 0, "2024-01-02T00:02:00Z", null), + row("2024-01-01T00:03:00Z", 4.0, 0, "2024-01-02T00:03:00Z", null), + row("2024-01-01T00:04:00Z", 5.0, 0, "2024-01-02T00:04:00Z", null), + row("2024-01-01T00:05:00Z", 6.0, 0, "2024-01-02T00:05:00Z", null) + ); + } + + private static List gapRows() { + return List.of( + row("2024-01-01T00:00:00Z", 1.0, 0, "2024-01-03T00:00:00Z", null), + row("2024-01-01T00:01:00Z", 2.0, 0, "2024-01-03T00:01:00Z", null), + row("2024-01-01T00:02:00Z", 3.0, 0, "2024-01-03T00:02:00Z", 
null), + row("2024-01-01T00:05:00Z", 6.0, 0, "2024-01-03T00:05:00Z", null), + row("2024-01-01T00:06:00Z", 7.0, 0, "2024-01-03T00:06:00Z", null), + row("2024-01-01T00:07:00Z", 8.0, 0, "2024-01-03T00:07:00Z", null), + row("2024-01-01T00:08:00Z", 9.0, 0, "2024-01-03T00:08:00Z", null), + row("2024-01-01T00:09:00Z", 10.0, 0, "2024-01-03T00:09:00Z", null) + ); + } + + private static List versionedRows() { + Instant olderVersion = Instant.parse("2024-06-20T08:00:00Z"); + Instant newerVersion = Instant.parse("2024-06-21T08:00:00Z"); + return List.of( + row("2024-05-01T15:00:00Z", 4.0, 0, "2024-06-20T09:00:00Z", olderVersion), + row("2024-05-01T16:00:00Z", 4.0, 0, "2024-06-20T09:01:00Z", olderVersion), + row("2024-05-01T17:00:00Z", 4.0, 0, "2024-06-20T09:02:00Z", olderVersion), + row("2024-05-01T18:00:00Z", 3.0, 0, "2024-06-20T09:03:00Z", olderVersion), + row("2024-05-01T15:00:00Z", 1.0, 0, "2024-06-21T09:00:00Z", newerVersion), + row("2024-05-01T16:00:00Z", 1.0, 0, "2024-06-21T09:01:00Z", newerVersion), + row("2024-05-01T17:00:00Z", 1.0, 0, "2024-06-21T09:02:00Z", newerVersion) + ); + } + + private static List irregularRows() { + return List.of( + row("2024-01-05T12:00:00Z", 10.0, 0, "2024-01-06T00:00:00Z", null), + row("2024-01-05T12:07:20Z", 20.0, 0, "2024-01-06T00:01:00Z", null), + row("2024-01-05T12:19:45Z", 30.0, 0, "2024-01-06T00:02:00Z", null), + row("2024-01-05T12:33:10Z", 40.0, 0, "2024-01-06T00:03:00Z", null) + ); + } + + private static SeedRow row(String dateTime, Double value, int qualityCode, String dataEntryDate, + Instant versionDate) { + return new SeedRow( + Instant.parse(dateTime), + value, + qualityCode, + Instant.parse(dataEntryDate), + versionDate + ); + } + + private static List regularRows(Instant start, int count, double firstValue, + Duration entryDateOffset) { + return IntStream.range(0, count) + .mapToObj(index -> new SeedRow( + start.plusSeconds(index * 60L), + firstValue + index, + 0, + start.plus(entryDateOffset).plusSeconds(index * 60L), + 
null + )) + .collect(Collectors.toList()); + } + + private static String buildMismatchSummary(List expectedRows, TimeSeries actualResponse) { + return "expectedRows=" + summarizeRows(expectedRows) + + " actualRows=" + summarizeRows(actualResponse.getValues()) + + " actualTotal=" + actualResponse.getTotal(); + } + + private static String summarizeRows(List rows) { + if (rows == null) { + return "null"; + } + + return rows.stream() + .limit(12) + .map(row -> "{t=" + toMillis(row.getDateTime()) + + ",v=" + row.getValue() + + ",q=" + row.getQualityCode() + + ",e=" + toMillis(row.getDataEntryDate()) + + "}") + .collect(Collectors.joining(", ", "[", rows.size() > 12 ? ", ...]" : "]")); + } + + private static long toMillis(Timestamp timestamp) { + return timestamp != null ? timestamp.getTime() : Long.MIN_VALUE; + } + + private static void assertRecordsEqual(TimeSeries.Record expected, TimeSeries.Record actual, int index) { + assertEquals(expected.getDateTime(), actual.getDateTime(), "Row " + index + " timestamp"); + assertEquals(expected.getQualityCode(), actual.getQualityCode(), "Row " + index + " quality"); + + if (expected.getValue() == null) { + assertNull(actual.getValue(), "Row " + index + " value"); + } else { + assertNotNull(actual.getValue(), "Row " + index + " value"); + assertEquals(expected.getValue(), actual.getValue(), DOUBLE_TOLERANCE, "Row " + index + " value"); + } + + assertEquals(expected.getDataEntryDate(), actual.getDataEntryDate(), "Row " + index + " entry date"); + } + + private static void seedTimeSeries(String locationId, String seriesId, List rows, + boolean versioned) throws SQLException { + createLocation(locationId, true, OFFICE); + createTimeseries(OFFICE, seriesId, 0); + + CwmsDatabaseContainer database = CwmsDataApiSetupCallback.getDatabaseLink(); + database.connection(connection -> { + try { + if (versioned) { + CWMS_TS_PACKAGE.call_SET_TSID_VERSIONED(DSL.using(connection).configuration(), + seriesId, + "T", + OFFICE); + } + + long tsCode 
= findTsCode(connection, seriesId); + List years = rows.stream() + .map(seedRow -> OffsetDateTime.ofInstant(seedRow.dateTime, ZoneOffset.UTC).getYear()) + .distinct() + .collect(Collectors.toList()); + + clearScenarioRows(connection, tsCode, years); + insertScenarioRows(connection, tsCode, rows); + updateScenarioExtents(connection, tsCode, rows); + } catch (SQLException e) { + throw new RuntimeException("Unable to seed time series " + seriesId, e); + } + }, "cwms_20"); + } + + private static long findTsCode(Connection connection, String seriesId) throws SQLException { + String sql = "select ts_code from at_cwms_ts_id where db_office_id = ? and cwms_ts_id = ?"; + try (PreparedStatement statement = connection.prepareStatement(sql)) { + statement.setString(1, OFFICE); + statement.setString(2, seriesId); + try (ResultSet resultSet = statement.executeQuery()) { + if (!resultSet.next()) { + throw new IllegalStateException("Unable to find ts_code for " + seriesId); + } + return resultSet.getLong(1); + } + } + } + + private static void clearScenarioRows(Connection connection, long tsCode, List years) throws SQLException { + for (Integer year : years) { + try (PreparedStatement statement = connection.prepareStatement( + "delete from at_tsv_" + year + " where ts_code = ?")) { + statement.setLong(1, tsCode); + statement.executeUpdate(); + } + } + + try (PreparedStatement statement = connection.prepareStatement( + "delete from at_ts_extents where ts_code = ?")) { + statement.setLong(1, tsCode); + statement.executeUpdate(); + } + } + + private static void insertScenarioRows(Connection connection, long tsCode, List rows) + throws SQLException { + List sortedRows = new ArrayList<>(rows); + sortedRows.sort(Comparator.comparing(seedRow -> seedRow.dateTime)); + + Map> rowsByYear = new LinkedHashMap<>(); + for (SeedRow row: sortedRows) { + int year = OffsetDateTime.ofInstant(row.dateTime, ZoneOffset.UTC).getYear(); + rowsByYear.computeIfAbsent(year, ignored -> new 
ArrayList<>()).add(row); + } + + for (Map.Entry> entry: rowsByYear.entrySet()) { + String sql = "insert into at_tsv_" + entry.getKey() + + " (ts_code, date_time, version_date, data_entry_date, value, quality_code, dest_flag)" + + " values (?, ?, ?, ?, ?, ?, 0)"; + try (PreparedStatement statement = connection.prepareStatement(sql)) { + int batchCount = 0; + for (SeedRow row: entry.getValue()) { + bindScenarioInsert(statement, tsCode, row); + statement.addBatch(); + batchCount++; + if (batchCount % 1000 == 0) { + statement.executeBatch(); + } + } + statement.executeBatch(); + } + } + } + + private static void bindScenarioInsert(PreparedStatement statement, long tsCode, SeedRow row) + throws SQLException { + statement.setLong(1, tsCode); + statement.setTimestamp(2, Timestamp.from(row.dateTime)); + statement.setTimestamp(3, Timestamp.from(row.versionDate != null + ? row.versionDate + : Instant.parse("1111-11-11T00:00:00Z"))); + if (row.dataEntryDate != null) { + statement.setTimestamp(4, Timestamp.from(row.dataEntryDate)); + } else { + statement.setNull(4, Types.TIMESTAMP); + } + if (row.value != null) { + statement.setDouble(5, row.value); + } else { + statement.setNull(5, Types.DOUBLE); + } + statement.setInt(6, row.qualityCode); + } + + private static void updateScenarioExtents(Connection connection, long tsCode, List rows) + throws SQLException { + Set distinctVersionDates = rows.stream() + .map(seedRow -> seedRow.versionDate) + .filter(Objects::nonNull) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + if (distinctVersionDates.isEmpty()) { + updateTsExtents(connection, tsCode, "date '1111-11-11'"); + return; + } + + for (Instant versionDate : distinctVersionDates) { + updateTsExtents(connection, tsCode, toOracleDateExpression(versionDate)); + } + } + + private static void updateTsExtents(Connection connection, long tsCode, String versionDateExpression) + throws SQLException { + String sql = "begin cwms_ts.update_ts_extents(" + tsCode + ", " + 
versionDateExpression + "); end;"; + try (PreparedStatement statement = connection.prepareStatement(sql)) { + statement.execute(); + } + } + + private static List fetchOracleRows(String seriesId, String units, Instant beginTime, + Instant endTime, boolean includeEntryDate, + Instant versionDate) throws SQLException { + CwmsDatabaseContainer database = CwmsDataApiSetupCallback.getDatabaseLink(); + return database.connection(connection -> { + try { + String functionName = includeEntryDate + ? "cwms_20.cwms_ts.retrieve_ts_entry_out_tab" + : "cwms_20.cwms_ts.retrieve_ts_out_tab"; + String rowProjection = includeEntryDate + ? ", case when data_entry_date is null then null else round((cast(data_entry_date as date) - date '1970-01-01') * 86400000) end as data_entry_date_ms" + : ""; + String maxVersionFlag = versionDate != null ? "F" : "T"; + String sql = "select round((date_time - date '1970-01-01') * 86400000) as date_time_ms," + + " value," + + " quality_code" + + rowProjection + + " from table(" + functionName + "(" + + "?, " + + "?, " + + "?, " + + "?, " + + "'UTC', 'T', 'T', 'T', 'F', 'F', " + + "?, " + + "?, " + + "?" 
+ + "))" + + " order by date_time"; + + try (PreparedStatement statement = connection.prepareStatement(sql)) { + statement.setString(1, seriesId); + statement.setString(2, units); + statement.setTimestamp(3, Timestamp.from(beginTime)); + statement.setTimestamp(4, Timestamp.from(endTime)); + if (versionDate != null) { + statement.setTimestamp(5, Timestamp.from(versionDate)); + } else { + statement.setNull(5, Types.TIMESTAMP); + } + statement.setString(6, maxVersionFlag); + statement.setString(7, OFFICE); + try (ResultSet resultSet = statement.executeQuery()) { + List rows = new ArrayList<>(); + while (resultSet.next()) { + Double value = resultSet.getDouble("value"); + if (resultSet.wasNull()) { + value = null; + } + + Long dataEntryDateMillis = null; + if (includeEntryDate) { + long entryMillis = resultSet.getLong("data_entry_date_ms"); + if (!resultSet.wasNull()) { + dataEntryDateMillis = entryMillis; + } + } + + rows.add(toRecord( + resultSet.getLong("date_time_ms"), + value, + resultSet.getInt("quality_code"), + dataEntryDateMillis + )); + } + return rows; + } + } + } catch (SQLException e) { + throw new RuntimeException("Unable to fetch Oracle rows for " + seriesId, e); + } + }, "cwms_20"); + } + + private static TimeSeries.Record toRecord(long dateTimeMillis, Double value, int qualityCode, + Long dataEntryDateMillis) { + Timestamp dateTime = Timestamp.from(Instant.ofEpochMilli(dateTimeMillis)); + if (dataEntryDateMillis != null) { + return new TimeSeries.Record(dateTime, value, qualityCode, + Timestamp.from(Instant.ofEpochMilli(dataEntryDateMillis))); + } + return new TimeSeries.Record(dateTime, value, qualityCode); + } + + private static TimeSeries fetchCdaRows(String seriesId, String units, Instant beginTime, Instant endTime, + int seedRowCount, boolean includeEntryDate, Instant versionDate) + throws Exception { + int pageSize = Math.max(1000, seedRowCount * 2); + return fetchCdaRowsWithPageSize(seriesId, units, beginTime, endTime, pageSize, 
includeEntryDate, + versionDate, true); + } + + private static TimeSeries fetchCdaRowsWithPageSize(String seriesId, String units, Instant beginTime, + Instant endTime, int pageSize, boolean includeEntryDate, + Instant versionDate, boolean trim) + throws Exception { + RequestSpecification request = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.JSONV2) + .queryParam(Controllers.OFFICE, OFFICE) + .queryParam(Controllers.NAME, seriesId) + .queryParam(Controllers.UNIT, units) + .queryParam(Controllers.BEGIN, beginTime.toString()) + .queryParam(Controllers.END, endTime.toString()) + .queryParam(Controllers.PAGE_SIZE, pageSize) + .queryParam(Controllers.TRIM, trim) + .queryParam(Controllers.INCLUDE_ENTRY_DATE, includeEntryDate); + if (versionDate != null) { + request = request.queryParam(Controllers.VERSION_DATE, versionDate.toString()); + } + + ExtractableResponse response = request.when() + .redirects().follow(true) + .redirects().max(3) + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL, true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_OK)) + .extract(); + + String responseBody = response.asString(); + TimeSeries timeSeries = OBJECT_MAPPER.readValue(responseBody, TimeSeries.class); + if (!includeEntryDate) { + return timeSeries; + } + + JsonNode payload = OBJECT_MAPPER.readTree(responseBody); + List values = new ArrayList<>(); + for (JsonNode entry : payload.get("values")) { + Long dataEntryDateMillis = null; + if (entry.size() > 3 && !entry.get(3).isNull()) { + dataEntryDateMillis = entry.get(3).asLong(); + } + values.add(toRecord( + entry.get(0).asLong(), + entry.get(1).isNull() ? 
null : entry.get(1).asDouble(), + entry.get(2).asInt(), + dataEntryDateMillis + )); + } + return timeSeries.withValues(values); + } + + private static String toOracleDateExpression(Instant instant) { + LocalDateTime utc = LocalDateTime.ofInstant(instant, ZoneOffset.UTC); + return "to_date('" + utc.format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + + "', 'yyyy-mm-dd hh24:mi:ss')"; + } + + private static final class SeedRow { + private final Instant dateTime; + private final Double value; + private final int qualityCode; + private final Instant dataEntryDate; + private final Instant versionDate; + + private SeedRow(Instant dateTime, Double value, int qualityCode, Instant dataEntryDate, + Instant versionDate) { + this.dateTime = dateTime; + this.value = value; + this.qualityCode = qualityCode; + this.dataEntryDate = dataEntryDate; + this.versionDate = versionDate; + } + } +} diff --git a/cwms-data-api/src/test/java/cwms/cda/api/TimeSeriesFilteredControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/TimeSeriesFilteredControllerTestIT.java index 4c9d43a0bb..56bdb49786 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/TimeSeriesFilteredControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/TimeSeriesFilteredControllerTestIT.java @@ -32,6 +32,86 @@ class TimeSeriesFilteredControllerTestIT extends DataApiTestIT { static FluentLogger logger = FluentLogger.forEnclosingClass(); public static final String JSON_FILE = "/cwms/cda/api/lrl/1hour.json"; + @ParameterizedTest + @ValueSource(strings = {Formats.JSONV2, Formats.DEFAULT}) + void test_page_size_special_cases(String format) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + InputStream resource = this.getClass().getResourceAsStream(JSON_FILE); + assertNotNull(resource); + String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8); + + JsonNode ts = mapper.readTree(tsData); + String location = ts.get(Controllers.NAME).asText().split("\\.")[0]; + 
String officeId = ts.get("office-id").asText(); + + try { + createLocation(location, true, officeId); + + TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL; + + given() + .log().ifValidationFails(LogDetail.ALL,true) + .accept(format) + .contentType(Formats.JSONV2) + .body(tsData) + .header("Authorization",user.toHeaderValue()) + .queryParam(Controllers.OFFICE, officeId) + .when() + .redirects().follow(true) + .redirects().max(3) + .post("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_OK)); + + given() + .config(RestAssured.config().jsonConfig(jsonConfig().numberReturnType(JsonPathConfig.NumberReturnType.DOUBLE))) + .log().ifValidationFails(LogDetail.ALL,true) + .accept(format) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.UNIT,"cfs") + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.BEGIN,"2023-01-11T12:00:00-00:00") + .queryParam(Controllers.END,"2023-01-11T15:00:00-00:00") + .queryParam(Controllers.PAGE_SIZE, 0) + .when() + .redirects().follow(true) + .redirects().max(3) + .get("/timeseries/filtered/") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_OK)) + .body("page-size", equalTo(0)) + .body("time-series.values.size()", equalTo(0)); + + given() + .config(RestAssured.config().jsonConfig(jsonConfig().numberReturnType(JsonPathConfig.NumberReturnType.DOUBLE))) + .log().ifValidationFails(LogDetail.ALL,true) + .accept(format) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.UNIT,"cfs") + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.BEGIN,"2023-01-11T12:00:00-00:00") + .queryParam(Controllers.END,"2023-01-11T15:00:00-00:00") + .queryParam(Controllers.PAGE_SIZE, -1) + .when() + .redirects().follow(true) + .redirects().max(3) + .get("/timeseries/filtered/") + .then() + 
.log().ifValidationFails(LogDetail.ALL,true) + .assertThat() + .statusCode(is(HttpServletResponse.SC_OK)) + .body("page-size", equalTo(-1)) + .body("time-series.values.size()", equalTo(4)); + } catch (SQLException ex) { + throw new RuntimeException("Unable to create location for TS", ex); + } + } + @ParameterizedTest @ValueSource(strings = {Formats.JSONV2, Formats.DEFAULT}) void test_filter_nulls(String format) throws Exception { diff --git a/cwms-data-api/src/test/java/fixtures/CwmsDataApiSetupCallback.java b/cwms-data-api/src/test/java/fixtures/CwmsDataApiSetupCallback.java index 7781b7ca0c..03994a2321 100644 --- a/cwms-data-api/src/test/java/fixtures/CwmsDataApiSetupCallback.java +++ b/cwms-data-api/src/test/java/fixtures/CwmsDataApiSetupCallback.java @@ -266,6 +266,39 @@ public static CwmsDatabaseContainer getDatabaseLink() { return cwmsDb; } + public static void shutdown() throws Exception { + Exception failure = null; + if (cdaInstance != null) { + try { + cdaInstance.stop(); + } catch (Exception e) { + failure = e; + } finally { + cdaInstance = null; + } + } + + if (cwmsDb != null) { + try { + cwmsDb.stop(); + } catch (Exception e) { + if (failure == null) { + failure = e; + } else { + failure.addSuppressed(e); + } + } finally { + cwmsDb = null; + } + } + + webUser = null; + + if (failure != null) { + throw failure; + } + } + private String loadResourceAsString(String fileName) { try { return IOUtils.toString( diff --git a/cwms-data-api/src/test/java/fixtures/KeyCloakExtension.java b/cwms-data-api/src/test/java/fixtures/KeyCloakExtension.java index 4949b27186..70a0bd2232 100644 --- a/cwms-data-api/src/test/java/fixtures/KeyCloakExtension.java +++ b/cwms-data-api/src/test/java/fixtures/KeyCloakExtension.java @@ -121,6 +121,16 @@ public static String getCodeUrl() { public static String getTokenUrl() { return tokenUrl; } + + public static void shutdown() { + if (kcc.isRunning()) { + kcc.stop(); + } + authUrl = null; + issuer = null; + codeUrl = null; + tokenUrl 
= null; + } /** * Retrieve the Access token for the user. diff --git a/cwms-data-api/src/test/java/fixtures/MinIOExtension.java b/cwms-data-api/src/test/java/fixtures/MinIOExtension.java index 15dce3f721..8eeacb4455 100644 --- a/cwms-data-api/src/test/java/fixtures/MinIOExtension.java +++ b/cwms-data-api/src/test/java/fixtures/MinIOExtension.java @@ -52,5 +52,11 @@ private static void createTestBucket() { } } + public static void shutdown() { + if (MINIO_CONTAINER.isRunning()) { + MINIO_CONTAINER.stop(); + } + } + } diff --git a/cwms-data-api/src/test/java/helpers/TimeSeriesReadBenchmark.java b/cwms-data-api/src/test/java/helpers/TimeSeriesReadBenchmark.java new file mode 100644 index 0000000000..b86ca3a50a --- /dev/null +++ b/cwms-data-api/src/test/java/helpers/TimeSeriesReadBenchmark.java @@ -0,0 +1,705 @@ +package helpers; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import fixtures.CwmsDataApiSetupCallback; +import fixtures.KeyCloakExtension; +import fixtures.MinIOExtension; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import mil.army.usace.hec.test.database.CwmsDatabaseContainer; +import org.jooq.impl.DSL; +import usace.cwms.db.jooq.codegen.packages.CWMS_TS_PACKAGE; + +public final class TimeSeriesReadBenchmark { + 
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + private static final DateTimeFormatter REQUEST_TIME_FORMAT = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC); + private static final DateTimeFormatter ORACLE_DATE_TIME_FORMAT = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC); + private static final String ACCEPT_JSON_V2 = "application/json;version=2"; + private static final String NON_VERSIONED_DATE_SQL = "date '1111-11-11'"; + + private TimeSeriesReadBenchmark() { + } + + public static void main(String[] args) throws Exception { + BenchmarkConfig config = BenchmarkConfig.fromSystemProperties(); + System.out.println("Starting benchmark fixtures..."); + + try { + new KeyCloakExtension().beforeAll(null); + new MinIOExtension().beforeAll(null); + new CwmsDataApiSetupCallback().beforeAll(null); + + System.out.println("Running benchmark..."); + BenchmarkReport report = runBenchmark(config); + + Files.createDirectories(config.resultsDir); + Path resultFile = config.resultsDir.resolve("timeseries-read-benchmark-" + + DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(ZoneOffset.UTC).format(Instant.now()) + + ".json"); + + OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValue(resultFile.toFile(), report); + OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValue(System.out, report); + System.out.println(); + System.out.println("Benchmark report written to " + resultFile); + + for (BenchmarkRun run: report.runs) { + if (run.httpCode != 200) { + throw new IllegalStateException( + "Benchmark completed with HTTP failures. 
Results saved to " + resultFile); + } + } + } finally { + System.out.println("Shutting down benchmark fixtures..."); + shutdownFixtures(); + } + } + + private static BenchmarkReport runBenchmark(BenchmarkConfig config) throws Exception { + Files.createDirectories(config.resultsDir); + Files.createDirectories(config.responsesDir); + + SeedInfo seed = ensureBenchmarkSeed(config); + if (seed.pointCount != config.pointCount) { + throw new IllegalStateException("Expected " + config.pointCount + " seeded points but found " + + seed.pointCount); + } + + waitForCdaReady(config); + if (config.warmup) { + Path warmupFile = config.responsesDir.resolve("warmup.json"); + executeRequest(config, warmupFile); + if (!config.keepResponses) { + Files.deleteIfExists(warmupFile); + } + } + + List<BenchmarkRun> runs = new ArrayList<>(); + for (int runIndex = 1; runIndex <= config.runs; runIndex++) { + runs.add(executeRun(config, runIndex)); + } + + return new BenchmarkReport( + "timeseries-read", + Instant.now().toString(), + resolveGitValue("git", "branch", "--show-current"), + resolveGitValue("git", "rev-parse", "HEAD"), + config.office, + config.locationId, + config.seriesId, + config.units, + config.startTime.toString(), + config.endTime.toString(), + config.pointCount, + config.pageSize, + config.requestUrl().toString(), + seed, + BenchmarkSummary.fromRuns(runs), + runs + ); + } + + private static SeedInfo ensureBenchmarkSeed(BenchmarkConfig config) throws SQLException { + long existingCount = getSeededPointCount(config); + if (config.skipSeed) { + return new SeedInfo(false, existingCount); + } + if (!config.forceReseed && existingCount == config.pointCount) { + return new SeedInfo(false, existingCount); + } + + CwmsDatabaseContainer database = CwmsDataApiSetupCallback.getDatabaseLink(); + database.connection(connection -> { + try { + ensureLocationExists(connection, config); + ensureTimeSeriesExists(connection, config); + CWMS_TS_PACKAGE.call_SET_TSID_VERSIONED( 
DSL.using(connection).configuration(), config.seriesId, "F", config.office); + + long tsCode = findTsCode(connection, config.office, config.seriesId); + List segments = buildYearSegments(config.startTime, config.pointCount); + clearSeededRows(connection, tsCode, segments); + insertSeededRows(connection, tsCode, segments); + updateTsExtents(connection, tsCode); + if (!connection.getAutoCommit()) { + connection.commit(); + } + } catch (SQLException e) { + throw new RuntimeException("Unable to seed benchmark series " + config.seriesId, e); + } + }, "cwms_20"); + + return new SeedInfo(true, getSeededPointCount(config)); + } + + private static void ensureLocationExists(Connection connection, BenchmarkConfig config) throws SQLException { + String sql = "declare " + + "location_exists exception; " + + "pragma exception_init(location_exists, -20026); " + + "begin " + + "cwms_loc.create_location(?, ?, null, null, null, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); " + + "exception when location_exists then null; " + + "end;"; + try (PreparedStatement statement = connection.prepareStatement(sql)) { + statement.setString(1, config.locationId); + statement.setString(2, "SITE"); + statement.setDouble(3, 38.0d); + statement.setDouble(4, -90.0d); + statement.setString(5, "NAD83"); + statement.setString(6, config.locationId); + statement.setString(7, config.locationId + " Benchmark Location"); + statement.setString(8, "Performance benchmark location"); + statement.setString(9, "UTC"); + statement.setString(10, null); + statement.setString(11, null); + statement.setString(12, "T"); + statement.setString(13, config.office); + statement.execute(); + } + } + + private static void ensureTimeSeriesExists(Connection connection, BenchmarkConfig config) throws SQLException { + String sql = "declare " + + "ts_exists exception; " + + "pragma exception_init(ts_exists, -20003); " + + "begin " + + "cwms_ts.create_ts(?, ?, 0); " + + "exception when ts_exists then null; " + + "end;"; + try (PreparedStatement 
statement = connection.prepareStatement(sql)) { + statement.setString(1, config.office); + statement.setString(2, config.seriesId); + statement.execute(); + } + } + + private static long findTsCode(Connection connection, String office, String seriesId) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement( + "select ts_code from at_cwms_ts_id where db_office_id = ? and cwms_ts_id = ?")) { + statement.setString(1, office); + statement.setString(2, seriesId); + try (ResultSet resultSet = statement.executeQuery()) { + if (!resultSet.next()) { + throw new IllegalStateException("Unable to find ts_code for " + seriesId); + } + return resultSet.getLong(1); + } + } + } + + private static void clearSeededRows(Connection connection, long tsCode, List<YearSegment> segments) throws SQLException { + for (YearSegment segment : segments) { + try (PreparedStatement statement = connection.prepareStatement( + "delete from at_tsv_" + segment.year + " where ts_code = ?")) { + statement.setLong(1, tsCode); + statement.executeUpdate(); + } + } + try (PreparedStatement statement = connection.prepareStatement( + "delete from at_ts_extents where ts_code = ?")) { + statement.setLong(1, tsCode); + statement.executeUpdate(); + } + } + + private static void insertSeededRows(Connection connection, long tsCode, List<YearSegment> segments) throws SQLException { + for (YearSegment segment : segments) { + String sql = "insert /*+ APPEND */ into at_tsv_" + segment.year + + " (ts_code, date_time, version_date, data_entry_date, value, quality_code, dest_flag) " + + "select ?, to_date(?, 'yyyy-mm-dd hh24:mi:ss') + numtodsinterval(level - 1, 'MINUTE'), " + + NON_VERSIONED_DATE_SQL + ", systimestamp, ? 
+ level - 1, 0, 0 " + + "from dual connect by level <= ?"; + try (PreparedStatement statement = connection.prepareStatement(sql)) { + statement.setLong(1, tsCode); + statement.setString(2, ORACLE_DATE_TIME_FORMAT.format(segment.startTime)); + statement.setLong(3, segment.valueStart); + statement.setInt(4, segment.count); + statement.executeUpdate(); + } + } + } + + private static void updateTsExtents(Connection connection, long tsCode) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement( + "begin cwms_ts.update_ts_extents(?, " + NON_VERSIONED_DATE_SQL + "); end;")) { + statement.setLong(1, tsCode); + statement.execute(); + } + } + + private static long getSeededPointCount(BenchmarkConfig config) throws SQLException { + CwmsDatabaseContainer database = CwmsDataApiSetupCallback.getDatabaseLink(); + return database.connection(connection -> { + try (PreparedStatement statement = connection.prepareStatement( + "select count(*) from av_tsv v " + + "join at_cwms_ts_id t on t.ts_code = v.ts_code " + + "where t.db_office_id = ? 
and t.cwms_ts_id = ?")) { + statement.setString(1, config.office); + statement.setString(2, config.seriesId); + try (ResultSet resultSet = statement.executeQuery()) { + resultSet.next(); + return resultSet.getLong(1); + } + } catch (SQLException e) { + throw new RuntimeException("Unable to count seeded rows for " + config.seriesId, e); + } + }, "cwms_20"); + } + + private static List<YearSegment> buildYearSegments(Instant startTime, int pointCount) { + List<YearSegment> segments = new ArrayList<>(); + Instant cursor = startTime; + int remaining = pointCount; + long valueStart = 1L; + while (remaining > 0) { + Instant nextYear = cursor.atOffset(ZoneOffset.UTC) + .withDayOfYear(1) + .withHour(0) + .withMinute(0) + .withSecond(0) + .withNano(0) + .plusYears(1) + .toInstant(); + long minutesUntilNextYear = Math.max(1L, Duration.between(cursor, nextYear).toMinutes()); + int segmentCount = (int) Math.min(remaining, minutesUntilNextYear); + segments.add(new YearSegment(cursor.atOffset(ZoneOffset.UTC).getYear(), cursor, segmentCount, valueStart)); + cursor = cursor.plusSeconds(segmentCount * 60L); + valueStart += segmentCount; + remaining -= segmentCount; + } + return segments; + } + + private static void waitForCdaReady(BenchmarkConfig config) throws Exception { + HttpClient client = HttpClient.newHttpClient(); + URI readinessUri = URI.create(config.resolvedBaseUrl() + "/offices/" + urlEncode(config.office)); + for (int attempt = 0; attempt < 30; attempt++) { + HttpRequest request = HttpRequest.newBuilder(readinessUri) + .header("Accept", ACCEPT_JSON_V2) + .GET() + .build(); + HttpResponse<InputStream> response = client.send(request, HttpResponse.BodyHandlers.ofInputStream()); + try (InputStream ignored = response.body()) { + if (response.statusCode() == 200) { + return; + } + } + Thread.sleep(1000L); + } + throw new IllegalStateException("CDA did not become ready at " + readinessUri); + } + + private static BenchmarkRun executeRun(BenchmarkConfig config, int runIndex) throws Exception { + Path responseFile = 
config.responsesDir.resolve("timeseries-read-run-" + runIndex + ".json"); + RequestResult requestResult = executeRequest(config, responseFile); + ResponseSummary responseSummary = summarizeResponse(responseFile); + String responseFileValue = responseFile.toAbsolutePath().toString(); + if (!config.keepResponses && requestResult.httpCode == 200) { + Files.deleteIfExists(responseFile); + responseFileValue = null; + } + return new BenchmarkRun( + runIndex, + requestResult.httpCode, + roundSeconds(requestResult.timeTotalNanos), + responseSummary.responseBytes, + responseSummary.reportedTotal, + responseSummary.reportedPageSize, + responseSummary.firstTimestamp, + responseSummary.lastTimestamp, + requestResult.httpCode == 200 ? null : Files.readString(responseFile), + responseFileValue + ); + } + + private static RequestResult executeRequest(BenchmarkConfig config, Path responseFile) throws Exception { + HttpClient client = HttpClient.newHttpClient(); + HttpRequest request = HttpRequest.newBuilder(config.requestUrl()) + .header("Accept", ACCEPT_JSON_V2) + .GET() + .build(); + long startNanos = System.nanoTime(); + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofFile(responseFile)); + long endNanos = System.nanoTime(); + return new RequestResult(response.statusCode(), endNanos - startNanos); + } + + private static ResponseSummary summarizeResponse(Path responseFile) throws IOException { + Integer reportedTotal = null; + Integer reportedPageSize = null; + Long firstTimestamp = null; + Long lastTimestamp = null; + + try (InputStream inputStream = Files.newInputStream(responseFile); + JsonParser parser = JSON_FACTORY.createParser(inputStream)) { + while (parser.nextToken() != null) { + if (parser.currentToken() != JsonToken.FIELD_NAME) { + continue; + } + String fieldName = parser.currentName(); + JsonToken valueToken = parser.nextToken(); + if ("total".equals(fieldName) && valueToken != JsonToken.VALUE_NULL) { + reportedTotal = parser.getIntValue(); 
+ } else if ("page-size".equals(fieldName) && valueToken != JsonToken.VALUE_NULL) { + reportedPageSize = parser.getIntValue(); + } else if ("values".equals(fieldName) && valueToken == JsonToken.START_ARRAY) { + while (parser.nextToken() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.START_ARRAY) { + parser.skipChildren(); + continue; + } + parser.nextToken(); + long timestamp = parser.getLongValue(); + if (firstTimestamp == null) { + firstTimestamp = timestamp; + } + lastTimestamp = timestamp; + while (parser.nextToken() != JsonToken.END_ARRAY) { + parser.skipChildren(); + } + } + } else { + parser.skipChildren(); + } + } + } + + return new ResponseSummary( + Files.size(responseFile), + reportedTotal, + reportedPageSize, + firstTimestamp, + lastTimestamp + ); + } + + private static String resolveGitValue(String... command) { + ProcessBuilder processBuilder = new ProcessBuilder(command); + processBuilder.redirectErrorStream(true); + try { + Process process = processBuilder.start(); + byte[] outputBytes = process.getInputStream().readAllBytes(); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return null; + } + String value = new String(outputBytes, StandardCharsets.UTF_8).trim(); + return value.isEmpty() ? 
null : value; + } catch (IOException e) { + return null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + private static double roundSeconds(long nanos) { + return Math.round((nanos / 1_000_000_000.0d) * 1_000_000.0d) / 1_000_000.0d; + } + + private static String urlEncode(String value) { + return URLEncoder.encode(value, StandardCharsets.UTF_8); + } + + private static void shutdownFixtures() throws Exception { + Exception failure = null; + try { + CwmsDataApiSetupCallback.shutdown(); + } catch (Exception e) { + failure = e; + } + + try { + MinIOExtension.shutdown(); + } catch (Exception e) { + if (failure == null) { + failure = e; + } else { + failure.addSuppressed(e); + } + } + + try { + KeyCloakExtension.shutdown(); + } catch (Exception e) { + if (failure == null) { + failure = e; + } else { + failure.addSuppressed(e); + } + } + + if (failure != null) { + throw failure; + } + } + + private static final class BenchmarkConfig { + private final String office; + private final String locationId; + private final String seriesId; + private final String units; + private final String baseUrl; + private final Instant startTime; + private final Instant endTime; + private final int pointCount; + private final int pageSize; + private final int runs; + private final boolean warmup; + private final boolean skipSeed; + private final boolean forceReseed; + private final boolean keepResponses; + private final Path resultsDir; + private final Path responsesDir; + + private BenchmarkConfig(String office, String locationId, String seriesId, String units, String baseUrl, + Instant startTime, int pointCount, int pageSize, int runs, boolean warmup, + boolean skipSeed, boolean forceReseed, boolean keepResponses, Path resultsDir, + Path responsesDir) { + this.office = office; + this.locationId = locationId; + this.seriesId = seriesId; + this.units = units; + this.baseUrl = baseUrl; + this.startTime = startTime; + this.endTime = 
startTime.plusSeconds(Math.max(0L, pointCount - 1L) * 60L); + this.pointCount = pointCount; + this.pageSize = pageSize; + this.runs = runs; + this.warmup = warmup; + this.skipSeed = skipSeed; + this.forceReseed = forceReseed; + this.keepResponses = keepResponses; + this.resultsDir = resultsDir; + this.responsesDir = responsesDir; + } + + private static BenchmarkConfig fromSystemProperties() { + String office = System.getProperty("benchmark.office", "SPK"); + String locationId = System.getProperty("benchmark.locationId", "PERF1MREAD"); + String seriesId = System.getProperty("benchmark.seriesId", "PERF1MREAD.Stage.Inst.1Minute.0.BENCH"); + String units = System.getProperty("benchmark.units", "ft"); + String baseUrl = System.getProperty("benchmark.baseUrl"); + Instant startTime = Instant.parse(System.getProperty("benchmark.startTime", "2024-01-01T00:00:00Z")); + int pointCount = Integer.parseInt(System.getProperty("benchmark.pointCount", "1000000")); + int pageSize = Integer.parseInt(System.getProperty("benchmark.pageSize", String.valueOf(pointCount))); + int runs = Integer.parseInt(System.getProperty("benchmark.runs", "1")); + boolean warmup = Boolean.parseBoolean(System.getProperty("benchmark.warmup", "false")); + boolean skipSeed = Boolean.parseBoolean(System.getProperty("benchmark.skipSeed", "false")); + boolean forceReseed = Boolean.parseBoolean(System.getProperty("benchmark.forceReseed", "false")); + boolean keepResponses = Boolean.parseBoolean(System.getProperty("benchmark.keepResponses", "false")); + Path resultsDir = Paths.get(System.getProperty("benchmark.resultsDir", + "..\\load_data\\performance\\results")).normalize().toAbsolutePath(); + Path responsesDir = Paths.get(System.getProperty("benchmark.responsesDir", + "..\\load_data\\performance\\responses")).normalize().toAbsolutePath(); + return new BenchmarkConfig(office, locationId, seriesId, units, baseUrl, startTime, pointCount, + pageSize, runs, warmup, skipSeed, forceReseed, keepResponses, resultsDir, 
responsesDir); + } + + private URI requestUrl() { + StringBuilder builder = new StringBuilder(resolvedBaseUrl()); + builder.append("/timeseries?office=").append(urlEncode(office)); + builder.append("&name=").append(urlEncode(seriesId)); + builder.append("&units=").append(urlEncode(units)); + builder.append("&begin=").append(urlEncode(REQUEST_TIME_FORMAT.format(startTime))); + builder.append("&end=").append(urlEncode(REQUEST_TIME_FORMAT.format(endTime))); + builder.append("&page-size=").append(pageSize); + return URI.create(builder.toString()); + } + + private String resolvedBaseUrl() { + if (baseUrl != null && !baseUrl.isBlank()) { + return baseUrl; + } + return CwmsDataApiSetupCallback.httpUrl() + ":" + CwmsDataApiSetupCallback.httpPort() + + System.getProperty("warContext"); + } + } + + private static final class YearSegment { + private final int year; + private final Instant startTime; + private final int count; + private final long valueStart; + + private YearSegment(int year, Instant startTime, int count, long valueStart) { + this.year = year; + this.startTime = startTime; + this.count = count; + this.valueStart = valueStart; + } + } + + private static final class RequestResult { + private final int httpCode; + private final long timeTotalNanos; + + private RequestResult(int httpCode, long timeTotalNanos) { + this.httpCode = httpCode; + this.timeTotalNanos = timeTotalNanos; + } + } + + private static final class ResponseSummary { + private final long responseBytes; + private final Integer reportedTotal; + private final Integer reportedPageSize; + private final Long firstTimestamp; + private final Long lastTimestamp; + + private ResponseSummary(long responseBytes, Integer reportedTotal, Integer reportedPageSize, + Long firstTimestamp, Long lastTimestamp) { + this.responseBytes = responseBytes; + this.reportedTotal = reportedTotal; + this.reportedPageSize = reportedPageSize; + this.firstTimestamp = firstTimestamp; + this.lastTimestamp = lastTimestamp; + } + } + 
+ + public static final class SeedInfo { + public final boolean seeded; + public final long pointCount; + + private SeedInfo(boolean seeded, long pointCount) { + this.seeded = seeded; + this.pointCount = pointCount; + } + } + + public static final class BenchmarkSummary { + public final int successfulRuns; + public final Double averageTimeTotalSeconds; + public final Double minTimeTotalSeconds; + public final Double maxTimeTotalSeconds; + + private BenchmarkSummary(int successfulRuns, Double averageTimeTotalSeconds, + Double minTimeTotalSeconds, Double maxTimeTotalSeconds) { + this.successfulRuns = successfulRuns; + this.averageTimeTotalSeconds = averageTimeTotalSeconds; + this.minTimeTotalSeconds = minTimeTotalSeconds; + this.maxTimeTotalSeconds = maxTimeTotalSeconds; + } + + private static BenchmarkSummary fromRuns(List<BenchmarkRun> runs) { + List<BenchmarkRun> successfulRuns = new ArrayList<>(); + for (BenchmarkRun run : runs) { + if (run.httpCode == 200) { + successfulRuns.add(run); + } + } + if (successfulRuns.isEmpty()) { + return new BenchmarkSummary(0, null, null, null); + } + + double total = 0.0d; + double min = Double.MAX_VALUE; + double max = Double.MIN_VALUE; + for (BenchmarkRun run : successfulRuns) { + total += run.timeTotalSeconds; + min = Math.min(min, run.timeTotalSeconds); + max = Math.max(max, run.timeTotalSeconds); + } + return new BenchmarkSummary( + successfulRuns.size(), + Math.round((total / successfulRuns.size()) * 1_000_000.0d) / 1_000_000.0d, + Math.round(min * 1_000_000.0d) / 1_000_000.0d, + Math.round(max * 1_000_000.0d) / 1_000_000.0d + ); + } + } + + public static final class BenchmarkRun { + public final int run; + public final int httpCode; + public final double timeTotalSeconds; + public final long responseBytesOnDisk; + public final Integer reportedTotal; + public final Integer reportedPageSize; + public final Long firstTimestamp; + public final Long lastTimestamp; + public final String errorBody; + public final String responseFile; + + private 
BenchmarkRun(int run, int httpCode, double timeTotalSeconds, long responseBytesOnDisk, + Integer reportedTotal, Integer reportedPageSize, Long firstTimestamp, + Long lastTimestamp, String errorBody, String responseFile) { + this.run = run; + this.httpCode = httpCode; + this.timeTotalSeconds = timeTotalSeconds; + this.responseBytesOnDisk = responseBytesOnDisk; + this.reportedTotal = reportedTotal; + this.reportedPageSize = reportedPageSize; + this.firstTimestamp = firstTimestamp; + this.lastTimestamp = lastTimestamp; + this.errorBody = errorBody; + this.responseFile = responseFile; + } + } + + public static final class BenchmarkReport { + public final String benchmark; + public final String generatedAt; + public final String gitBranch; + public final String gitCommit; + public final String office; + public final String locationId; + public final String seriesId; + public final String units; + public final String startTimeUtc; + public final String endTimeUtc; + public final int pointCount; + public final int pageSize; + public final String requestUrl; + public final SeedInfo seed; + public final BenchmarkSummary summary; + public final List<BenchmarkRun> runs; + + private BenchmarkReport(String benchmark, String generatedAt, String gitBranch, String gitCommit, + String office, String locationId, String seriesId, String units, + String startTimeUtc, String endTimeUtc, int pointCount, int pageSize, + String requestUrl, SeedInfo seed, BenchmarkSummary summary, + List<BenchmarkRun> runs) { + this.benchmark = benchmark; + this.generatedAt = generatedAt; + this.gitBranch = gitBranch; + this.gitCommit = gitCommit; + this.office = office; + this.locationId = locationId; + this.seriesId = seriesId; + this.units = units; + this.startTimeUtc = startTimeUtc; + this.endTimeUtc = endTimeUtc; + this.pointCount = pointCount; + this.pageSize = pageSize; + this.requestUrl = requestUrl; + this.seed = seed; + this.summary = summary; + this.runs = runs; + } + } +} diff --git a/load_data/performance/.gitignore 
b/load_data/performance/.gitignore new file mode 100644 index 0000000000..ddbb6df966 --- /dev/null +++ b/load_data/performance/.gitignore @@ -0,0 +1,2 @@ +results/ +responses/