From 1c14d91dc0be17ada1e842fc669a8a59b7970f86 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Sun, 26 Oct 2025 21:18:48 +0000 Subject: [PATCH 1/5] NiFi: updated processor to new standard (testing). --- .../convert_json_record_schema.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index 071bf5448..92b684af2 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -8,6 +8,7 @@ PropertyDescriptor, StandardValidators, ) +from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView @@ -49,7 +50,22 @@ def __init__(self, jvm: JVMView): validators=[StandardValidators.BOOLEAN_VALIDATOR]) ] + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def getRelationships(self) -> list[Relationship]: + return self.relationships def getPropertyDescriptors(self) -> list[PropertyDescriptor]: return self.descriptors From 174cc7e8d0c944bd1ed4b71a75d7ebccf70bdab5 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Mon, 27 Oct 2025 14:33:51 +0000 Subject: [PATCH 2/5] NiFi: proc updates. --- .../convert_json_record_schema.py | 21 +++++++------------ .../prepare_record_for_ocr.py | 2 +- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index 92b684af2..b0586a7be 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -88,6 +88,7 @@ def set_properties(self, properties: dict): def map_record(self, record: dict, json_mapper_schema: dict) -> dict: """ Maps the fields of a record to new field names based on the provided JSON schema mapping. + {new_field -> old_field, ....} Args: record (dict): The input record whose fields need to be mapped. @@ -99,22 +100,16 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: new_record: dict = {} - new_schema_field_names: list = [str(x).lower() for x in json_mapper_schema.keys()] + # reverse the json_mapper_schema to map old_field -> new_field + json_mapper_schema_reverse: dict = {v: k for k, v in json_mapper_schema.items() if v} for curr_field_name, curr_field_value in record.items(): - curr_field_name = str(curr_field_name).lower() - if curr_field_name in new_schema_field_names: + if curr_field_name in json_mapper_schema_reverse: + new_field_name = json_mapper_schema_reverse[curr_field_name] # check if the mapping is not a dict (nested field) - if isinstance(json_mapper_schema[curr_field_name], str): - new_record.update({json_mapper_schema[curr_field_name] : curr_field_value}) - elif isinstance(json_mapper_schema[curr_field_name], dict): - # nested field - new_record.update({curr_field_name: {}}) - for nested_field_name, nested_field_value in curr_field_value.items(): - if nested_field_name in json_mapper_schema[curr_field_name].keys(): - new_record[curr_field_name].update({ \ - json_mapper_schema[curr_field_name][nested_field_name]: nested_field_value}) - + if isinstance(new_field_name, str): + new_record.update({new_field_name: curr_field_value}) + elif self.preserve_non_mapped_fields: new_record.update({curr_field_name: curr_field_value}) diff --git a/nifi/user-python-extensions/prepare_record_for_ocr.py b/nifi/user-python-extensions/prepare_record_for_ocr.py index 1c4f9d0a0..561cb1bdb 100644 --- a/nifi/user-python-extensions/prepare_record_for_ocr.py +++ b/nifi/user-python-extensions/prepare_record_for_ocr.py @@ -72,12 +72,12 @@ def __init__(self, jvm: JVMView): required=True, allowable_values=["avro", "json"]), ] + self.descriptors: list[PropertyDescriptor] = self._properties def getPropertyDescriptors(self) -> list[PropertyDescriptor]: return self.descriptors - def set_logger(self, logger: Logger): self.logger = logger From 4149205f50f1868d149285da515aa3bc56ad8355 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Mon, 27 Oct 2025 15:09:16 +0000 Subject: [PATCH 3/5] NiFi: proc updates (prop normalisation) --- .../convert_json_record_schema.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index b0586a7be..75dccc22f 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -81,9 +81,24 @@ def set_properties(self, properties: dict): """ for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) + name = k.name if hasattr(k, "name") else str(k) + val_str = str(v).strip() + + # Boolean normalization + if val_str.lower() in ("true", "false"): + val = val_str.lower() == "true" + + # Numeric normalization (optional) + elif val_str.replace(".", "", 1).isdigit(): + val = float(val_str) if "." in val_str else int(val_str) + else: + # leave as string/path etc. + val = v + + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") + + if hasattr(self, name): + setattr(self, name, val) def map_record(self, record: dict, json_mapper_schema: dict) -> dict: """ From ca5a346e90194f4c9d6893c149fe4419f52eb8e0 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Mon, 27 Oct 2025 16:04:02 +0000 Subject: [PATCH 4/5] NiFi: refactored processors & util scripts. --- .../convert_json_record_schema.py | 26 ++-- .../prepare_record_for_nlp.py | 4 +- .../prepare_record_for_ocr.py | 6 +- nifi/user-scripts/get_files_from_storage.py | 16 ++- nifi/user-scripts/utils/generic.py | 59 +++++++++ .../utils/{ => helpers}/avro_json_encoder.py | 0 .../utils/helpers/base_nifi_processor.py | 119 ++++++++++++++++++ nifi/user-scripts/utils/helpers/logging.py | 22 ---- .../utils/helpers/nifi_api_client.py | 9 +- nifi/user-scripts/utils/helpers/service.py | 33 +++++ 10 files changed, 241 insertions(+), 53 deletions(-) rename nifi/user-scripts/utils/{ => helpers}/avro_json_encoder.py (100%) create mode 100644 nifi/user-scripts/utils/helpers/base_nifi_processor.py delete mode 100644 nifi/user-scripts/utils/helpers/logging.py create mode 100644 nifi/user-scripts/utils/helpers/service.py diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index 75dccc22f..65cf24da6 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -1,4 +1,5 @@ import json +import sys import traceback from logging import Logger @@ -11,6 +12,10 @@ from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView +# we need to add it to the sys imports +sys.path.insert(0, "/opt/nifi/user-scripts") + +from utils.generic import parse_value # noqa: I001,E402 class ConvertJsonRecordSchema(FlowFileTransform): identifier = None @@ -73,32 +78,19 @@ def getPropertyDescriptors(self) -> list[PropertyDescriptor]: def set_logger(self, logger: Logger): self.logger = logger - def set_properties(self, properties: dict): + def set_properties(self, properties: dict) -> None: """ Gets the properties from the processor's context and sets them as instance variables. Args: properties (dict): dictionary containing property names and values. """ - for k, v in list(properties.items()): + for k, v in properties.items(): name = k.name if hasattr(k, "name") else str(k) - val_str = str(v).strip() - - # Boolean normalization - if val_str.lower() in ("true", "false"): - val = val_str.lower() == "true" - - # Numeric normalization (optional) - elif val_str.replace(".", "", 1).isdigit(): - val = float(val_str) if "." in val_str else int(val_str) - else: - # leave as string/path etc. - val = v - - self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") - + val = parse_value(v) if hasattr(self, name): setattr(self, name, val) + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") def map_record(self, record: dict, json_mapper_schema: dict) -> dict: """ diff --git a/nifi/user-python-extensions/prepare_record_for_nlp.py b/nifi/user-python-extensions/prepare_record_for_nlp.py index b0fd0221e..99157745c 100644 --- a/nifi/user-python-extensions/prepare_record_for_nlp.py +++ b/nifi/user-python-extensions/prepare_record_for_nlp.py @@ -2,7 +2,7 @@ import json import traceback from logging import Logger -from typing import Any, Dict, List, Union +from typing import Any, Union from avro.datafile import DataFileReader from avro.io import DatumReader @@ -104,7 +104,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: Union[DataFileReader, List[Dict[str, Any]] | List[Any]] + reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]] if self.process_flow_file_type == "avro": reader = DataFileReader(input_byte_buffer, DatumReader()) diff --git a/nifi/user-python-extensions/prepare_record_for_ocr.py b/nifi/user-python-extensions/prepare_record_for_ocr.py index 561cb1bdb..b9a19f6b9 100644 --- a/nifi/user-python-extensions/prepare_record_for_ocr.py +++ b/nifi/user-python-extensions/prepare_record_for_ocr.py @@ -4,7 +4,7 @@ import sys import traceback from logging import Logger -from typing import Any, Dict, List, Union +from typing import Any, Union from avro.datafile import DataFileReader from avro.io import DatumReader @@ -19,7 +19,7 @@ # we need to add it to the sys imports sys.path.insert(0, "/opt/nifi/user-scripts") -from utils.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402 +from utils.helpers.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402 class PrepareRecordForOcr(FlowFileTransform): @@ -105,7 +105,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: Union[DataFileReader, List[Dict[str, Any]] | List[Any]] + reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]] if self.process_flow_file_type == "avro": reader = DataFileReader(input_byte_buffer, DatumReader()) diff --git a/nifi/user-scripts/get_files_from_storage.py b/nifi/user-scripts/get_files_from_storage.py index 9df4ea6b3..f1aefbbb2 100644 --- a/nifi/user-scripts/get_files_from_storage.py +++ b/nifi/user-scripts/get_files_from_storage.py @@ -11,8 +11,14 @@ import numpy import pandas -# get the arguments from the "Command Arguments" property in NiFi, we are looking at anything after the 1st arg (which is the script name) -# example args: ['/opt/nifi/user-scripts/get_files_from_storage.py', 'root_project_data_dir=/opt/data/', 'folder_pattern=.*\\d{4}\\/\\d{2}\\/\\d{2}', 'folder_to_ingest=2022', 'file_id_csv_column_name_match=file_name_id_no_ext'] +# get the arguments from the "Command Arguments" property in NiFi, +# we are looking at anything after the 1st arg (which is the script name) +# example args: +# [ +# '/opt/nifi/user-scripts/get_files_from_storage.py', 'root_project_data_dir=/opt/data/', +# 'folder_pattern=.*\\d{4}\\/\\d{2}\\/\\d{2}', 'folder_to_ingest=2022', +# 'file_id_csv_column_name_match=file_name_id_no_ext' +# ] folder_to_ingest = "2022" folder_pattern = ".*\d{4}\/\d{2}\/\d{2}" @@ -20,10 +26,12 @@ root_project_data_dir = "/opt/data/" csv_separator = "|" output_batch_size = 1000 -# generates a separate pseudoID, in this case, UUID for the documents. useful when doc IDs are weird or a mess and you dont want to spend time cleaning. +# generates a separate pseudoID, in this case, UUID for the documents. +# useful when doc IDs are weird or a mess and you dont want to spend time cleaning. generate_pseudo_doc_id = False -# default: None, possible values: "files_only" - read files and only store their text & binary content (pre-ocr) and the file name as the document_Id +# default: None, possible values: "files_only" - read files and only store their text & binary content (pre-ocr) and +# the file name as the document_Id operation_mode = "" encoding="UTF-8" diff --git a/nifi/user-scripts/utils/generic.py b/nifi/user-scripts/utils/generic.py index 867f5bd41..0f17e3bfc 100644 --- a/nifi/user-scripts/utils/generic.py +++ b/nifi/user-scripts/utils/generic.py @@ -1,5 +1,7 @@ import json +import logging import os +import traceback from collections import defaultdict from typing import Union @@ -39,3 +41,60 @@ def dict2jsonl_file(input_dict: Union[dict| defaultdict], file_path: str): json_obj = json.loads(json.dumps(o)) json.dump(json_obj, outfile, ensure_ascii=False, indent=None, separators=(',',':')) print('', file=outfile) + + +def get_logger(name: str) -> logging.Logger: + """Return a configured logger shared across all NiFi clients.""" + level_name = os.getenv("NIFI_LOG_LEVEL", "INFO").upper() + level = getattr(logging, level_name, logging.INFO) + + logger = logging.getLogger(name) + if not logger.handlers: + handler = logging.StreamHandler(sys.stdout) + fmt = logging.Formatter( + "[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", + "%Y-%m-%d %H:%M:%S", + ) + handler.setFormatter(fmt) + logger.addHandler(handler) + logger.setLevel(level) + logger.propagate = False + return logger + + +# ----------------------------------------------------------------------------------------------------------------- +# Function for handling property parsing, used in NiFi processors mainly, but can beused elsewhere +# ----------------------------------------------------------------------------------------------------------------- +def parse_value(value: str) -> str|int|float|bool: + """Convert NiFi string property values into native Python types.""" + if isinstance(value, bool): + return value + + val_str = str(value).strip() + if val_str.lower() in ("true", "false"): + return val_str.lower() == "true" + if val_str.replace(".", "", 1).isdigit(): + return float(val_str) if "." in val_str else int(val_str) + return value + + +# ----------------------------------------------------------------------------------------------------------------- +# Safe execution wrapper with consistent error logging +# ----------------------------------------------------------------------------------------------------------------- +def safe_execute(logger: logging.Logger, func, *args, **kwargs): + """ + Executes a function safely, logging errors with full traceback. + + Args: + logger (logging.Logger): Logger to write errors to. + func (Callable): Function to execute. + *args, **kwargs: Arguments passed to the function. + + Returns: + The result of func(*args, **kwargs), or None on error. + """ + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"❌ Error during execution of {func.__name__}: {e}\n{traceback.format_exc()}") + raise diff --git a/nifi/user-scripts/utils/avro_json_encoder.py b/nifi/user-scripts/utils/helpers/avro_json_encoder.py similarity index 100% rename from nifi/user-scripts/utils/avro_json_encoder.py rename to nifi/user-scripts/utils/helpers/avro_json_encoder.py diff --git a/nifi/user-scripts/utils/helpers/base_nifi_processor.py b/nifi/user-scripts/utils/helpers/base_nifi_processor.py new file mode 100644 index 000000000..986cf3ea8 --- /dev/null +++ b/nifi/user-scripts/utils/helpers/base_nifi_processor.py @@ -0,0 +1,119 @@ +import io +import json +import logging +import sys +import traceback +from logging import Logger + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.properties import ( + ProcessContext, + PropertyDescriptor, + StandardValidators, +) +from nifiapi.relationship import Relationship +from py4j.java_gateway import JavaObject, JVMView + +# this script is using a custom utility for decompressing Cerner blobs +# from nifi/user-python-extensions/record_decompress_cerner_blob.py +# we need to add it to the sys imports +sys.path.insert(0, "/opt/nifi/user-scripts") + +from utils.generic import parse_value # noqa: I001,E402 + + +class BaseNiFiProcessor(FlowFileTransform): + """Base class providing common NiFi Python processor utilities.""" + + identifier = None + logger: Logger = Logger(__qualname__) + + + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileTransform'] + + class ProcessorDetails: + version = '0.0.1' + + def __init__(self, jvm: JVMView): + """ + Args: + jvm (JVMView): Required, Store if you need to use Java classes later. + """ + self.jvm = jvm + self.logger: Logger = logging.getLogger(self.__class__.__name__) + self.process_context: ProcessContext + + self.sample_property_one: bool = True + self.sample_property_two: str = "" + self.sample_property_three: str = "default_value_one" + + # this is directly mirrored to the UI + self._properties = [ + PropertyDescriptor(name="sample_property_one", + description="sample property one description", + default_value="true", + required=True, + validators=StandardValidators.BOOLEAN_VALIDATOR), + PropertyDescriptor(name="sample_property_two", + description="sample property two description", + required=False, + default_value="some_value"), + PropertyDescriptor(name="sample_property_three", + required=True, + description="sample property three description", + default_value="default_value_one", + allowable_values=["default_value_one", "default_value_two", "default_value_three"], + validators=StandardValidators.NON_EMPTY_VALIDATOR) + ] + + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def getRelationships(self) -> list[Relationship]: + return self.relationships + + def getPropertyDescriptors(self) -> list[PropertyDescriptor]: + return self.descriptors + + def set_logger(self, logger: Logger): + self.logger = logger + + def set_properties(self, properties: dict) -> None: + """Populate class attributes from NiFi property map.""" + for k, v in properties.items(): + name = k.name if hasattr(k, "name") else str(k) + val = parse_value(v) + if hasattr(self, name): + setattr(self, name, val) + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") + + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore + """ Main processor logic. This example reads Avro records from the incoming flowfile, + and writes them back out to a new flowfile. It also adds the processor properties + to the flowfile attributes. IT IS A SAMPLE ONLY, + you are meant to use this as a starting point for other processors + """ + output_contents = [] + try: + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore + self.logger.info("Successfully transformed Avro content for OCR") + + return FlowFileTransformResult(relationship="success", + attributes=attributes, + contents=json.dumps(output_contents)) + except Exception as exception: + self.logger.error("Exception during Avro processing: " + traceback.format_exc()) + raise exception diff --git a/nifi/user-scripts/utils/helpers/logging.py b/nifi/user-scripts/utils/helpers/logging.py deleted file mode 100644 index eaa6a7bdd..000000000 --- a/nifi/user-scripts/utils/helpers/logging.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging -import os -import sys - - -def get_logger(name: str) -> logging.Logger: - """Return a configured logger shared across all NiFi clients.""" - level_name = os.getenv("NIFI_LOG_LEVEL", "INFO").upper() - level = getattr(logging, level_name, logging.INFO) - - logger = logging.getLogger(name) - if not logger.handlers: - handler = logging.StreamHandler(sys.stdout) - fmt = logging.Formatter( - "[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s", - "%Y-%m-%d %H:%M:%S", - ) - handler.setFormatter(fmt) - logger.addHandler(handler) - logger.setLevel(level) - logger.propagate = False - return logger diff --git a/nifi/user-scripts/utils/helpers/nifi_api_client.py b/nifi/user-scripts/utils/helpers/nifi_api_client.py index 372d7ef06..1c353d2c1 100644 --- a/nifi/user-scripts/utils/helpers/nifi_api_client.py +++ b/nifi/user-scripts/utils/helpers/nifi_api_client.py @@ -1,5 +1,4 @@ from logging import Logger -from typing import List # noqa: UP035 from dto.nifi_api_config import NiFiAPIConfig from nipyapi import canvas, security @@ -10,7 +9,7 @@ from nipyapi.registry import ApiClient as RegistryApiClient from nipyapi.registry import BucketsApi from nipyapi.registry.configuration import Configuration as RegistryConfiguration -from utils.helpers.logging import get_logger +from utils.generic import get_logger class NiFiRegistryClient: @@ -63,7 +62,7 @@ def _login(self) -> None: def get_root_process_group_id(self) -> str: return canvas.get_root_pg_id() - def get_process_group_by_name(self, process_group_name: str) -> None | List[object] | object: + def get_process_group_by_name(self, process_group_name: str) -> None | list[object] | object: return canvas.get_process_group(process_group_name, identifier_type="nam") def get_process_group_by_id(self, process_group_id: str) -> ProcessGroupEntity: @@ -75,9 +74,9 @@ def start_process_group(self, process_group_id: str) -> bool: def stop_process_group(self, process_group_id: str) -> bool: return canvas.schedule_process_group(process_group_id, False) - def get_child_process_groups_from_parent_id(self, parent_process_group_id: str) -> List[ProcessGroupEntity]: + def get_child_process_groups_from_parent_id(self, parent_process_group_id: str) -> list[ProcessGroupEntity]: parent_pg = canvas.get_process_group(parent_process_group_id, identifier_type="id") return canvas.list_all_process_groups(parent_pg.id) - def get_all_processors_in_process_group(self, process_group_id: str) -> List[ProcessorEntity]: + def get_all_processors_in_process_group(self, process_group_id: str) -> list[ProcessorEntity]: return canvas.list_all_processors(process_group_id) diff --git a/nifi/user-scripts/utils/helpers/service.py b/nifi/user-scripts/utils/helpers/service.py new file mode 100644 index 000000000..9d4b28080 --- /dev/null +++ b/nifi/user-scripts/utils/helpers/service.py @@ -0,0 +1,33 @@ +import sys +import time + +import psycopg2 +from psycopg2 import sql + +sys.path.append("../../dto/") + +from dto.pg_config import PGConfig + + +def check_postgres(cfg: PGConfig) -> tuple[bool, float | None, str | None]: + """Return (is_healthy, latency_ms, error_detail)""" + start = time.perf_counter() + try: + conn = psycopg2.connect( + host=cfg.host, + port=cfg.port, + dbname=cfg.db, + user=cfg.user, + password=cfg.password, + connect_timeout=cfg.timeout + ) + with conn.cursor() as cur: + cur.execute(sql.SQL("SELECT 1;")) + result = cur.fetchone() + conn.close() + if result != (1,): + return False, None, f"Unexpected result: {result}" + latency = (time.perf_counter() - start) * 1000 + return True, latency, None + except Exception as e: + return False, None, str(e) From 089627e6602a89a1ececf1a0b60bb797ca43629d Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Mon, 27 Oct 2025 23:04:16 +0000 Subject: [PATCH 5/5] Nifi: added default es index template. --- .../record_decompress_cerner_blob.py | 3 +- .../elasticsearch/base_index_settings.json | 66 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 nifi/user-schemas/elasticsearch/base_index_settings.json diff --git a/nifi/user-python-extensions/record_decompress_cerner_blob.py b/nifi/user-python-extensions/record_decompress_cerner_blob.py index ffee6dbae..233c9c893 100644 --- a/nifi/user-python-extensions/record_decompress_cerner_blob.py +++ b/nifi/user-python-extensions/record_decompress_cerner_blob.py @@ -17,7 +17,8 @@ All RECORDS are expected to have the same fields, and presumably belonging to the same DOCUMENT. All the fields of these records should have the same field values, except for the binary data field. The binary data field is expected to be a base64 encoded string, which will be concatenated according to - the blob_sequence_order_field_name field, preserving the order of the blobs and composing the whole document (supposedly). + the blob_sequence_order_field_name field, preserving the order of the blobs and composing + the whole document (supposedly). The final base64 enncoded string will be decoded back to binary data, then decompressed using LZW algorithm. """ diff --git a/nifi/user-schemas/elasticsearch/base_index_settings.json b/nifi/user-schemas/elasticsearch/base_index_settings.json new file mode 100644 index 000000000..b150ec474 --- /dev/null +++ b/nifi/user-schemas/elasticsearch/base_index_settings.json @@ -0,0 +1,66 @@ +{ + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 1, + "refresh_interval": "60s" + } + }, + "mappings": { + "date_detection": true, + "dynamic_date_formats": [ + "yyyy-MM-dd HH:mm:ss", + "yyyy-MM-dd", + "epoch_millis", + "basic_date", + "date_hour", + "date_hour_minute", + "date_hour_minute_second", + "time", + "hour_minute", + "yyyy/MM/dd", + "dd/MM/yyyy", + "dd/MM/yyyy HH:mm", + "date_time", + "t_time", + "date_hour_minute_second_millis", + "basic_time", + "basic_time_no_millis", + "basic_t_time", + "hour_minute_second", + "HH:mm.ss", + "HH:mm.ssZ" + ], + "dynamic_templates": [ + { + "dates": { + "match_mapping_type": "string", + "match_pattern": "regex", + "match": "(?i).*(date|time|when|dt|dttm|timestamp|created|updated|modified|inserted|recorded|logged|entered|performed|signed|cosigned|completed|admit|discharge|visit|appointment|service|start|end|effective|expiry|validfrom|validto|close)$", + "mapping": { + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||basic_date||date_hour_minute_second" + } + } + }, + { + "strings_as_text": { + "match_mapping_type": "string", + "mapping": { + "type": "text", + "analyzer": "standard", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + ], + "properties": { + "id": { "type": "keyword" } + } + } +}