Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions nifi/user-python-extensions/convert_json_record_schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import sys
import traceback
from logging import Logger

Expand All @@ -8,8 +9,13 @@
PropertyDescriptor,
StandardValidators,
)
from nifiapi.relationship import Relationship
from py4j.java_gateway import JavaObject, JVMView

# we need to add it to the sys imports
sys.path.insert(0, "/opt/nifi/user-scripts")

from utils.generic import parse_value # noqa: I001,E402

class ConvertJsonRecordSchema(FlowFileTransform):
identifier = None
Expand Down Expand Up @@ -49,29 +55,47 @@ def __init__(self, jvm: JVMView):
validators=[StandardValidators.BOOLEAN_VALIDATOR])
]

self._relationships = [
Relationship(
name="success",
description="All FlowFiles processed successfully."
),
Relationship(
name="failure",
description="FlowFiles that failed processing."
)
]

self.descriptors: list[PropertyDescriptor] = self._properties
self.relationships: list[Relationship] = self._relationships

def getRelationships(self) -> list[Relationship]:
return self.relationships

def getPropertyDescriptors(self) -> list[PropertyDescriptor]:
return self.descriptors

def set_logger(self, logger: Logger):
self.logger = logger

def set_properties(self, properties: dict):
def set_properties(self, properties: dict) -> None:
""" Gets the properties from the processor's context and sets them as instance variables.

Args:
properties (dict): dictionary containing property names and values.
"""

for k, v in list(properties.items()):
self.logger.debug(f"property set '{k.name}' with value '{v}'")
if hasattr(self, k.name):
setattr(self, k.name, v)
for k, v in properties.items():
name = k.name if hasattr(k, "name") else str(k)
val = parse_value(v)
if hasattr(self, name):
setattr(self, name, val)
self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})")

def map_record(self, record: dict, json_mapper_schema: dict) -> dict:
"""
Maps the fields of a record to new field names based on the provided JSON schema mapping.
{new_field -> old_field, ....}

Args:
record (dict): The input record whose fields need to be mapped.
Expand All @@ -83,22 +107,16 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict:

new_record: dict = {}

new_schema_field_names: list = [str(x).lower() for x in json_mapper_schema.keys()]
# reverse the json_mapper_schema to map old_field -> new_field
json_mapper_schema_reverse: dict = {v: k for k, v in json_mapper_schema.items() if v}

for curr_field_name, curr_field_value in record.items():
curr_field_name = str(curr_field_name).lower()
if curr_field_name in new_schema_field_names:
if curr_field_name in json_mapper_schema_reverse:
new_field_name = json_mapper_schema_reverse[curr_field_name]
# check if the mapping is not a dict (nested field)
if isinstance(json_mapper_schema[curr_field_name], str):
new_record.update({json_mapper_schema[curr_field_name] : curr_field_value})
elif isinstance(json_mapper_schema[curr_field_name], dict):
# nested field
new_record.update({curr_field_name: {}})
for nested_field_name, nested_field_value in curr_field_value.items():
if nested_field_name in json_mapper_schema[curr_field_name].keys():
new_record[curr_field_name].update({ \
json_mapper_schema[curr_field_name][nested_field_name]: nested_field_value})

if isinstance(new_field_name, str):
new_record.update({new_field_name: curr_field_value})

elif self.preserve_non_mapped_fields:
new_record.update({curr_field_name: curr_field_value})

Expand Down
4 changes: 2 additions & 2 deletions nifi/user-python-extensions/prepare_record_for_nlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import traceback
from logging import Logger
from typing import Any, Dict, List, Union
from typing import Any, Union

from avro.datafile import DataFileReader
from avro.io import DatumReader
Expand Down Expand Up @@ -104,7 +104,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore
input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)

reader: Union[DataFileReader, List[Dict[str, Any]] | List[Any]]
reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]]

if self.process_flow_file_type == "avro":
reader = DataFileReader(input_byte_buffer, DatumReader())
Expand Down
8 changes: 4 additions & 4 deletions nifi/user-python-extensions/prepare_record_for_ocr.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import traceback
from logging import Logger
from typing import Any, Dict, List, Union
from typing import Any, Union

from avro.datafile import DataFileReader
from avro.io import DatumReader
Expand All @@ -19,7 +19,7 @@
# we need to add it to the sys imports
sys.path.insert(0, "/opt/nifi/user-scripts")

from utils.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402
from utils.helpers.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402


class PrepareRecordForOcr(FlowFileTransform):
Expand Down Expand Up @@ -72,12 +72,12 @@ def __init__(self, jvm: JVMView):
required=True,
allowable_values=["avro", "json"]),
]

self.descriptors: list[PropertyDescriptor] = self._properties

def getPropertyDescriptors(self) -> list[PropertyDescriptor]:
return self.descriptors


def set_logger(self, logger: Logger):
self.logger = logger

Expand Down Expand Up @@ -105,7 +105,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore
input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)

reader: Union[DataFileReader, List[Dict[str, Any]] | List[Any]]
reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]]

if self.process_flow_file_type == "avro":
reader = DataFileReader(input_byte_buffer, DatumReader())
Expand Down
3 changes: 2 additions & 1 deletion nifi/user-python-extensions/record_decompress_cerner_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
All RECORDS are expected to have the same fields, and presumably belonging to the same DOCUMENT.
All the fields of these records should have the same field values, except for the binary data field.
The binary data field is expected to be a base64 encoded string, which will be concatenated according to
the blob_sequence_order_field_name field, preserving the order of the blobs and composing the whole document (supposedly).
the blob_sequence_order_field_name field, preserving the order of the blobs and composing
the whole document (supposedly).
The final base64 enncoded string will be decoded back to binary data, then decompressed using LZW algorithm.
"""

