|
6 | 6 | AVAILABLE FUNCTIONS |
7 | 7 | ------------------- |
8 | 8 | SequenceFunction - Generates a sequence of integers 0..n-1 |
9 | | -RangeFunction - Generates integers in a start..end range |
10 | 9 | ConstantTableFunction - Returns a constant single-row table |
11 | | -RandomSampleFunction - Generates random sample data (parallelizable) |
12 | 10 | GeneratorExceptionFunction - Demonstrates exception handling |
| 11 | +LoggingGeneratorFunction - Demonstrates log message emission |
| 12 | +MultiColumnGeneratorFunction - Demonstrates varargs with dynamic output schema |
| 13 | +PartitionedSequenceFunction - Demonstrates multi-worker parallel execution |
| 14 | +ProjectedDataFunction - Demonstrates projection pushdown |
| 15 | +SettingsAwareFunction - Demonstrates settings-aware output schema |
13 | 16 | """ |
14 | 17 |
|
15 | 18 | import struct |
|
35 | 38 | "ConstantTableFunction", |
36 | 39 | "GeneratorExceptionFunction", |
37 | 40 | "LoggingGeneratorFunction", |
| 41 | + "MultiColumnGeneratorFunction", |
38 | 42 | "PartitionedSequenceFunction", |
39 | 43 | "ProjectedDataFunction", |
40 | 44 | "SettingsAwareFunction", |
@@ -601,3 +605,95 @@ def process(self) -> OutputGenerator: |
601 | 605 | data["details"] = [f"row_{i}"] |
602 | 606 |
|
603 | 607 | yield Output(pa.RecordBatch.from_pydict(data, schema=output_schema)) |
| 608 | + |
| 609 | + |
class MultiColumnGeneratorFunction(TableFunctionGenerator):
    """Generate a table whose columns are named by the varargs.

    USE CASE
    --------
    Demonstrates varargs where the output schema is determined by the
    column names provided as arguments. Each column name becomes an int64
    column in the output, and every column carries the same sequential
    values 0..count-1.

    This shows how varargs can be used to create flexible functions where
    the output structure depends on the number and names of arguments.

    SCHEMA
    ------
    Output schema is dynamic based on provided column names.
    Example: multi_column_generator(3, 'a', 'b', 'c')
    Output: {"a": int64, "b": int64, "c": int64}

    PARALLELIZATION
    ---------------
    Single worker only (max_workers=1).

    Example:
    -------
        SELECT * FROM multi_column_generator(5, 'x', 'y')
        Returns: [{"x": 0, "y": 0}, {"x": 1, "y": 1}, ..., {"x": 4, "y": 4}]

        SELECT * FROM multi_column_generator(3, 'id', 'value', 'score')
        Returns: [{"id": 0, "value": 0, "score": 0}, ...]

    Raises:
    ------
        ValueError: when no column names are supplied (raised on access to
            ``output_schema``, and therefore also when ``process`` starts).

    """

    class Meta:
        """Metadata for MultiColumnGeneratorFunction."""

        name = "multi_column_generator"
        description = "Generates a table with columns specified via varargs"
        categories = ["generator", "utility"]
        max_workers = 1
        examples = [
            FunctionExample(
                sql="SELECT * FROM multi_column_generator(5, 'x', 'y')",
                description="Generate 5 rows with columns x and y",
            ),
            FunctionExample(
                sql="SELECT * FROM multi_column_generator(10, 'a', 'b', 'c')",
                description="Generate 10 rows with columns a, b, and c",
            ),
        ]

    count: Annotated[int, Arg(0, doc="Number of rows to generate", ge=0)]
    columns: Annotated[
        tuple[str, ...],
        Arg(
            1,
            varargs=True,
            arrow_type=pa.string(),
            doc="Column names to generate (at least one required)",
        ),
    ]

    # Maximum number of rows placed in a single emitted record batch.
    BATCH_SIZE: int = 1000

    @property
    def output_schema(self) -> pa.Schema:
        """Return output schema with one int64 column per vararg.

        Raises:
            ValueError: if no column names were provided. Without this check
                an empty schema would yield column-less (zero-row) batches,
                contradicting both the argument documentation and the
                cardinality reported for ``count`` rows.
        """
        if not self.columns:
            raise ValueError(
                "multi_column_generator requires at least one column name"
            )
        return pa.schema([pa.field(name, pa.int64()) for name in self.columns])

    @property
    def cardinality(self) -> TableCardinality:
        """Return exact cardinality since we know the count."""
        return TableCardinality(estimate=self.count, max=self.count)

    def process(self) -> OutputGenerator:
        """Yield record batches of at most ``BATCH_SIZE`` rows each.

        Every column in a batch holds the same run of consecutive integers,
        continuing from where the previous batch stopped, until ``count``
        rows have been produced. Nothing is yielded when ``count`` is 0.
        """
        output_schema = self.output_schema
        column_count = len(self.columns)

        for start in range(0, self.count, self.BATCH_SIZE):
            stop = min(start + self.BATCH_SIZE, self.count)

            # Convert the sequence to Arrow once and share the resulting
            # array across all columns instead of re-converting a Python
            # list for each column name.
            values = pa.array(range(start, stop), type=pa.int64())

            yield Output(
                pa.RecordBatch.from_arrays(
                    [values] * column_count, schema=output_schema
                )
            )
0 commit comments