Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion amber/src/main/python/core/models/schema/attribute_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from pyarrow import lib
from core.models.type.large_binary import largebinary


class AttributeType(Enum):
"""
Types supported by PyTexera & PyAmber.
Expand Down Expand Up @@ -78,6 +77,17 @@ class AttributeType(Enum):
}


FROM_STRING_PARSER_MAPPING = {
AttributeType.STRING: str,
AttributeType.INT: int,
AttributeType.LONG: int,
AttributeType.DOUBLE: float,
AttributeType.BOOL: lambda v: str(v).strip().lower() in ("True", "true", "1", "yes"),
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The boolean parser in FROM_STRING_PARSER_MAPPING for AttributeType.BOOL includes "True" in the truthy set (line 85), but this is redundant since the lambda already calls str(v).strip().lower() — after .lower(), no string can equal "True" (capital T). The "True" entry in the tuple is dead code and the set effectively only checks for "true", "1", and "yes".

Copilot uses AI. Check for mistakes.
AttributeType.BINARY: lambda v: v if isinstance(v, bytes) else str(v).encode(),
AttributeType.TIMESTAMP: lambda v: datetime.datetime.fromisoformat(v),
AttributeType.LARGE_BINARY: largebinary,
}

# Only single-directional mapping.
TO_PYOBJECT_MAPPING = {
AttributeType.STRING: str,
Expand Down
2 changes: 2 additions & 0 deletions amber/src/main/python/pyamber/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
SourceOperator,
TupleOperatorV2,
State,
AttributeType,
)

__all__ = [
Expand All @@ -41,4 +42,5 @@
"TupleOperatorV2",
"SourceOperator",
"State",
"AttributeType",
]
2 changes: 2 additions & 0 deletions amber/src/main/python/pytexera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
UDFSourceOperator,
)
from core.models.type.large_binary import largebinary
from core.models.schema.attribute_type import *

__all__ = [
"State",
Expand All @@ -53,4 +54,5 @@
"Iterator",
"Optional",
"Union",
"AttributeType",
]
78 changes: 71 additions & 7 deletions amber/src/main/python/pytexera/udf/udf_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,76 @@
# under the License.

from abc import abstractmethod
from typing import Iterator, Optional, Union

from pyamber import *
from typing import Any, Dict, Iterator, Optional, Union

import functools

class UDFOperatorV2(TupleOperatorV2):
from pyamber import *
from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING

class _UiParameterSupport:
_ui_parameter_injected_values: Dict[str, Any] = {}
_ui_parameter_name_types: Dict[str, AttributeType] = {}

# Reserved hook name: the backend injector generates an override of this
# method inside the user's operator class.
def _texera_injected_ui_parameters(self) -> Dict[str, Any]:
    """Return the injected UI parameter values.

    This default implementation supplies no values and hands back a fresh,
    empty mapping on every call.
    """
    no_parameters: Dict[str, Any] = dict()
    return no_parameters

def _texera_apply_injected_ui_parameters(self) -> None:
    """Publish this operator's injected UI parameter values.

    Takes whatever ``_texera_injected_ui_parameters()`` returns, copies it
    into a new dict (a falsy result becomes an empty dict) and clears the
    registry of declared parameter names/types.

    Both pieces of state are assigned on ``_UiParameterSupport`` itself
    rather than on ``self`` or its class, because ``UiParameter`` reads them
    from ``_UiParameterSupport`` directly. As a consequence the state is
    shared by every class deriving from ``_UiParameterSupport``.
    """
    injected = self._texera_injected_ui_parameters()
    snapshot = dict(injected) if injected else {}

    # Start from a clean name -> type registry, then expose the new values.
    _UiParameterSupport._ui_parameter_name_types = {}
    _UiParameterSupport._ui_parameter_injected_values = snapshot
Comment on lines +27 to +38
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ui_parameter_injected_values and _ui_parameter_name_types are class-level dictionaries on _UiParameterSupport (lines 27-28). This means all subclass instances share the same state. If multiple UDF operator instances run concurrently in the same Python worker process (e.g., due to parallelism), one operator's _texera_apply_injected_ui_parameters() call could overwrite another operator's injected values, causing incorrect parameter values to be read. Each operator instance should have its own storage, for example by using instance-level variables instead of class-level ones.

Copilot uses AI. Check for mistakes.

def __init_subclass__(cls, **kwargs):
    """Make ``cls.open`` apply the injected UI parameters before it runs.

    The ``open`` attribute is resolved with ``getattr``, so an ``open``
    inherited from a base class is considered as well as one defined on
    ``cls``. Nothing is done when there is no ``open`` at all, or when the
    resolved function already carries the ``__texera_ui_params_wrapped__``
    marker (which prevents wrapping the same function twice).

    Otherwise ``cls.open`` is replaced by a wrapper that first calls
    ``self._texera_apply_injected_ui_parameters()`` and then delegates to the
    original ``open`` with the same arguments, returning its result.
    """
    super().__init_subclass__(**kwargs)

    target_open = getattr(cls, "open", None)
    already_wrapped = getattr(target_open, "__texera_ui_params_wrapped__", False)
    if target_open is None or already_wrapped:
        return

    def open_with_ui_parameters(self, *args, **kwargs):
        # Injected values must be in place before user code in open() runs.
        self._texera_apply_injected_ui_parameters()
        return target_open(self, *args, **kwargs)

    # Carry over the original function's metadata, then tag the wrapper so a
    # later subclass that inherits it is recognised as already wrapped.
    open_with_ui_parameters = functools.wraps(target_open)(open_with_ui_parameters)
    open_with_ui_parameters.__texera_ui_params_wrapped__ = True
    cls.open = open_with_ui_parameters

class UiParameter:
    """A named, typed parameter whose value is taken from the injected values.

    Constructing an instance records ``name -> type`` in
    ``_UiParameterSupport._ui_parameter_name_types``, looks the name up in
    ``_UiParameterSupport._ui_parameter_injected_values`` and stores the
    result of ``_UiParameterSupport._parse`` as ``value``.

    Attributes set on the instance: ``name``, ``type`` and ``value``.

    Raises:
        TypeError: if ``type`` is not an ``AttributeType`` member.
        ValueError: if ``name`` was already declared with a different type.
    """

    def __init__(self, name: str, type: AttributeType):
        if not isinstance(type, AttributeType):
            raise TypeError(
                f"UiParameter.type must be an AttributeType, got {type!r}."
            )

        support = _UiParameterSupport
        declared_type = support._ui_parameter_name_types.get(name)
        conflicting = declared_type is not None and declared_type != type
        if conflicting:
            raise ValueError(
                f"Duplicate UiParameter name '{name}' with conflicting types: "
                f"{declared_type.name} vs {type.name}."
            )

        # Re-declaring a name with the same type is allowed and simply
        # overwrites the registry entry with the same value.
        support._ui_parameter_name_types[name] = type
        injected_raw = support._ui_parameter_injected_values.get(name)

        self.name = name
        self.type = type
        self.value = support._parse(injected_raw, type)
Comment on lines +61 to +78
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UiParameter.__init__ method uses type as a parameter name (line 61), which shadows the Python built-in type() function. While this works correctly inside the function, it is a Python anti-pattern that can lead to subtle bugs if developers modify this code and inadvertently try to use type(). A clearer name like attr_type would be more consistent with _parse's parameter name attr_type used on line 81.

Copilot uses AI. Check for mistakes.

@staticmethod
def _parse(value: Any, attr_type: AttributeType) -> Any:
    """Convert an injected raw value using the parser registered for ``attr_type``.

    Args:
        value: The raw injected value, or ``None`` when nothing was injected.
        attr_type: Key used to look up the parser in
            ``FROM_STRING_PARSER_MAPPING``.

    Returns:
        ``None`` when ``value`` is ``None``; otherwise whatever the registered
        parser returns for ``value``.

    Raises:
        TypeError: if ``FROM_STRING_PARSER_MAPPING`` has no parser for
            ``attr_type``. Exceptions raised by the parser itself propagate
            unchanged.
    """
    if value is None:
        return None

    parser = FROM_STRING_PARSER_MAPPING.get(attr_type)
    if parser is None:
        # Without this guard the lookup result would be called directly,
        # failing with an opaque "'NoneType' object is not callable".
        # TypeError is kept so existing handlers still match.
        raise TypeError(
            f"No UI parameter parser is registered for attribute type {attr_type!r}."
        )
    return parser(value)
Comment on lines +81 to +86
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _parse method in _UiParameterSupport (lines 81-86) does not handle the case where FROM_STRING_PARSER_MAPPING.get(attr_type) returns None (i.e., when the attr_type is not in the mapping). If py_type is None, calling py_type(value) on line 86 will raise a TypeError: 'NoneType' object is not callable, giving an unhelpful error to the user.

Copilot uses AI. Check for mistakes.

Comment on lines +26 to +87
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new _UiParameterSupport class and UiParameter inner class in udf_operator.py have no corresponding Python unit tests. The surrounding codebase has thorough test coverage (e.g., test_echo_operator.py, test_count_batch_operator.py). Tests should be added to cover at minimum: parameter value injection via _texera_apply_injected_ui_parameters, duplicate name detection with conflicting types in UiParameter, the _parse method for all supported attribute types, and the __init_subclass__ wrapping behavior.

Copilot uses AI. Check for mistakes.
class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2):
"""
Base class for tuple-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -65,7 +129,7 @@ def close(self) -> None:
pass