Expand Down
66 changes: 66 additions & 0 deletions nifi/user-schemas/elasticsearch/base_index_settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"settings": {
"index": {
"number_of_shards": 5,
"number_of_replicas": 1,
"refresh_interval": "60s"
}
},
"mappings": {
"date_detection": true,
"dynamic_date_formats": [
"yyyy-MM-dd HH:mm:ss",
"yyyy-MM-dd",
"epoch_millis",
"basic_date",
"date_hour",
"date_hour_minute",
"date_hour_minute_second",
"time",
"hour_minute",
"yyyy/MM/dd",
"dd/MM/yyyy",
"dd/MM/yyyy HH:mm",
"date_time",
"t_time",
"date_hour_minute_second_millis",
"basic_time",
"basic_time_no_millis",
"basic_t_time",
"hour_minute_second",
"HH:mm.ss",
"HH:mm.ssZ"
],
"dynamic_templates": [
{
"dates": {
"match_mapping_type": "string",
"match_pattern": "regex",
"match": "(?i).*(date|time|when|dt|dttm|timestamp|created|updated|modified|inserted|recorded|logged|entered|performed|signed|cosigned|completed|admit|discharge|visit|appointment|service|start|end|effective|expiry|validfrom|validto|close)$",
"mapping": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||basic_date||date_hour_minute_second"
}
}
},
{
"strings_as_text": {
"match_mapping_type": "string",
"mapping": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
],
"properties": {
"id": { "type": "keyword" }
}
}
}
16 changes: 12 additions & 4 deletions nifi/user-scripts/get_files_from_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,27 @@
import numpy
import pandas

# get the arguments from the "Command Arguments" property in NiFi, we are looking at anything after the 1st arg (which is the script name)
# example args: ['/opt/nifi/user-scripts/get_files_from_storage.py', 'root_project_data_dir=/opt/data/', 'folder_pattern=.*\\d{4}\\/\\d{2}\\/\\d{2}', 'folder_to_ingest=2022', 'file_id_csv_column_name_match=file_name_id_no_ext']
# get the arguments from the "Command Arguments" property in NiFi,
# we are looking at anything after the 1st arg (which is the script name)
# example args:
# [
# '/opt/nifi/user-scripts/get_files_from_storage.py', 'root_project_data_dir=/opt/data/',
# 'folder_pattern=.*\\d{4}\\/\\d{2}\\/\\d{2}', 'folder_to_ingest=2022',
# 'file_id_csv_column_name_match=file_name_id_no_ext'
# ]

folder_to_ingest = "2022"
folder_pattern = ".*\d{4}\/\d{2}\/\d{2}"
file_id_csv_column_name_match = "file_name_id_no_ext"
root_project_data_dir = "/opt/data/"
csv_separator = "|"
output_batch_size = 1000
# generates a separate pseudoID, in this case, UUID for the documents. useful when doc IDs are weird or a mess and you dont want to spend time cleaning.
# generates a separate pseudoID, in this case, UUID for the documents.
# useful when doc IDs are weird or a mess and you dont want to spend time cleaning.
generate_pseudo_doc_id = False

# default: None, possible values: "files_only" - read files and only store their text & binary content (pre-ocr) and the file name as the document_Id
# default: None, possible values: "files_only" - read files and only store their text & binary content (pre-ocr) and
# the file name as the document_Id
operation_mode = ""

encoding="UTF-8"
Expand Down
59 changes: 59 additions & 0 deletions nifi/user-scripts/utils/generic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import logging
import os
import traceback
from collections import defaultdict
from typing import Union

Expand Down Expand Up @@ -39,3 +41,60 @@ def dict2jsonl_file(input_dict: Union[dict| defaultdict], file_path: str):
json_obj = json.loads(json.dumps(o))
json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':'))
print('', file=outfile)


def get_logger(name: str) -> logging.Logger:
"""Return a configured logger shared across all NiFi clients."""
level_name = os.getenv("NIFI_LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)

logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
fmt = logging.Formatter(
"[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s",
"%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(level)
logger.propagate = False
return logger


# -----------------------------------------------------------------------------------------------------------------
# Function for handling property parsing, used in NiFi processors mainly, but can beused elsewhere
# -----------------------------------------------------------------------------------------------------------------
def parse_value(value: str) -> str|int|float|bool:
"""Convert NiFi string property values into native Python types."""
if isinstance(value, bool):
return value

val_str = str(value).strip()
if val_str.lower() in ("true", "false"):
return val_str.lower() == "true"
if val_str.replace(".", "", 1).isdigit():
return float(val_str) if "." in val_str else int(val_str)
return value


# -----------------------------------------------------------------------------------------------------------------
# Safe execution wrapper with consistent error logging
# -----------------------------------------------------------------------------------------------------------------
def safe_execute(logger: logging.Logger, func, *args, **kwargs):
"""
Executes a function safely, logging errors with full traceback.

Args:
logger (logging.Logger): Logger to write errors to.
func (Callable): Function to execute.
*args, **kwargs: Arguments passed to the function.

Returns:
The result of func(*args, **kwargs), or None on error.
"""
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"❌ Error during execution of {func.__name__}: {e}\n{traceback.format_exc()}")
raise
Loading