Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions amber/src/main/python/core/models/schema/attribute_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,53 @@ class AttributeType(Enum):
}


FROM_STRING_PARSER_MAPPING = {
AttributeType.STRING: str,
AttributeType.INT: lambda v: (
0 if v is None or (isinstance(v, str) and v.strip() == "") else int(v)
),
AttributeType.LONG: lambda v: (
0 if v is None or (isinstance(v, str) and v.strip() == "") else int(v)
),
AttributeType.DOUBLE: lambda v: (
0.0 if v is None or (isinstance(v, str) and v.strip() == "") else float(v)
),
AttributeType.BOOL: lambda v: (
False
if v is None or (isinstance(v, str) and v.strip() == "")
else (
True
if str(v).strip().lower() == "true"
else (
False
if str(v).strip().lower() == "false"
else float(str(v).strip()) != 0
)
)
),
AttributeType.BINARY: lambda v: (
(_ for _ in ()).throw(
ValueError(
"UiParameter does not support BINARY values. "
"Use a supported type instead."
)
)
),
AttributeType.TIMESTAMP: lambda v: (
datetime.datetime.fromtimestamp(0)
if v is None or (isinstance(v, str) and v.strip() == "")
else datetime.datetime.fromisoformat(v)
),
AttributeType.LARGE_BINARY: lambda v: (
(_ for _ in ()).throw(
ValueError(
"UiParameter does not support LARGE_BINARY values. "
"Use a supported type instead."
)
)
),
}

# Only single-directional mapping.
TO_PYOBJECT_MAPPING = {
AttributeType.STRING: str,
Expand Down
6 changes: 5 additions & 1 deletion amber/src/main/python/pytexera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from loguru import logger
from overrides import overrides
from typing import Iterator, Optional, Union
from typing import Iterator, Optional, Union, Dict, Any

from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
Expand All @@ -30,6 +30,7 @@
UDFSourceOperator,
)
from core.models.type.large_binary import largebinary
from core.models.schema.attribute_type import *

__all__ = [
"State",
Expand All @@ -53,4 +54,7 @@
"Iterator",
"Optional",
"Union",
"Dict",
"Any",
"AttributeType",
]
188 changes: 188 additions & 0 deletions amber/src/main/python/pytexera/udf/test_udf_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
from typing import Iterator, Optional

import pytest

from pytexera import AttributeType, Tuple, TupleLike, UDFOperatorV2
from pytexera.udf.udf_operator import _UiParameterSupport


class InjectedParametersOperator(UDFOperatorV2):
def _texera_injected_ui_parameters(self):
return {
"count": "7",
"enabled": "1",
"created_at": "2024-01-01T00:00:00",
}

def open(self):
self.count_parameter = self.UiParameter("count", AttributeType.INT)
self.enabled_parameter = self.UiParameter(
name="enabled", type=AttributeType.BOOL
)
self.created_at_parameter = self.UiParameter(
"created_at", type=AttributeType.TIMESTAMP
)

def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
yield tuple_


class ConflictingParameterOperator(UDFOperatorV2):
def _texera_injected_ui_parameters(self):
return {"duplicate": "1"}

def open(self):
self.UiParameter("duplicate", AttributeType.INT)
self.UiParameter("duplicate", AttributeType.STRING)

def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
yield tuple_


class FirstIndependentParameterOperator(UDFOperatorV2):
def _texera_injected_ui_parameters(self):
return {"count": "1"}

def open(self):
self.count_parameter = self.UiParameter("count", AttributeType.INT)

def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
yield tuple_


class SecondIndependentParameterOperator(UDFOperatorV2):
def _texera_injected_ui_parameters(self):
return {"count": "2"}

def open(self):
self.count_parameter = self.UiParameter("count", AttributeType.INT)

def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
yield tuple_


class TestUiParameterSupport:
def test_injected_values_are_applied_before_open(self):
operator = InjectedParametersOperator()

operator.open()

assert operator.count_parameter.value == 7
assert operator.enabled_parameter.value is True
assert operator.created_at_parameter.value == datetime.datetime(
2024, 1, 1, 0, 0
)

def test_duplicate_parameter_names_with_conflicting_types_raise(self):
operator = ConflictingParameterOperator()

with pytest.raises(ValueError) as exc_info:
operator.open()

assert "Duplicate UiParameter name 'duplicate'" in str(exc_info.value)

@pytest.mark.parametrize(
("raw_value", "attr_type", "expected"),
[
("hello", AttributeType.STRING, "hello"),
("7", AttributeType.INT, 7),
("99", AttributeType.LONG, 99),
("3.14", AttributeType.DOUBLE, 3.14),
("1", AttributeType.BOOL, True),
(
"2024-01-01T00:00:00",
AttributeType.TIMESTAMP,
datetime.datetime(2024, 1, 1, 0, 0),
),
],
)
def test_parse_supported_types(self, raw_value, attr_type, expected):
assert _UiParameterSupport._parse(raw_value, attr_type) == expected

@pytest.mark.parametrize(
("raw_value", "expected"),
[
("", False),
(" ", False),
("True", True),
("true", True),
("1", True),
("1.0", True),
("2", True),
("-1", True),
("False", False),
("false", False),
("0", False),
("0.0", False),
],
)
def test_parse_bool_string_values(self, raw_value, expected):
assert _UiParameterSupport._parse(raw_value, AttributeType.BOOL) is expected

@pytest.mark.parametrize(
("raw_value", "attr_type", "expected_message"),
[
(
"payload",
AttributeType.BINARY,
"UiParameter does not support BINARY values",
),
(
"s3://bucket/path/to/object",
AttributeType.LARGE_BINARY,
"UiParameter does not support LARGE_BINARY values",
),
],
)
def test_parse_binary_types_raise_helpful_error(
self, raw_value, attr_type, expected_message
):
with pytest.raises(ValueError, match=expected_message):
_UiParameterSupport._parse(raw_value, attr_type)

def test_parse_unsupported_type_raises_helpful_error(self):
with pytest.raises(TypeError, match="UiParameter.type .* is not supported"):
_UiParameterSupport._parse("value", object())

def test_wrapped_open_uses_instance_local_state(self):
assert (
getattr(
FirstIndependentParameterOperator.open,
"__texera_ui_params_wrapped__",
False,
)
is True
)

first_operator = FirstIndependentParameterOperator()
second_operator = SecondIndependentParameterOperator()

first_operator.open()
second_operator.open()

assert first_operator.count_parameter.value == 1
assert second_operator.count_parameter.value == 2
assert first_operator._ui_parameter_injected_values == {"count": "1"}
assert second_operator._ui_parameter_injected_values == {"count": "2"}
assert (
first_operator._ui_parameter_injected_values
is not second_operator._ui_parameter_injected_values
)
Loading