diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index 071bf544..65cf24da 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -1,4 +1,5 @@ import json +import sys import traceback from logging import Logger @@ -8,8 +9,13 @@ PropertyDescriptor, StandardValidators, ) +from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView +# we need to add it to the sys imports +sys.path.insert(0, "/opt/nifi/user-scripts") + +from utils.generic import parse_value # noqa: I001,E402 class ConvertJsonRecordSchema(FlowFileTransform): identifier = None @@ -49,7 +55,22 @@ def __init__(self, jvm: JVMView): validators=[StandardValidators.BOOLEAN_VALIDATOR]) ] + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def getRelationships(self) -> list[Relationship]: + return self.relationships def getPropertyDescriptors(self) -> list[PropertyDescriptor]: return self.descriptors @@ -57,21 +78,24 @@ def getPropertyDescriptors(self) -> list[PropertyDescriptor]: def set_logger(self, logger: Logger): self.logger = logger - def set_properties(self, properties: dict): + def set_properties(self, properties: dict) -> None: """ Gets the properties from the processor's context and sets them as instance variables. Args: properties (dict): dictionary containing property names and values. """ - for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) + for k, v in properties.items(): + name = k.name if hasattr(k, "name") else str(k) + val = parse_value(v) + if hasattr(self, name): + setattr(self, name, val) + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") def map_record(self, record: dict, json_mapper_schema: dict) -> dict: """ Maps the fields of a record to new field names based on the provided JSON schema mapping. + {new_field -> old_field, ....} Args: record (dict): The input record whose fields need to be mapped. @@ -83,22 +107,16 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: new_record: dict = {} - new_schema_field_names: list = [str(x).lower() for x in json_mapper_schema.keys()] + # reverse the json_mapper_schema to map old_field -> new_field + json_mapper_schema_reverse: dict = {v: k for k, v in json_mapper_schema.items() if v} for curr_field_name, curr_field_value in record.items(): - curr_field_name = str(curr_field_name).lower() - if curr_field_name in new_schema_field_names: + if curr_field_name in json_mapper_schema_reverse: + new_field_name = json_mapper_schema_reverse[curr_field_name] # check if the mapping is not a dict (nested field) - if isinstance(json_mapper_schema[curr_field_name], str): - new_record.update({json_mapper_schema[curr_field_name] : curr_field_value}) - elif isinstance(json_mapper_schema[curr_field_name], dict): - # nested field - new_record.update({curr_field_name: {}}) - for nested_field_name, nested_field_value in curr_field_value.items(): - if nested_field_name in json_mapper_schema[curr_field_name].keys(): - new_record[curr_field_name].update({ \ - json_mapper_schema[curr_field_name][nested_field_name]: nested_field_value}) - + if isinstance(new_field_name, str): + new_record.update({new_field_name: curr_field_value}) + elif self.preserve_non_mapped_fields: new_record.update({curr_field_name: curr_field_value}) diff --git a/nifi/user-python-extensions/prepare_record_for_nlp.py b/nifi/user-python-extensions/prepare_record_for_nlp.py index b0fd0221..99157745 100644 --- a/nifi/user-python-extensions/prepare_record_for_nlp.py +++ b/nifi/user-python-extensions/prepare_record_for_nlp.py @@ -2,7 +2,7 @@ import json import traceback from logging import Logger -from typing import Any, Dict, List, Union +from typing import Any, Union from avro.datafile import DataFileReader from avro.io import DatumReader @@ -104,7 +104,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: Union[DataFileReader, List[Dict[str, Any]] | List[Any]] + reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]] if self.process_flow_file_type == "avro": reader = DataFileReader(input_byte_buffer, DatumReader()) diff --git a/nifi/user-python-extensions/prepare_record_for_ocr.py b/nifi/user-python-extensions/prepare_record_for_ocr.py index 1c4f9d0a..b9a19f6b 100644 --- a/nifi/user-python-extensions/prepare_record_for_ocr.py +++ b/nifi/user-python-extensions/prepare_record_for_ocr.py @@ -4,7 +4,7 @@ import sys import traceback from logging import Logger -from typing import Any, Dict, List, Union +from typing import Any, Union from avro.datafile import DataFileReader from avro.io import DatumReader @@ -19,7 +19,7 @@ # we need to add it to the sys imports sys.path.insert(0, "/opt/nifi/user-scripts") -from utils.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402 +from utils.helpers.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402 class PrepareRecordForOcr(FlowFileTransform): @@ -72,12 +72,12 @@ def __init__(self, jvm: JVMView): required=True, allowable_values=["avro", "json"]), ] + self.descriptors: list[PropertyDescriptor] = self._properties def getPropertyDescriptors(self) -> list[PropertyDescriptor]: return self.descriptors - def set_logger(self, logger: Logger): self.logger = logger @@ -105,7 +105,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: Union[DataFileReader, List[Dict[str, Any]] | List[Any]] + reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]] if self.process_flow_file_type == "avro": reader = DataFileReader(input_byte_buffer, DatumReader()) diff --git a/nifi/user-python-extensions/record_decompress_cerner_blob.py b/nifi/user-python-extensions/record_decompress_cerner_blob.py index ffee6dba..233c9c89 100644 --- a/nifi/user-python-extensions/record_decompress_cerner_blob.py +++ b/nifi/user-python-extensions/record_decompress_cerner_blob.py @@ -17,7 +17,8 @@ All RECORDS are expected to have the same fields, and presumably belonging to the same DOCUMENT. All the fields of these records should have the same field values, except for the binary data field. The binary data field is expected to be a base64 encoded string, which will be concatenated according to - the blob_sequence_order_field_name field, preserving the order of the blobs and composing the whole document (supposedly). + the blob_sequence_order_field_name field, preserving the order of the blobs and composing + the whole document (supposedly). The final base64 enncoded string will be decoded back to binary data, then decompressed using LZW algorithm. """ diff --git a/nifi/user-schemas/elasticsearch/base_index_settings.json b/nifi/user-schemas/elasticsearch/base_index_settings.json new file mode 100644 index 00000000..b150ec47 --- /dev/null +++ b/nifi/user-schemas/elasticsearch/base_index_settings.json @@ -0,0 +1,66 @@ +{ + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 1, + "refresh_interval": "60s" + } + }, + "mappings": { + "date_detection": true, + "dynamic_date_formats": [ + "yyyy-MM-dd HH:mm:ss", + "yyyy-MM-dd", + "epoch_millis", + "basic_date", + "date_hour", + "date_hour_minute", + "date_hour_minute_second", + "time", + "hour_minute", + "yyyy/MM/dd", + "dd/MM/yyyy", + "dd/MM/yyyy HH:mm", + "date_time", + "t_time", + "date_hour_minute_second_millis", + "basic_time", + "basic_time_no_millis", + "basic_t_time", + "hour_minute_second", + "HH:mm.ss", + "HH:mm.ssZ" + ], + "dynamic_templates": [ + { + "dates": { + "match_mapping_type": "string", + "match_pattern": "regex", + "match": "(?i).*(date|time|when|dt|dttm|timestamp|created|updated|modified|inserted|recorded|logged|entered|performed|signed|cosigned|completed|admit|discharge|visit|appointment|service|start|end|effective|expiry|validfrom|validto|close)$", + "mapping": { + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||basic_date||date_hour_minute_second" + } + } + }, + { + "strings_as_text": { + "match_mapping_type": "string", + "mapping": { + "type": "text", + "analyzer": "standard", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + ], + "properties": { + "id": { "type": "keyword" } + } + } +} diff --git a/nifi/user-scripts/get_files_from_storage.py b/nifi/user-scripts/get_files_from_storage.py index 9df4ea6b..f1aefbbb 100644 --- a/nifi/user-scripts/get_files_from_storage.py +++ b/nifi/user-scripts/get_files_from_storage.py @@ -11,8 +11,14 @@ import numpy import pandas -# get the arguments from the "Command Arguments" property in NiFi, we are looking at anything after the 1st arg (which is the script name) -# example args: ['/opt/nifi/user-scripts/get_files_from_storage.py', 'root_project_data_dir=/opt/data/', 'folder_pattern=.*\\d{4}\\/\\d{2}\\/\\d{2}', 'folder_to_ingest=2022', 'file_id_csv_column_name_match=file_name_id_no_ext'] +# get the arguments from the "Command Arguments" property in NiFi, +# we are looking at anything after the 1st arg (which is the script name) +# example args: +# [ +# '/opt/nifi/user-scripts/get_files_from_storage.py', 'root_project_data_dir=/opt/data/', +# 'folder_pattern=.*\\d{4}\\/\\d{2}\\/\\d{2}', 'folder_to_ingest=2022', +# 'file_id_csv_column_name_match=file_name_id_no_ext' +# ] folder_to_ingest = "2022" folder_pattern = ".*\d{4}\/\d{2}\/\d{2}" @@ -20,10 +26,12 @@ root_project_data_dir = "/opt/data/" csv_separator = "|" output_batch_size = 1000 -# generates a separate pseudoID, in this case, UUID for the documents. useful when doc IDs are weird or a mess and you dont want to spend time cleaning. +# generates a separate pseudoID, in this case, UUID for the documents. +# useful when doc IDs are weird or a mess and you dont want to spend time cleaning. generate_pseudo_doc_id = False -# default: None, possible values: "files_only" - read files and only store their text & binary content (pre-ocr) and the file name as the document_Id +# default: None, possible values: "files_only" - read files and only store their text & binary content (pre-ocr) and +# the file name as the document_Id operation_mode = "" encoding="UTF-8" diff --git a/nifi/user-scripts/utils/generic.py b/nifi/user-scripts/utils/generic.py index 867f5bd4..0f17e3bf 100644 --- a/nifi/user-scripts/utils/generic.py +++ b/nifi/user-scripts/utils/generic.py @@ -1,5 +1,7 @@ import json +import logging import os +import traceback from collections import defaultdict from typing import Union @@ -39,3 +41,60 @@ def dict2jsonl_file(input_dict: Union[dict| defaultdict], file_path: str): json_obj = json.loads(json.dumps(o)) json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':')) print('', file=outfile) + + +def get_logger(name: str) -> logging.Logger: + """Return a configured logger shared across all NiFi clients.""" + level_name = os.getenv("NIFI_LOG_LEVEL", "INFO").upper() + level = getattr(logging, level_name, logging.INFO) + + logger = logging.getLogger(name) + if not logger.handlers: + handler = logging.StreamHandler(sys.stdout) + fmt = logging.Formatter( + "[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", + "%Y-%m-%d %H:%M:%S", + ) + handler.setFormatter(fmt) + logger.addHandler(handler) + logger.setLevel(level) + logger.propagate = False + return logger + + +# ----------------------------------------------------------------------------------------------------------------- +# Function for handling property parsing, used in NiFi processors mainly, but can beused elsewhere +# ----------------------------------------------------------------------------------------------------------------- +def parse_value(value: str) -> str|int|float|bool: + """Convert NiFi string property values into native Python types.""" + if isinstance(value, bool): + return value + + val_str = str(value).strip() + if val_str.lower() in ("true", "false"): + return val_str.lower() == "true" + if val_str.replace(".", "", 1).isdigit(): + return float(val_str) if "." in val_str else int(val_str) + return value + + +# ----------------------------------------------------------------------------------------------------------------- +# Safe execution wrapper with consistent error logging +# ----------------------------------------------------------------------------------------------------------------- +def safe_execute(logger: logging.Logger, func, *args, **kwargs): + """ + Executes a function safely, logging errors with full traceback. + + Args: + logger (logging.Logger): Logger to write errors to. + func (Callable): Function to execute. + *args, **kwargs: Arguments passed to the function. + + Returns: + The result of func(*args, **kwargs), or None on error. + """ + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"❌ Error during execution of {func.__name__}: {e}\n{traceback.format_exc()}") + raise diff --git a/nifi/user-scripts/utils/avro_json_encoder.py b/nifi/user-scripts/utils/helpers/avro_json_encoder.py similarity index 100% rename from nifi/user-scripts/utils/avro_json_encoder.py rename to nifi/user-scripts/utils/helpers/avro_json_encoder.py diff --git a/nifi/user-scripts/utils/helpers/base_nifi_processor.py b/nifi/user-scripts/utils/helpers/base_nifi_processor.py new file mode 100644 index 00000000..986cf3ea --- /dev/null +++ b/nifi/user-scripts/utils/helpers/base_nifi_processor.py @@ -0,0 +1,119 @@ +import io +import json +import logging +import sys +import traceback +from logging import Logger + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.properties import ( + ProcessContext, + PropertyDescriptor, + StandardValidators, +) +from nifiapi.relationship import Relationship +from py4j.java_gateway import JavaObject, JVMView + +# this script is using a custom utility for decompressing Cerner blobs +# from nifi/user-python-extensions/record_decompress_cerner_blob.py +# we need to add it to the sys imports +sys.path.insert(0, "/opt/nifi/user-scripts") + +from utils.generic import parse_value # noqa: I001,E402 + + +class BaseNiFiProcessor(FlowFileTransform): + """Base class providing common NiFi Python processor utilities.""" + + identifier = None + logger: Logger = Logger(__qualname__) + + + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileTransform'] + + class ProcessorDetails: + version = '0.0.1' + + def __init__(self, jvm: JVMView): + """ + Args: + jvm (JVMView): Required, Store if you need to use Java classes later. + """ + self.jvm = jvm + self.logger: Logger = logging.getLogger(self.__class__.__name__) + self.process_context: ProcessContext + + self.sample_property_one: bool = True + self.sample_property_two: str = "" + self.sample_property_three: str = "default_value_one" + + # this is directly mirrored to the UI + self._properties = [ + PropertyDescriptor(name="sample_property_one", + description="sample property one description", + default_value="true", + required=True, + validators=StandardValidators.BOOLEAN_VALIDATOR), + PropertyDescriptor(name="sample_property_two", + description="sample property two description", + required=False, + default_value="some_value"), + PropertyDescriptor(name="sample_property_three", + required=True, + description="sample property three description", + default_value="default_value_one", + allowable_values=["default_value_one", "default_value_two", "default_value_three"], + validators=StandardValidators.NON_EMPTY_VALIDATOR) + ] + + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def getRelationships(self) -> list[Relationship]: + return self.relationships + + def getPropertyDescriptors(self) -> list[PropertyDescriptor]: + return self.descriptors + + def set_logger(self, logger: Logger): + self.logger = logger + + def set_properties(self, properties: dict) -> None: + """Populate class attributes from NiFi property map.""" + for k, v in properties.items(): + name = k.name if hasattr(k, "name") else str(k) + val = parse_value(v) + if hasattr(self, name): + setattr(self, name, val) + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") + + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore + """ Main processor logic. This example reads Avro records from the incoming flowfile, + and writes them back out to a new flowfile. It also adds the processor properties + to the flowfile attributes. IT IS A SAMPLE ONLY, + you are meant to use this as a starting point for other processors + """ + output_contents = [] + try: + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore + self.logger.info("Successfully transformed Avro content for OCR") + + return FlowFileTransformResult(relationship="success", + attributes=attributes, + contents=json.dumps(output_contents)) + except Exception as exception: + self.logger.error("Exception during Avro processing: " + traceback.format_exc()) + raise exception diff --git a/nifi/user-scripts/utils/helpers/logging.py b/nifi/user-scripts/utils/helpers/logging.py deleted file mode 100644 index eaa6a7bd..00000000 --- a/nifi/user-scripts/utils/helpers/logging.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging -import os -import sys - - -def get_logger(name: str) -> logging.Logger: - """Return a configured logger shared across all NiFi clients.""" - level_name = os.getenv("NIFI_LOG_LEVEL", "INFO").upper() - level = getattr(logging, level_name, logging.INFO) - - logger = logging.getLogger(name) - if not logger.handlers: - handler = logging.StreamHandler(sys.stdout) - fmt = logging.Formatter( - "[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", - "%Y-%m-%d %H:%M:%S", - ) - handler.setFormatter(fmt) - logger.addHandler(handler) - logger.setLevel(level) - logger.propagate = False - return logger diff --git a/nifi/user-scripts/utils/helpers/nifi_api_client.py b/nifi/user-scripts/utils/helpers/nifi_api_client.py index 372d7ef0..1c353d2c 100644 --- a/nifi/user-scripts/utils/helpers/nifi_api_client.py +++ b/nifi/user-scripts/utils/helpers/nifi_api_client.py @@ -1,5 +1,4 @@ from logging import Logger -from typing import List # noqa: UP035 from dto.nifi_api_config import NiFiAPIConfig from nipyapi import canvas, security @@ -10,7 +9,7 @@ from nipyapi.registry import ApiClient as RegistryApiClient from nipyapi.registry import BucketsApi from nipyapi.registry.configuration import Configuration as RegistryConfiguration -from utils.helpers.logging import get_logger +from utils.generic import get_logger class NiFiRegistryClient: @@ -63,7 +62,7 @@ def _login(self) -> None: def get_root_process_group_id(self) -> str: return canvas.get_root_pg_id() - def get_process_group_by_name(self, process_group_name: str) -> None | List[object] | object: + def get_process_group_by_name(self, process_group_name: str) -> None | list[object] | object: return canvas.get_process_group(process_group_name, identifier_type="nam") def get_process_group_by_id(self, process_group_id: str) -> ProcessGroupEntity: @@ -75,9 +74,9 @@ def start_process_group(self, process_group_id: str) -> bool: def stop_process_group(self, process_group_id: str) -> bool: return canvas.schedule_process_group(process_group_id, False) - def get_child_process_groups_from_parent_id(self, parent_process_group_id: str) -> List[ProcessGroupEntity]: + def get_child_process_groups_from_parent_id(self, parent_process_group_id: str) -> list[ProcessGroupEntity]: parent_pg = canvas.get_process_group(parent_process_group_id, identifier_type="id") return canvas.list_all_process_groups(parent_pg.id) - def get_all_processors_in_process_group(self, process_group_id: str) -> List[ProcessorEntity]: + def get_all_processors_in_process_group(self, process_group_id: str) -> list[ProcessorEntity]: return canvas.list_all_processors(process_group_id) diff --git a/nifi/user-scripts/utils/helpers/service.py b/nifi/user-scripts/utils/helpers/service.py new file mode 100644 index 00000000..9d4b2808 --- /dev/null +++ b/nifi/user-scripts/utils/helpers/service.py @@ -0,0 +1,33 @@ +import sys +import time + +import psycopg2 +from psycopg2 import sql + +sys.path.append("../../dto/") + +from dto.pg_config import PGConfig + + +def check_postgres(cfg: PGConfig) -> tuple[bool, float | None, str | None]: + """Return (is_healthy, latency_ms, error_detail)""" + start = time.perf_counter() + try: + conn = psycopg2.connect( + host=cfg.host, + port=cfg.port, + dbname=cfg.db, + user=cfg.user, + password=cfg.password, + connect_timeout=cfg.timeout + ) + with conn.cursor() as cur: + cur.execute(sql.SQL("SELECT 1;")) + result = cur.fetchone() + conn.close() + if result != (1,): + return False, None, f"Unexpected result: {result}" + latency = (time.perf_counter() - start) * 1000 + return True, latency, None + except Exception as e: + return False, None, str(e)