Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ public enum BuiltinFunctionName {
.put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP)
// .put("earliest", BuiltinFunctionName.EARLIEST)
// .put("latest", BuiltinFunctionName.LATEST)
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("dc", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("distinct_count", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
.build();

Expand Down
24 changes: 24 additions & 0 deletions docs/user/ppl/cmd/eventstats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,28 @@ Example::
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+


DISTINCT_COUNT, DC(Since 3.3)
------------------

Description
>>>>>>>>>>>

Usage: DISTINCT_COUNT(expr), DC(expr). Returns the approximate number of distinct values of expr using HyperLogLog++ algorithm. Both ``DISTINCT_COUNT`` and ``DC`` are equivalent and provide the same functionality.

Example::

PPL> source=accounts | eventstats dc(state) as distinct_states, distinct_count(state) as dc_states_alt by gender;
fetched rows / total rows = 4/4
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | distinct_states | dc_states_alt |
|----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------|-----------------|
| 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 1 | 1 |
| 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 3 | 3 |
| 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 3 | 3 |
| 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 3 | 3 |
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+


Configuration
=============
This command requires Calcite enabled.
Expand Down Expand Up @@ -312,6 +334,8 @@ Eventstats::
source = table | where a < 50 | eventstats count(c)
source = table | eventstats min(c), max(c) by b
source = table | eventstats count(c) as count_by by b | where count_by > 1000
source = table | eventstats dc(field) as distinct_count
source = table | eventstats distinct_count(category) by region


Example 1: Calculate the average, sum and count of a field by group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ public void supportPushDownScriptOnTextField() throws IOException {
}

@Test
public void testEventstatsDistinctCountExplain() throws IOException {
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
String query =
"source=opensearch-sql_test_index_account | eventstats dc(state) as distinct_states";
var result = explainQueryToString(query);
String expected = loadFromFile("expectedOutput/calcite/explain_eventstats_dc.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testEventstatsDistinctCountFunctionExplain() throws IOException {
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
String query =
"source=opensearch-sql_test_index_account | eventstats distinct_count(state) as"
+ " distinct_states by gender";
var result = explainQueryToString(query);
String expected = loadFromFile("expectedOutput/calcite/explain_eventstats_distinct_count.json");
assertJsonEqualsIgnoreId(expected, result);
}

public void testExplainBinWithBins() throws IOException {
String expected = loadExpectedPlan("explain_bin_bins.json");
assertJsonEqualsIgnoreId(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
import org.opensearch.sql.legacy.TestsConstants;
import org.opensearch.sql.ppl.PPLIntegTestCase;

public class CalcitePPLEventstatsIT extends PPLIntegTestCase {
Expand Down Expand Up @@ -299,28 +297,6 @@ public void testUnsupportedWindowFunctions() {
}
}

@Ignore("DC should fail in window function")
public void testDistinctCountShouldFail() throws IOException {
Request request1 =
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
request1.setJsonEntity(
"{\"name\":\"Jim\",\"age\":27,\"state\":\"Ontario\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request1);
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats distinct_count(state) by country",
TEST_INDEX_STATE_COUNTRY));

verifyDataRows(
actual,
rows("John", "Canada", "Ontario", 4, 2023, 25, 3),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3),
rows("Jim", "Canada", "Ontario", 4, 2023, 27, 3),
rows("Jake", "USA", "California", 4, 2023, 70, 2),
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

@Test
public void testMultipleEventstat() throws IOException {
JSONObject actual =
Expand Down Expand Up @@ -617,6 +593,111 @@ public void testEventstatVarianceWithNullBy() throws IOException {
rows("Hello", "USA", "New York", 4, 2023, 30, 20, 28.284271247461902, 400, 800));
}

@Test
public void testEventstatDistinctCount() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats dc(state) as dc_state | fields name, country, state, month, year, age, dc_state", TEST_INDEX_STATE_COUNTRY));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why fields required in these ITs.

Copy link
Contributor Author

@ahkcs ahkcs Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause the CI to fail, local testing was successful without fields

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause the CI to fail, local testing was successful without fields

If the original tests were successful without this PR, sounds a potential bug is introducing in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CI failure is this PR was caused by the order of the schema, and this failure also is not able to be reproduced locally. I suspect that it's triggered by different testing environment in CI and local. Do you think we should also add this fields to main for consistency in this case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and this failure also is not able to be reproduced locally. I suspect that it's triggered by different testing environment in CI and local.

The question is how the PR introduced a reproduced CI failure. we need to figure out the reason.


verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("dc_state", "bigint"));

verifyDataRows(
actual,
rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
rows("Jake", "USA", "California", 4, 2023, 70, 4),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
rows("Hello", "USA", "New York", 4, 2023, 30, 4));
}

@Test
public void testEventstatDistinctCountByCountry() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats dc(state) as dc_state by country | fields name, country, state, month, year, age, dc_state",
TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("dc_state", "bigint"));

verifyDataRows(
actual,
rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
rows("Jake", "USA", "California", 4, 2023, 70, 2),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

@Test
public void testEventstatDistinctCountFunction() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats distinct_count(country) as dc_country | fields name, country, state, month, year, age, dc_country",
TEST_INDEX_STATE_COUNTRY));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("dc_country", "bigint"));

verifyDataRows(
actual,
rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
rows("Jake", "USA", "California", 4, 2023, 70, 2),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

@Test
public void testEventstatDistinctCountWithNull() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats dc(state) as dc_state | fields name, country, state, month, year, age, dc_state",
TEST_INDEX_STATE_COUNTRY_WITH_NULL));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("dc_state", "bigint"));

verifyDataRows(
actual,
rows(null, "Canada", null, 4, 2023, 10, 4),
rows("Kevin", null, null, 4, 2023, null, 4),
rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
rows("Jake", "USA", "California", 4, 2023, 70, 4),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
rows("Hello", "USA", "New York", 4, 2023, 30, 4));
}

@Ignore
@Test
public void testEventstatEarliestAndLatest() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
2 changes: 2 additions & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ scalarWindowFunctionName
| LAST
| NTH
| NTILE
| DISTINCT_COUNT
| DC
;

// aggregation terms
Expand Down
Loading