class UDFSourceOperator(SourceOperator):
class UDFSourceOperator(_UiParameterSupport, SourceOperator):
def open(self) -> None:
"""
Open a context of the operator. Usually can be used for loading/initiating some
Expand All @@ -90,7 +154,7 @@ def close(self) -> None:
pass


class UDFTableOperator(TableOperator):
class UDFTableOperator(_UiParameterSupport, TableOperator):
"""
Base class for table-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -123,7 +187,7 @@ def close(self) -> None:
pass


class UDFBatchOperator(BatchOperator):
class UDFBatchOperator(_UiParameterSupport, BatchOperator):
"""
Base class for batch-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.texera.amber.pybuilder.EncodableStringAnnotation;
import org.apache.texera.amber.pybuilder.PyStringTypes;
import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableStringFactory$;
Comment on lines +25 to +26
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two unused imports were added: import org.apache.texera.amber.pybuilder.PyStringTypes; (line 25) and import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableStringFactory$; (line 26). Neither PyStringTypes nor EncodableStringFactory$ are referenced anywhere in the file body — only @EncodableStringAnnotation from line 24's import is used. These unused imports should be removed.

Copilot uses AI. Check for mistakes.


import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -49,6 +53,7 @@ public Attribute(

/**
 * Returns the name of this attribute.
 *
 * <p>The getter is annotated as the required JSON property {@code attributeName}
 * and carries a {@code @NotBlank} constraint whose message is
 * "Attribute name is required".
 *
 * <p>NOTE(review): the effect of {@code @EncodableStringAnnotation} (imported from
 * {@code org.apache.texera.amber.pybuilder}) is not visible here; confirm its
 * semantics before relying on it.
 *
 * @return the value of the {@code attributeName} field
 */
@JsonProperty(value = "attributeName", required = true)
@NotBlank(message = "Attribute name is required")
@EncodableStringAnnotation
public String getName() {
return attributeName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
)
var outputColumns: List[Attribute] = List()

@JsonProperty
@JsonSchemaTitle("Parameters")
@JsonPropertyDescription("Parameters inferred from self.UiParameter(...) in Python script")
var uiParameters: List[UiUDFParameter] = List()

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
Expand All @@ -88,7 +93,7 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python")
)
.withParallelizable(true)
.withSuggestedWorkerNum(workers)
Expand All @@ -98,7 +103,7 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python")
)
.withParallelizable(false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class PythonUDFOpDescV2 extends LogicalOp {
)
var outputColumns: List[Attribute] = List()

@JsonProperty
@JsonSchemaTitle("Parameters")
@JsonPropertyDescription("Parameters inferred from self.UiParameter(...) in Python script")
var uiParameters: List[UiUDFParameter] = List()

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
Expand Down Expand Up @@ -118,7 +123,7 @@ class PythonUDFOpDescV2 extends LogicalOp {
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python")
)
.withParallelizable(true)
.withSuggestedWorkerNum(workers)
Expand All @@ -128,7 +133,7 @@ class PythonUDFOpDescV2 extends LogicalOp {
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python")
)
.withParallelizable(false)
}
Expand Down
Loading
Loading