diff --git a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl index 4232b4f3cc1321..e52d2513d98d64 100644 --- a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl +++ b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl @@ -19,13 +19,14 @@ export SPARK_MASTER_HOST=doris--spark-iceberg # wait iceberg-rest start -while [[ ! $(curl -s --fail http://rest:8181/v1/config) ]]; do +while ! curl -s --fail http://rest:8181/v1/config >/dev/null; do sleep 1 done set -ex mkdir -p /opt/spark/events +SPARK_THRIFT_EXTENSIONS="org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions" for f in /opt/spark/sbin/*; do ln -s $f /usr/local/bin/$(basename $f) @@ -39,7 +40,6 @@ done start-master.sh -p 7077 start-worker.sh spark://doris--spark-iceberg:7077 start-history-server.sh -start-thriftserver.sh --driver-java-options "-Dderby.system.home=/tmp/derby" # The creation of a Spark SQL client is time-consuming, # and reopening a new client for each SQL file execution leads to significant overhead. @@ -68,6 +68,39 @@ END_TIME3=$(date +%s) EXECUTION_TIME3=$((END_TIME3 - START_TIME3)) echo "Script iceberg load total: {} executed in $EXECUTION_TIME3 seconds" +spark-sql \ + --master spark://doris--spark-iceberg:7077 \ + --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ + -e "CREATE DATABASE IF NOT EXISTS demo.default" + +start-thriftserver.sh \ + --master spark://doris--spark-iceberg:7077 \ + --conf "spark.sql.extensions=${SPARK_THRIFT_EXTENSIONS}" \ + --conf spark.dynamicAllocation.enabled=false \ + --conf spark.cores.max=8 \ + --conf spark.executor.cores=4 \ + --conf spark.executor.memory=8g \ + --conf spark.driver.memory=4g \ + --conf spark.sql.shuffle.partitions=16 \ + --conf spark.default.parallelism=16 \ + --driver-java-options "-Dderby.system.home=/tmp/derby" + +SPARK_THRIFT_READY_ATTEMPTS=0 +while ! beeline \ + -u "jdbc:hive2://localhost:10000/default" \ + -n hadoop \ + -p hadoop \ + -e "SELECT 1" >/tmp/spark-thriftserver-ready.log 2>&1; do + SPARK_THRIFT_READY_ATTEMPTS=$((SPARK_THRIFT_READY_ATTEMPTS + 1)) + if [ "${SPARK_THRIFT_READY_ATTEMPTS}" -ge 120 ]; then + echo "ERROR: Spark thriftserver did not become ready after ${SPARK_THRIFT_READY_ATTEMPTS} attempts" >&2 + cat /tmp/spark-thriftserver-ready.log >&2 || true + tail -n 200 /opt/spark/logs/*HiveThriftServer2*.out >&2 || true + exit 1 + fi + sleep 1 +done + touch /mnt/SUCCESS; tail -f /dev/null diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.env b/docker/thirdparties/docker-compose/iceberg/iceberg.env index 6bebd49f437d80..0950783075cf21 100644 --- a/docker/thirdparties/docker-compose/iceberg/iceberg.env +++ b/docker/thirdparties/docker-compose/iceberg/iceberg.env @@ -19,6 +19,7 @@ NOTEBOOK_SERVER_PORT=8888 SPARK_DRIVER_UI_PORT=8080 SPARK_HISTORY_UI_PORT=10000 +SPARK_THRIFT_PORT=11000 REST_CATALOG_PORT=18181 MINIO_UI_PORT=9000 MINIO_API_PORT=19001 diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl index 0c4e8e1cc65027..9b1704a7891028 100644 --- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl +++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl @@ -41,6 +41,8 @@ services: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 + ports: + - ${SPARK_THRIFT_PORT}:10000 entrypoint: /bin/sh /mnt/scripts/entrypoint.sh user: root networks: diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run06.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run06.sql index eb60255a08e965..026bd8aab72f2d 100644 --- a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run06.sql +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run06.sql @@ -228,4 +228,4 @@ VALUES (1, NULL, 100.0), (2, 'NULL', 200.0), (3, '\\N', 300.0), (4, 'null', 400.0), - (5, 'A', 500.0); \ No newline at end of file + (5, 'A', 500.0); diff --git a/docker/thirdparties/docker-compose/iceberg/spark-defaults.conf b/docker/thirdparties/docker-compose/iceberg/spark-defaults.conf index 8336a2afcf8aba..f05bf40726f877 100644 --- a/docker/thirdparties/docker-compose/iceberg/spark-defaults.conf +++ b/docker/thirdparties/docker-compose/iceberg/spark-defaults.conf @@ -20,6 +20,7 @@ # Example: spark.sql.session.timeZone Asia/Shanghai + spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.demo.type rest spark.sql.catalog.demo.uri http://rest:8181 @@ -42,4 +43,4 @@ spark.sql.catalog.paimon.warehouse s3://warehouse/wh spark.sql.catalog.paimon.s3.endpoint http://minio:9000 spark.sql.catalog.paimon.s3.access-key admin spark.sql.catalog.paimon.s3.secret-key password -spark.sql.catalog.paimon.s3.region us-east-1 \ No newline at end of file +spark.sql.catalog.paimon.s3.region us-east-1 diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 0dc8f9d17371a5..9ac21f91e9c413 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -111,7 +111,6 @@ class Suite implements GroovyInterceptable { private AmazonS3 s3Client = null private FileSystem fs = null - private String sparkIcebergContainerNameCache = null Suite(String name, String group, SuiteContext context, SuiteCluster cluster) { this.name = name @@ -1618,80 +1617,41 @@ class Suite implements GroovyInterceptable { return result } - /** - * Get the spark-iceberg container name by querying docker. - * Uses 'docker ps --filter name=spark-iceberg' to find the container. - */ - private String getSparkIcebergContainerName() { - if (!Strings.isNullOrEmpty(sparkIcebergContainerNameCache)) { - return sparkIcebergContainerNameCache - } - - try { - // Use docker ps with filter to find containers with 'spark-iceberg' in the name - String command = "docker ps --filter name=spark-iceberg --format {{.Names}}" - def process = command.execute() - process.waitFor() - String output = process.in.text.trim() + private List> spark_sql(String sqlStr, boolean isOrder = false) { + String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") + logger.info("Execute Spark JDBC SQL: ${cleanedSqlStr}".toString()) + logger.info("Spark JDBC URL: ${context.getSparkIcebergJdbcUrl()}".toString()) + return sql_impl(context.getSparkIcebergConnection(), cleanedSqlStr, isOrder) + } - if (output) { - // Get the first matching container - String containerName = output.split('\n')[0].trim() - if (containerName) { - sparkIcebergContainerNameCache = containerName - logger.info("Found spark-iceberg container: ${containerName}".toString()) - return containerName - } - } + private List spark_sql_multi(Object sqlStatements, boolean isOrder = false) { + def statements = sqlStatements.toString().split(';').collect { it.trim() }.findAll { it } - logger.warn("No spark-iceberg container found via docker ps") - return null - } catch (Exception e) { - logger.warn("Failed to get spark-iceberg container via docker ps: ${e.message}".toString()) - return null + if (statements.isEmpty()) { + return [] } + + logger.info("Execute Spark JDBC SQL statements via ${context.getSparkIcebergJdbcUrl()}: ${statements}".toString()) + Connection sparkConn = context.getSparkIcebergConnection() + return statements.collect { statement -> sql_impl(sparkConn, statement, isOrder) } } /** - * Execute Spark SQL on the spark-iceberg container via docker exec. + * Execute Spark SQL on the Spark ThriftServer via Hive JDBC. * * Usage in test suite: * spark_iceberg "CREATE TABLE demo.test_db.t1 (id INT) USING iceberg" * spark_iceberg "INSERT INTO demo.test_db.t1 VALUES (1)" * def result = spark_iceberg "SELECT * FROM demo.test_db.t1" - * - * The container name is found by querying 'docker ps --filter name=spark-iceberg' */ - String spark_iceberg(String sqlStr, int timeoutSeconds = 120) { - String containerName = getSparkIcebergContainerName() - if (containerName == null) { - throw new RuntimeException("spark-iceberg container not found. Please ensure the container is running.") - } - String masterUrl = "spark://${containerName}:7077" - - // Escape double quotes in SQL string for shell command - String escapedSql = sqlStr.replaceAll('"', '\\\\"') - - // Build docker exec command - String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -e "${escapedSql}" """ - - logger.info("Executing Spark Iceberg SQL: ${sqlStr}".toString()) - logger.info("Container: ${containerName}".toString()) - - try { - String result = cmd(command, timeoutSeconds) - logger.info("Spark Iceberg SQL result: ${result}".toString()) - return result - } catch (Exception e) { - logger.error("Spark Iceberg SQL failed: ${e.message}".toString()) - throw e - } + List> spark_iceberg(String sqlStr, boolean isOrder = false) { + return spark_sql(sqlStr, isOrder) } /** - * Execute multiple Spark SQL statements on the spark-iceberg container. + * Execute multiple Spark SQL statements on the Spark ThriftServer via Hive JDBC. * Statements are separated by semicolons. - * All statements are executed in one spark-sql process to reduce startup overhead. + * All statements are executed on one JDBC connection to reduce startup overhead. * * Usage: * spark_iceberg_multi ''' @@ -1700,46 +1660,36 @@ class Suite implements GroovyInterceptable { * INSERT INTO demo.test_db.t1 VALUES (1); * ''' */ - List spark_iceberg_multi(String sqlStatements, int timeoutSeconds = 300) { - def statements = sqlStatements.split(';').collect { it.trim() }.findAll { it } - - if (statements.isEmpty()) { - return [] - } - - String combinedSql = statements.collect { "${it};" }.join(" ") - return [spark_iceberg(combinedSql, timeoutSeconds)] + List spark_iceberg_multi(Object sqlStatements, boolean isOrder = false) { + return spark_sql_multi(sqlStatements, isOrder) } /** - * Execute Spark SQL on the spark-iceberg container with Paimon extensions enabled. + * Execute Spark SQL with the Paimon catalog on the Spark ThriftServer via Hive JDBC. * * Usage in test suite: * spark_paimon "CREATE TABLE paimon.test_db.t1 (id INT) USING paimon" * spark_paimon "INSERT INTO paimon.test_db.t1 VALUES (1)" * def result = spark_paimon "SELECT * FROM paimon.test_db.t1" */ - String spark_paimon(String sqlStr, int timeoutSeconds = 120) { - String containerName = getSparkIcebergContainerName() - if (containerName == null) { - throw new RuntimeException("spark-iceberg container not found. Please ensure the container is running.") - } - String masterUrl = "spark://${containerName}:7077" - - String escapedSql = sqlStr.replaceAll('"', '\\\\"') - String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -e "${escapedSql}" """ - - logger.info("Executing Spark Paimon SQL: ${sqlStr}".toString()) - logger.info("Container: ${containerName}".toString()) + List> spark_paimon(String sqlStr, boolean isOrder = false) { + return spark_sql(sqlStr, isOrder) + } - try { - String result = cmd(command, timeoutSeconds) - logger.info("Spark Paimon SQL result: ${result}".toString()) - return result - } catch (Exception e) { - logger.error("Spark Paimon SQL failed: ${e.message}".toString()) - throw e - } + /** + * Execute multiple Spark SQL statements with the Paimon catalog on the Spark ThriftServer via Hive JDBC. + * Statements are separated by semicolons. + * All statements are executed on one JDBC connection to reduce startup overhead. + * + * Usage: + * spark_paimon_multi ''' + * CREATE DATABASE IF NOT EXISTS paimon.test_db; + * CREATE TABLE paimon.test_db.t1 (id INT) USING paimon; + * INSERT INTO paimon.test_db.t1 VALUES (1); + * ''' + */ + List spark_paimon_multi(Object sqlStatements, boolean isOrder = false) { + return spark_sql_multi(sqlStatements, isOrder) } List> db2_docker(String sqlStr, boolean isOrder = false) { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy index 0d599aed817ae6..fcd59cb3a7337e 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy @@ -53,6 +53,7 @@ class SuiteContext implements Closeable { public final ThreadLocal threadHive2DockerConn = new ThreadLocal<>() public final ThreadLocal threadHive3DockerConn = new ThreadLocal<>() public final ThreadLocal threadHiveRemoteConn = new ThreadLocal<>() + public final ThreadLocal threadSparkIcebergConn = new ThreadLocal<>() public final ThreadLocal threadDB2DockerConn = new ThreadLocal<>() private final ThreadLocal syncer = new ThreadLocal<>() public final Config config @@ -239,6 +240,15 @@ class SuiteContext implements Closeable { return threadConn } + Connection getSparkIcebergConnection() { + def threadConn = threadSparkIcebergConn.get() + if (threadConn == null) { + threadConn = getConnectionBySparkIcebergConfig() + threadSparkIcebergConn.set(threadConn) + } + return threadConn + } + Connection getDB2DockerConnection() { def threadConn = threadDB2DockerConn.get() if (threadConn == null) { @@ -314,6 +324,21 @@ class SuiteContext implements Closeable { return DriverManager.getConnection(hiveJdbcUrl, hiveJdbcUser, hiveJdbcPassword) } + Connection getConnectionBySparkIcebergConfig() { + Class.forName("org.apache.hive.jdbc.HiveDriver"); + String sparkJdbcUser = "hadoop" + String sparkJdbcPassword = "hadoop" + String sparkJdbcUrl = getSparkIcebergJdbcUrl() + log.info("Create Spark Iceberg JDBC connection to ${sparkJdbcUrl}".toString()) + return DriverManager.getConnection(sparkJdbcUrl, sparkJdbcUser, sparkJdbcPassword) + } + + String getSparkIcebergJdbcUrl() { + String sparkHost = config.otherConfigs.get("externalEnvIp") + String sparkPort = config.otherConfigs.get("iceberg_spark_thrift_port") ?: "11000" + return "jdbc:hive2://${sparkHost}:${sparkPort}/default" + } + Connection getConnectionByDB2DockerConfig() { Class.forName("com.ibm.db2.jcc.DB2Driver"); String db2Host = config.otherConfigs.get("externalEnvIp") @@ -616,6 +641,16 @@ class SuiteContext implements Closeable { log.warn("Close connection failed", t) } } + + Connection spark_iceberg_conn = threadSparkIcebergConn.get() + if (spark_iceberg_conn != null) { + threadSparkIcebergConn.remove() + try { + spark_iceberg_conn.close() + } catch (Throwable t) { + log.warn("Close connection failed", t) + } + } } diff --git a/regression-test/suites/external_table_p2/iceberg/test_iceberg_spark_doris_consistency_demo.groovy b/regression-test/suites/external_table_p2/iceberg/test_iceberg_spark_doris_consistency_demo.groovy new file mode 100644 index 00000000000000..052b7be513aaca --- /dev/null +++ b/regression-test/suites/external_table_p2/iceberg/test_iceberg_spark_doris_consistency_demo.groovy @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_spark_doris_consistency_demo", "p2,external,iceberg,external_docker,external_docker_iceberg") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String catalogName = "test_iceberg_spark_doris_consistency_demo" + String dbName = "iceberg_spark_doris_consistency_demo_db" + String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minioPort = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + def normalizeRows = { rows -> + rows.collect { row -> + row.collect { value -> value == null ? null : value.toString() } + } + } + def expectedRows = [ + [1, "alice", 10], + [2, "bob", 20], + [3, "cindy", null], + [4, "doris", 40] + ] + def expectedAggRows = [[4L, 70L]] + + // Example: execute multiple Spark Iceberg statements in one JDBC connection. + spark_iceberg_multi """ + CREATE DATABASE IF NOT EXISTS demo.${dbName}; + DROP TABLE IF EXISTS demo.${dbName}.spark_written_iceberg_demo; + CREATE TABLE demo.${dbName}.spark_written_iceberg_demo ( + id INT, + name STRING, + score INT + ) USING iceberg; + INSERT INTO demo.${dbName}.spark_written_iceberg_demo VALUES + (1, 'alice', 10), + (2, 'bob', 20), + (3, 'cindy', NULL); + """ + + // Example: write one more Iceberg row through Spark SQL. + spark_iceberg """ + INSERT INTO demo.${dbName}.spark_written_iceberg_demo VALUES + (4, 'doris', 40); + """ + + sql """drop catalog if exists ${catalogName}""" + sql """ + CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${restPort}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}', + 's3.region' = 'us-east-1' + ); + """ + + sql """switch ${catalogName}""" + + def sparkRows = spark_iceberg """ + SELECT id, name, score + FROM demo.${dbName}.spark_written_iceberg_demo + ORDER BY id + """ + // Example 1: compare Spark Iceberg query result with explicit expected values. + assertEquals(expectedRows, sparkRows) + + def dorisRows = sql """ + SELECT id, name, score + FROM ${dbName}.spark_written_iceberg_demo + ORDER BY id + """ + // Example 1: compare Doris Iceberg query result with explicit expected values. + assertEquals(expectedRows, dorisRows) + + // Example 2: compare Doris and Spark query results. + assertEquals(normalizeRows(sparkRows), normalizeRows(dorisRows)) + + def sparkAggRows = spark_iceberg """ + SELECT count(*), sum(score) + FROM demo.${dbName}.spark_written_iceberg_demo + """ + // Compare Spark Iceberg aggregate result with explicit expected values. + assertEquals(expectedAggRows, sparkAggRows) + + def dorisAggRows = sql """ + SELECT count(*), sum(score) + FROM ${dbName}.spark_written_iceberg_demo + """ + // Doris and Spark JDBC may return the same aggregate value with different + // Java number classes, for example Long vs BigInteger, so normalize before comparing. + assertEquals(normalizeRows(sparkAggRows), normalizeRows(dorisAggRows)) +} diff --git a/regression-test/suites/external_table_p2/paimon/test_paimon_spark_doris_consistency_demo.groovy b/regression-test/suites/external_table_p2/paimon/test_paimon_spark_doris_consistency_demo.groovy new file mode 100644 index 00000000000000..f9023ffd73f32b --- /dev/null +++ b/regression-test/suites/external_table_p2/paimon/test_paimon_spark_doris_consistency_demo.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_spark_doris_consistency_demo", "p2,external,paimon") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable paimon test.") + return + } + + String catalogName = "test_paimon_spark_doris_consistency_demo" + String dbName = "paimon_spark_doris_consistency_demo_db" + String minioPort = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + def normalizeRows = { rows -> + rows.collect { row -> + row.collect { value -> value == null ? null : value.toString() } + } + } + def expectedRows = [ + [1, "alice", 10], + [2, "bob", 20], + [3, "cindy", null], + [4, "doris", 40] + ] + def expectedAggRows = [[4L, 70L]] + + // Example: execute multiple Spark Paimon statements in one JDBC connection. + spark_paimon_multi """ + CREATE DATABASE IF NOT EXISTS paimon.${dbName}; + DROP TABLE IF EXISTS paimon.${dbName}.spark_written_paimon_demo; + CREATE TABLE paimon.${dbName}.spark_written_paimon_demo ( + id INT, + name STRING, + score INT + ) USING paimon; + INSERT INTO paimon.${dbName}.spark_written_paimon_demo VALUES + (1, 'alice', 10), + (2, 'bob', 20), + (3, 'cindy', NULL); + """ + + // Example: write one more Paimon row through Spark SQL. + spark_paimon """ + INSERT INTO paimon.${dbName}.spark_written_paimon_demo VALUES + (4, 'doris', 40); + """ + + sql """drop catalog if exists ${catalogName}""" + sql """ + CREATE CATALOG ${catalogName} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true' + ); + """ + + sql """switch ${catalogName}""" + + def sparkRows = spark_paimon """ + SELECT id, name, score + FROM paimon.${dbName}.spark_written_paimon_demo + ORDER BY id + """ + // Example 1: compare Spark Paimon query result with explicit expected values. + assertEquals(expectedRows, sparkRows) + + def dorisRows = sql """ + SELECT id, name, score + FROM ${dbName}.spark_written_paimon_demo + ORDER BY id + """ + // Example 1: compare Doris Paimon query result with explicit expected values. + assertEquals(expectedRows, dorisRows) + + // Example 2: compare Doris and Spark query results. + assertEquals(normalizeRows(sparkRows), normalizeRows(dorisRows)) + + def sparkAggRows = spark_paimon """ + SELECT count(*), sum(score) + FROM paimon.${dbName}.spark_written_paimon_demo + """ + // Compare Spark Paimon aggregate result with explicit expected values. + assertEquals(expectedAggRows, sparkAggRows) + + def dorisAggRows = sql """ + SELECT count(*), sum(score) + FROM ${dbName}.spark_written_paimon_demo + """ + // Doris and Spark JDBC may return the same aggregate value with different + // Java number classes, for example Long vs BigInteger, so normalize before comparing. + assertEquals(normalizeRows(sparkAggRows), normalizeRows(dorisAggRows)) +}