diff --git a/.gitignore b/.gitignore index 4bfee8275..564452318 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,9 @@ .vscode .venv .ruff_cache +venv **__pycache__ +**/venv # keys and certificates *.pem diff --git a/deploy/Makefile b/deploy/Makefile index aff95d4af..d7b2dbcda 100644 --- a/deploy/Makefile +++ b/deploy/Makefile @@ -10,6 +10,7 @@ set -a && source ./export_env_vars.sh; endef # utility commands + git-freeze-security: @../scripts/git_freeze_security.sh @@ -19,8 +20,14 @@ git-unfreeze-security: git-update-submodules: @../scripts/git_update_submodules_in_repo.sh +load-env: + $(WITH_ENV) echo "Environment variables loaded." + +show-env: + ${WITH_ENV} >/dev/null 2>&1; printenv | sort + # start services -# + start-nifi: $(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow diff --git a/docs/deploy/deployment.md b/docs/deploy/deployment.md index 928da02b8..1a0bd7440 100644 --- a/docs/deploy/deployment.md +++ b/docs/deploy/deployment.md @@ -49,6 +49,92 @@ This design allows each service to be: - NiFi-specific configuration (properties, custom processors, drivers, Python scripts, etc.) is under: [`./nifi`](https://github.com/CogStack/CogStack-NiFi/tree/main/nifi/) +## 🧰 Makefile Command Overview + +A concise reference for controlling the full CogStack deployment stack (NiFi, Elasticsearch, JupyterHub, MedCAT, OCR-service, GitEA, Beats, DB, etc.). +All commands automatically load environment variables via `export_env_vars.sh`. 
+ +--- + +### 🔧 Utilities + +| Command | Description | +|------------------------|---------------------------------------------| +| `make load-env` | Load all environment variables | +| `make show-env` | Print environment variables (sorted) | +| `make git-freeze-security` | Freeze all security submodules (read-only) | +| `make git-unfreeze-security` | Unfreeze security submodules | +| `make git-update-submodules` | Update all submodules | + +--- + +### 🚀 Start Services + +| Command | Description | +|---------------------------------|-------------| +| `make start-nifi` | Start NiFi, NiFi-Nginx, NiFi Registry | +| `make start-elastic` | Start ES-1, ES-2, Kibana | +| `make start-elastic-cluster` | Start ES-1, ES-2, ES-3 | +| `make start-elastic-1/2/3` | Start individual Elasticsearch nodes | +| `make start-metricbeat-1/2/3` | Start Metricbeat agents | +| `make start-filebeat-1/2/3` | Start Filebeat agents | +| `make start-kibana` | Start Kibana only | +| `make start-samples` | Start samples DB | +| `make start-jupyter` | Start JupyterHub (prod config) | +| `make start-medcat-service` | Start MedCAT service | +| `make start-medcat-service-deid`| Start DE-ID MedCAT service | +| `make start-medcat-trainer` | Start MedCAT Trainer + Solr + Nginx | +| `make start-ocr-services` | Start OCR-service (full + text-only) | +| `make start-git-ea` | Start GitEA | +| `make start-production-db` | Start Databank DB | +| **`make start-data-infra`** | Start NiFi + Elastic + Samples DB | +| **`make start-all`** | Full stack: data infra + NLP + JupyterHub + OCR | + +--- + +### 🛑 Stop Services + +| Command | Description | +|---------------------------------|-------------| +| `make stop-nifi` | Stop NiFi stack | +| `make stop-elastic` | Stop ES-1, ES-2, Kibana | +| `make stop-elastic-cluster` | Stop ES-1, ES-2 | +| `make stop-elastic-1/2/3` | Stop individual ES nodes | +| `make stop-metricbeat-1/2/3` | Stop Metricbeat agents | +| `make stop-filebeat-1/2/3` | Stop Filebeat agents | 
+| `make stop-kibana` | Stop Kibana | +| `make stop-samples` | Stop samples DB | +| `make stop-jupyter` | Stop JupyterHub | +| `make stop-medcat-service` | Stop MedCAT service | +| `make stop-medcat-service-deid` | Stop DE-ID MedCAT service | +| `make stop-medcat-trainer` | Stop MedCAT Trainer stack | +| `make stop-ocr-services` | Stop OCR-service stack | +| `make stop-git-ea` | Stop GitEA | +| `make stop-production-db` | Stop Databank DB | +| **`make stop-data-infra`** | Stop NiFi + Elastic + Samples | +| **`make stop-all`** | Stop entire stack | + +--- + +### 🧹 Cleanup + +| Command | Description | +|------------------|---------------------------------------------| +| `make down-all` | Docker Compose `down` for all core services | +| `make cleanup` | Full teardown, including volumes | + +--- + +### 📝 Notes + +- All `start-*` commands use `docker compose -f services.yml` unless referencing a specific service’s Dockerfile. +- `start-all` and `stop-all` act as the top-level orchestration entry points. +- Environment variables are **always sourced** using the integrated `WITH_ENV` macro. + +--- + +The sections below describe each group of services in more detail. + ## 🚀 Starting the Services All core services defined in `services.yml` can be started using the Makefile in the `deploy/` directory. @@ -69,7 +155,7 @@ This is useful for: --- -#### 🧩 Core NiFi Services +### 🧩 Core NiFi Services ```bash make start-nifi @@ -105,6 +191,8 @@ Ideal for running ingestion pipelines and ETL workflows. #### 🛢️ Elasticsearch / OpenSearch Services +Note: to switch from OpenSearch (Amazon's open-source fork) to Elasticsearch, you will need to change some environment variables; see the [configuration](./configuration.md) section. 
+ ```bash make start-elastic ``` @@ -137,7 +225,7 @@ Starts Kibana for inspecting logs, checking index mappings, monitoring ES health --- -#### 🗄️ Databases +### 🗄️ Databases ```bash make start-samples @@ -155,7 +243,7 @@ Use when testing SQL ingestion or verifying DB-driven NiFi flows. --- -#### 📚 JupyterHub +### 📚 JupyterHub ```bash make start-jupyter @@ -165,7 +253,7 @@ Starts the CogStack JupyterHub instance. Used for notebooks, analysis, model tes --- -#### 🧠 NLP Services (MedCAT & Trainer) +### 🧠 NLP Services (MedCAT Service & Trainer) ```bash make start-medcat-service @@ -187,7 +275,7 @@ Starts the full MedCAT Trainer stack (Trainer UI + Solr + NGINX). Useful for ann --- -#### 📝 OCR Services +### 📝 OCR Services ```bash make start-ocr-services @@ -202,7 +290,7 @@ Use for PDF ingestion, OCR debugging, and pipeline validation. --- -#### 🛠️ Miscellaneous Services (GIT EA)' +### 🛠️ Miscellaneous Services (GIT EA) ```bash make start-git-ea diff --git a/docs/deploy/main.md b/docs/deploy/main.md index e10c9eda3..5ad25f106 100755 --- a/docs/deploy/main.md +++ b/docs/deploy/main.md @@ -43,7 +43,7 @@ Execute the following commands in the root directory of the repo: 1. `git-lfs pull` 2. (OPTIONAL, if you already have the software in [this section installed](#-software-requirements-linuxmacos))`sudo bash ./scripts/installation_utils/install_docker_and_utils.sh` , and wait for it to finish, it may take a while to get all the packages.. -3. `sudo bash ./scripts/git_update_submodules_in_repo.sh` +3. `cd deploy && make git-update-submodules` 4. check that docker works correctly : `docker pull hello-world` 5. if no errors, run: `docker run --rm hello-world`, it should run without issues 6. 
if there are any issues check the below warning section @@ -54,7 +54,7 @@ IMPORTANT NOTE: Do a `git-lfs pull` so that you have everything downloaded from :::{warning} Ensure all Git submodules are initialized and updated: -`sudo bash ./scripts/git_update_submodules_in_repo.sh` +`cd deploy && make git-update-submodules` ::: :::{warning} diff --git a/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py b/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py index 01a3fdf57..0558dbbd4 100644 --- a/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py +++ b/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py @@ -1,33 +1,44 @@ +import sys + +sys.path.insert(0, "/opt/nifi/user-scripts") + import base64 import copy import io import json -import sys import traceback -from logging import Logger from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter from avro.schema import RecordSchema, Schema, parse -from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( ProcessContext, PropertyDescriptor, StandardValidators, ) from nifiapi.relationship import Relationship +from overrides import override from py4j.java_gateway import JavaObject, JVMView +from utils.helpers.base_nifi_processor import BaseNiFiProcessor -# we need to add it to the sys imports -sys.path.insert(0, "/opt/nifi/user-scripts") -from utils.generic import parse_value # noqa: I001,E402 +class ConvertAvroBinaryRecordFieldToBase64(BaseNiFiProcessor): + """NiFi Python processor to convert a binary field in Avro records to base64-encoded string. + Reads each FlowFile as Avro, locates the configured binary_field_name, and rewrites the Avro schema, + so that field becomes a nullable string, preventing NiFi’s JSON + converters from turning raw bytes into integer arrays. 
-class ConvertAvroBinaryRecordFieldToBase64(FlowFileTransform): - identifier = None - logger: Logger = Logger(__qualname__) + Streams every record through a new Avro writer, base64-encoding the binary payload when operation_mode=base64 + (or leaving bytes untouched for raw), then reattaching the remaining fields + so downstream processors still see the original record structure. + Emits the updated Avro binary along success with attributes capturing document ID field, + binary field, mode, and MIME type application/avro-binary; + + Exception routes to failure after logging a stack trace. + """ class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] @@ -36,11 +47,7 @@ class ProcessorDetails: version = '0.0.1' def __init__(self, jvm: JVMView): - """ - Args: - jvm (JVMView): Required, Store if you need to use Java classes later. - """ - self.jvm = jvm + super().__init__(jvm) self.operation_mode: str = "base64" self.binary_field_name: str = "binarydoc" @@ -78,31 +85,9 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - - def getRelationships(self) -> list[Relationship]: - return self.relationships - - def getPropertyDescriptors(self) -> list[PropertyDescriptor]: - return self.descriptors - - def set_logger(self, logger: Logger): - self.logger = logger - - def set_properties(self, properties: dict) -> None: - """ Gets the properties from the processor's context and sets them as instance variables. - - Args: - properties (dict): dictionary containing property names and values. 
- """ - - for k, v in properties.items(): - name = k.name if hasattr(k, "name") else str(k) - val = parse_value(v) - if hasattr(self, name): - setattr(self, name, val) - self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore + @override + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ Transforms an Avro flow file by converting a specified binary field to a base64-encoded string. @@ -115,14 +100,15 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr Exception: For any other errors during Avro processing. Returns: - FlowFileTransformResult: The result containing the transformed flow file, updated attributes, and relationship. + FlowFileTransformResult: The result containing the transformed flow file, updated attributes, + and relationship. """ try: self.process_context = context self.set_properties(context.getProperties()) # read avro record - input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore + input_raw_bytes: bytes = flowFile.getContentsAsBytes() input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) @@ -171,8 +157,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr writer.flush() output_byte_buffer.seek(0) - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} attributes["document_id_field_name"] = str(self.document_id_field_name) attributes["binary_field"] = str(self.binary_field_name) attributes["operation_mode"] = str(self.operation_mode) diff --git a/nifi/user-python-extensions/convert_json_record_schema.py 
b/nifi/user-python-extensions/convert_json_record_schema.py index 649be8316..6cc672204 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -1,11 +1,13 @@ -import json import sys + +sys.path.insert(0, "/opt/nifi/user-scripts") + +import json import traceback from collections import defaultdict -from logging import Logger from typing import Any -from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( ProcessContext, PropertyDescriptor, @@ -13,17 +15,23 @@ ) from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView +from utils.helpers.base_nifi_processor import BaseNiFiProcessor -# we need to add it to the sys imports -sys.path.insert(0, "/opt/nifi/user-scripts") -from utils.generic import parse_value # noqa: I001,E402 +class ConvertJsonRecordSchema(BaseNiFiProcessor): + """Remaps each incoming JSON record (single dict or list of dicts) + using a lookup loaded from json_mapper_schema_path, + so the FlowFile content conforms to the common schema defined under /opt/nifi/user-schemas/json. + For every mapping entry it can rename fields, populate constant null placeholders, + or stitch together composite fields by concatenating multiple source values with newline separators. -class ConvertJsonRecordSchema(FlowFileTransform): - identifier = None - logger: Logger = Logger(__qualname__) + Optionally preserves any source fields not covered by the mapping via the + preserve_non_mapped_fields boolean property, which defaults to true to avoid accidental data loss. 
+ Emits the transformed payload as JSON (mime.type=application/json) and tags the FlowFile with the schema path used, + routing successes to success and any exceptions to failure + """ class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] @@ -32,11 +40,7 @@ class ProcessorDetails: version = '0.0.1' def __init__(self, jvm: JVMView): - """ - Args: - jvm (JVMView): Required, Store if you need to use Java classes later. - """ - self.jvm = jvm + super().__init__(jvm) self.json_mapper_schema_path: str = "/opt/nifi/user-schemas/json/cogstack_common_schema_mapping.json" self.preserve_non_mapped_fields: bool = True @@ -71,29 +75,6 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - - def getRelationships(self) -> list[Relationship]: - return self.relationships - - def getPropertyDescriptors(self) -> list[PropertyDescriptor]: - return self.descriptors - - def set_logger(self, logger: Logger): - self.logger = logger - - def set_properties(self, properties: dict) -> None: - """ Gets the properties from the processor's context and sets them as instance variables. - - Args: - properties (dict): dictionary containing property names and values. 
- """ - - for k, v in properties.items(): - name = k.name if hasattr(k, "name") else str(k) - val = parse_value(v) - if hasattr(self, name): - setattr(self, name, val) - self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") def map_record(self, record: dict, json_mapper_schema: dict) -> dict: """ @@ -141,14 +122,15 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: return new_record - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: output_contents: list[dict[Any, Any]] = [] + try: self.process_context: ProcessContext = context self.set_properties(context.getProperties()) # read avro record - input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore + input_raw_bytes: bytes = flowFile.getContentsAsBytes() records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) if isinstance(records, dict): @@ -161,8 +143,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr for record in records: output_contents.append(self.map_record(record, json_mapper_schema)) - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} attributes["json_mapper_schema_path"] = str(self.json_mapper_schema_path) attributes["mime.type"] = "application/json" diff --git a/nifi/user-python-extensions/parse_service_response.py b/nifi/user-python-extensions/parse_service_response.py index 7030c1bc8..51c7995dc 100644 --- a/nifi/user-python-extensions/parse_service_response.py +++ b/nifi/user-python-extensions/parse_service_response.py @@ -1,27 +1,29 @@ -import json import sys + +sys.path.insert(0, "/opt/nifi/user-scripts") + +import json import traceback -from logging import Logger -from 
nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( ProcessContext, PropertyDescriptor, StandardValidators, ) from nifiapi.relationship import Relationship +from overrides import override from py4j.java_gateway import JavaObject, JVMView +from utils.helpers.base_nifi_processor import BaseNiFiProcessor -# we need to add it to the sys imports -sys.path.insert(0, "/opt/nifi/user-scripts") -from utils.generic import parse_value # noqa: I001,E402 - - -class ParseCogStackServiceResult(FlowFileTransform): - identifier = None - logger: Logger = Logger(__qualname__) +class ParseCogStackServiceResult(BaseNiFiProcessor): + """ Normalises JSON responses from CogStack OCR or MedCAT services, reading each FlowFile, + coercing single objects to lists. + Exposes configurable properties for output text field name, service message type, + document ID/text fields, and MedCAT DEID behaviour so the same processor can be reused across services. + """ class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] @@ -30,11 +32,7 @@ class ProcessorDetails: version = '0.0.1' def __init__(self, jvm: JVMView): - """ - Args: - jvm (JVMView): Required, Store if you need to use Java classes later. 
- """ - self.jvm = jvm + super().__init__(jvm) self.output_text_field_name: str = "text" self.service_message_type: str = "ocr" @@ -46,12 +44,14 @@ def __init__(self, jvm: JVMView): # this is directly mirrored to the UI self._properties = [ PropertyDescriptor(name="output_text_field_name", - description="field to store OCR output text, this can also be used in MedCAT output in DE_ID mode", + description="field to store OCR output text, this can also be used" + " in MedCAT output in DE_ID mode", default_value="text", required=True, validators=[StandardValidators.NON_EMPTY_VALIDATOR]), PropertyDescriptor(name="service_message_type", - description="the type of service message form this script processes, possible values: not_set | medcat | ocr", + description="the type of service message form this script processes," \ + " possible values: not_set | medcat | ocr", default_value="not_set", required=True, allowable_values=["ocr", "medcat", "not_set"]), @@ -66,14 +66,17 @@ def __init__(self, jvm: JVMView): required=True, default_value="text"), PropertyDescriptor(name="medcat_output_mode", - description="service_message_type is set to 'medcat' for this to work, only used for deid processing," - " if the output is for deid, then we can customise the name of the text field, possible values: deid | not_set", + description="service_message_type is set to 'medcat' \ + for this to work, only used for deid processing," + " if the output is for deid, then we can customise the" \ + " name of the text field, possible values: deid | not_set", default_value="not_set", required=True, allowable_values=["deid", "not_set"], ), PropertyDescriptor(name="medcat_deid_keep_annotations", - description="if set to true, then the annotations will be kept in the output with the text field", + description="if set to true, " \ + "then the annotations will be kept in the output with the text field", required=True, default_value="true", allowable_values=["true", "false"], @@ -95,30 +98,8 @@ def 
__init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - def getRelationships(self) -> list[Relationship]: - return self.relationships - - def getPropertyDescriptors(self) -> list[PropertyDescriptor]: - return self.descriptors - - def set_logger(self, logger: Logger): - self.logger = logger - - def set_properties(self, properties: dict) -> None: - """ Gets the properties from the processor's context and sets them as instance variables. - - Args: - properties (dict): dictionary containing property names and values. - """ - - for k, v in properties.items(): - name = k.name if hasattr(k, "name") else str(k) - val = parse_value(v) - if hasattr(self, name): - setattr(self, name, val) - self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") - - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore + @override + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ Transforms the input FlowFile by parsing the service response and extracting relevant fields. @@ -132,13 +113,15 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr Returns: FlowFileTransformResult: The result containing the transformed contents and updated attributes. 
""" - output_contents = [] + + output_contents: list = [] + try: self.process_context: ProcessContext = context self.set_properties(context.getProperties()) # read avro record - input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore + input_raw_bytes: bytes = flowFile.getContentsAsBytes() records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) @@ -155,14 +138,13 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr _record["success"] = result.get("success", False) _record["timestamp"] = result.get("timestamp", None) - if "footer" in result.keys(): + if "footer" in result: for k, v in result["footer"].items(): _record[k] = v output_contents.append(_record) - elif self.service_message_type == "medcat": - if "result" in records[0].keys(): + elif self.service_message_type == "medcat" and "result" in records[0]: result = records[0].get("result", []) medcat_info = records[0].get("medcat_info", {}) @@ -190,7 +172,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents.append(_output_annotated_record) else: - for annotation_id, annotation_data in annotations: + for annotation_id, annotation_data in annotations.items(): _output_annotated_record = {} _output_annotated_record["service_model"] = medcat_info _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) @@ -201,14 +183,14 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr for k, v in footer.items(): _output_annotated_record[k] = v - if self.document_id_field_name in footer.keys(): + if self.document_id_field_name in footer: _output_annotated_record["annotation_id"] = \ str(footer[self.document_id_field_name]) + "_" + str(annotation_id) output_contents.append(_output_annotated_record) # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore + attributes: dict = {k: str(v) for k, 
v in flowFile.getAttributes().items()} attributes["output_text_field_name"] = str(self.output_text_field_name) attributes["mime.type"] = "application/json" diff --git a/nifi/user-python-extensions/prepare_record_for_nlp.py b/nifi/user-python-extensions/prepare_record_for_nlp.py index 99157745c..0f36f7069 100644 --- a/nifi/user-python-extensions/prepare_record_for_nlp.py +++ b/nifi/user-python-extensions/prepare_record_for_nlp.py @@ -1,24 +1,26 @@ +import sys + +sys.path.insert(0, "/opt/nifi/user-scripts") + import io import json import traceback -from logging import Logger from typing import Any, Union from avro.datafile import DataFileReader from avro.io import DatumReader -from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( ProcessContext, PropertyDescriptor, StandardValidators, ) +from overrides import override from py4j.java_gateway import JavaObject, JVMView +from utils.helpers.base_nifi_processor import BaseNiFiProcessor -class PrepareRecordForNlp(FlowFileTransform): - identifier = None - logger: Logger = Logger(__qualname__) - +class PrepareRecordForNlp(BaseNiFiProcessor): class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] @@ -27,11 +29,7 @@ class ProcessorDetails: version = '0.0.1' def __init__(self, jvm: JVMView): - """ - Args: - jvm (JVMView): Required, Store if you need to use Java classes later. 
- """ - self.jvm = jvm + super().__init__(jvm) self.document_text_field_name: str = "text" self.document_id_field_name : str = "id" @@ -60,26 +58,8 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - def getPropertyDescriptors(self) -> list[PropertyDescriptor]: - return self.descriptors - - - def set_logger(self, logger: Logger): - self.logger = logger - - def set_properties(self, properties: dict): - """ Gets the properties from the processor's context and sets them as instance variables. - - Args: - properties (dict): dictionary containing property names and values. - """ - - for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) - - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore + @override + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """_summary_ Args: @@ -93,7 +73,9 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr Returns: FlowFileTransformResult: _description_ """ - output_contents = [] + + output_contents: list = [] + try: self.process_context = context self.set_properties(context.getProperties()) @@ -101,7 +83,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr self.process_flow_file_type = str(self.process_flow_file_type).lower() # read avro record - input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore + input_raw_bytes: bytes = flowFile.getContentsAsBytes() input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]] @@ -128,12 +110,12 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr if isinstance(reader, DataFileReader): reader.close() - # add properties to flowfile attributes - attributes: dict = {k: str(v) 
for k, v in flowFile.getAttributes().items()} # type: ignore + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} attributes["document_id_field_name"] = str(self.document_id_field_name) attributes["mime.type"] = "application/json" output_contents = output_contents[0] if len(output_contents) == 1 else output_contents + return FlowFileTransformResult(relationship="success", attributes=attributes, contents=json.dumps({"content": output_contents}).encode("utf-8")) diff --git a/nifi/user-python-extensions/prepare_record_for_ocr.py b/nifi/user-python-extensions/prepare_record_for_ocr.py index 01a5924a2..0e29fa77d 100644 --- a/nifi/user-python-extensions/prepare_record_for_ocr.py +++ b/nifi/user-python-extensions/prepare_record_for_ocr.py @@ -1,33 +1,30 @@ +import sys + +sys.path.insert(0, "/opt/nifi/user-scripts") + import base64 import io import json import sys import traceback -from logging import Logger from typing import Any, Union from avro.datafile import DataFileReader from avro.io import DatumReader -from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( ProcessContext, PropertyDescriptor, StandardValidators, ) from nifiapi.relationship import Relationship +from overrides import override from py4j.java_gateway import JavaObject, JVMView +from utils.helpers.avro_json_encoder import AvroJSONEncoder +from utils.helpers.base_nifi_processor import BaseNiFiProcessor -# we need to add it to the sys imports -sys.path.insert(0, "/opt/nifi/user-scripts") - -from utils.helpers.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402 -from utils.generic import parse_value # noqa: I001,E402 - - -class PrepareRecordForOcr(FlowFileTransform): - identifier = None - logger: Logger = Logger(__qualname__) +class PrepareRecordForOcr(BaseNiFiProcessor): class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] @@ 
-36,11 +33,7 @@ class ProcessorDetails: version = '0.0.1' def __init__(self, jvm: JVMView): - """ - Args: - jvm (JVMView): Required, Store if you need to use Java classes later. - """ - self.jvm = jvm + super().__init__(jvm) self.operation_mode: str = "base64" self.binary_field_name: str = "binarydoc" @@ -89,31 +82,11 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - def getRelationships(self) -> list[Relationship]: - return self.relationships - - def getPropertyDescriptors(self) -> list[PropertyDescriptor]: - return self.descriptors - - def set_logger(self, logger: Logger): - self.logger = logger - - def set_properties(self, properties: dict) -> None: - """ Gets the properties from the processor's context and sets them as instance variables. - - Args: - properties (dict): dictionary containing property names and values. - """ + @override + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: - for k, v in properties.items(): - name = k.name if hasattr(k, "name") else str(k) - val = parse_value(v) - if hasattr(self, name): - setattr(self, name, val) - self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") + output_contents: list = [] - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore - output_contents = [] try: self.process_context = context self.set_properties(context.getProperties()) @@ -121,7 +94,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr self.process_flow_file_type = str(self.process_flow_file_type).lower() # read avro record - input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore + input_raw_bytes: bytes = flowFile.getContentsAsBytes() input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) reader: Union[DataFileReader, list[dict[str, Any]] | list[Any]] @@ 
-153,8 +126,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr if isinstance(reader, DataFileReader): reader.close() - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} attributes["document_id_field_name"] = str(self.document_id_field_name) attributes["binary_field"] = str(self.binary_field_name) attributes["output_text_field_name"] = str(self.output_text_field_name) diff --git a/nifi/user-python-extensions/record_add_geolocation.py b/nifi/user-python-extensions/record_add_geolocation.py new file mode 100644 index 000000000..b29218ec6 --- /dev/null +++ b/nifi/user-python-extensions/record_add_geolocation.py @@ -0,0 +1,247 @@ +import sys + +sys.path.insert(0, "/opt/nifi/user-scripts") + +import csv +import json +import os +import shutil +import traceback +from zipfile import ZipFile + +from nifiapi.flowfiletransform import FlowFileTransformResult +from nifiapi.properties import ( + ProcessContext, + PropertyDescriptor, + StandardValidators, +) +from overrides import override +from py4j.java_gateway import JavaObject, JVMView +from utils.generic import download_file_from_url, safe_delete_paths +from utils.helpers.base_nifi_processor import BaseNiFiProcessor + + +class JsonRecordAddGeolocation(BaseNiFiProcessor): + """NiFi Python processor to add geolocation data to JSON records based on postcode lookup. + We use https://www.getthedata.com/open-postcode-geo for geolocation. + The schema of the file used is available at: https://www.getthedata.com/open-postcode-geo + | Field | Possible Values + |-------------------------------|--------------------------------------------------------------------- + | postcode | [outcode][space][incode] + | status | live
terminated + | usertype | small
large + | easting | int
NULL + | northing | int
NULL + | positional_quality_indicator | int + | country | England,Wales,Scotland,Northern Ireland,Channel Islands,Isle of Man + | latitude | decimal + | longitude | decimal + | postcode_no_space | [outcode][incode] + | postcode_fixed_width_seven | *See comments* + | postcode_fixed_width_eight | *See comments* + | postcode_area | [A-Z]{1,2} + | postcode_district | [outcode] + | postcode_sector | [outcode][space][number] + | outcode | [outcode] + | incode | [incode] + + """ + + class Java: + implements = ["org.apache.nifi.python.processor.FlowFileTransform"] + + class ProcessorDetails: + version = '0.0.1' + + def __init__(self, jvm: JVMView): + super().__init__(jvm) + + self.lookup_datafile_url: str = "https://download.getthedata.com/downloads/open_postcode_geo.csv.zip" + self.lookup_datafile_path: str = "/opt/nifi/user-scripts/db/open_postcode_geo.csv" + self.postcode_field_name: str = "address_postcode" + self.geolocation_field_name: str = "address_geolocation" + + self.loaded_csv_file_rows: list[list] = [] + self.postcode_lookup_index: dict[str, int] = {} + + self._properties: list[PropertyDescriptor] = [ + PropertyDescriptor(name="lookup_datafile_url", + description="specify the URL for the geolocation lookup datafile zip", + default_value="https://download.getthedata.com/downloads/open" \ + "_postcode_geo.csv.zip", + required=True, + validators=[StandardValidators.URL_VALIDATOR]), + PropertyDescriptor(name="lookup_datafile_path", + description="specify the local path for the geolocation lookup datafile csv", + required=True, + validators=[StandardValidators.NON_EMPTY_VALIDATOR], + default_value="/opt/nifi/user-scripts/db/open_postcode_geo.csv"), + PropertyDescriptor(name="postcode_field_name", + description="postcode field name in the records", + required=True, + validators=[StandardValidators.NON_EMPTY_VALIDATOR], + default_value="address_postcode"), + PropertyDescriptor(name="geolocation_field_name", + description="new field to store the geolocation coords," + 
" if it is not present it will be created in each record", + required=True, + validators=[StandardValidators.NON_EMPTY_VALIDATOR], + default_value="address_postcode") + ] + + self.descriptors: list[PropertyDescriptor] = self._properties + + @override + def onScheduled(self, context: ProcessContext) -> None: + """ Initializes processor resources when scheduled. + Args: + context (ProcessContext): The process context. + This argument is required by the NiFi framework. + """ + + self.logger.debug("onScheduled() called β€” initializing processor resources") + + if self._check_geolocation_lookup_datafile(): + with open(self.lookup_datafile_path) as csv_file: + csv_reader = csv.reader(csv_file) + self.loaded_csv_file_rows = [row for row in csv_reader] + + self.postcode_lookup_index = {val[9]: idx + for idx, val in enumerate(self.loaded_csv_file_rows)} + + def _check_geolocation_lookup_datafile(self) -> bool: + """ Downloads the geolookup csv file for UK postcodes. + + Raises: + e: file not found + + Returns: + bool: file exists or not + """ + + base_output_extract_dir_path: str = "/opt/nifi/user-scripts/db" + output_extract_dir_path: str = os.path.join(base_output_extract_dir_path, "open_postcode_geo") + output_download_path: str = os.path.join(base_output_extract_dir_path, "open_postcode_geo.zip") + datafile_csv_initial_path: str = os.path.join(output_extract_dir_path, "open_postcode_geo.csv") + file_found: bool = False + + if os.path.exists(self.lookup_datafile_path): + self.logger.info(f"geolocation lookup datafile already exists at {self.lookup_datafile_path}") + file_found = True + else: + try: + if os.path.exists(output_download_path) is False and os.path.isfile(self.lookup_datafile_path) is False: + download_file_from_url(self.lookup_datafile_url, output_download_path, ssl_verify=False) + self.logger.debug(f"downloaded geolocation lookup datafile to {self.lookup_datafile_path}") + + if output_download_path.endswith('.zip'): + with ZipFile(output_download_path, 
'r') as zip_ref: + zip_ref.extractall(output_extract_dir_path) + self.logger.debug(f"extracted geolocation lookup datafile to {output_extract_dir_path}") + else: + self.logger.debug(f"file {self.lookup_datafile_path} already exists.... skipping download") + + if os.path.exists(datafile_csv_initial_path) and datafile_csv_initial_path != self.lookup_datafile_path: + self.logger.debug(f"geolocation lookup datafile found at {self.lookup_datafile_path} \ + after extraction") + shutil.copy2(datafile_csv_initial_path, self.lookup_datafile_path) + self.logger.debug(f"copied geolocation lookup datafile to {self.lookup_datafile_path}") + file_found = True + + except Exception as e: + self.logger.error(f"failed to download geolocation lookup datafile: {str(e)}") + traceback.print_exc() + raise e + + # cleanup downloaded files + safe_delete_paths([output_download_path, output_extract_dir_path]) + + return file_found + + @override + def transform(self, context: ProcessContext, flowFile: JavaObject) -> list[FlowFileTransformResult]: + """ Transforms the input FlowFile by adding geolocation data based on postcode lookup. + Args: + context (ProcessContext): The process context. + flowFile (JavaObject): The input FlowFile to be transformed. + Returns: + FlowFileTransformResult: The result of the transformation, including updated attributes and contents. + Raises: + Exception: If any error occurs during processing. + + NOTE: the input json should be small enough to fit into memory otherwise it might cause memory issues, + keep it < 20MB, under 20k records (depending on record size). + Use SplitRecord processor to split large files into smaller chunks before processing. 
+ """ + + try: + self.process_context: ProcessContext = context + self.set_properties(context.getProperties()) + + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + + records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) + + valid_records: list[dict] = [] + error_records: list[dict] = [] + + if isinstance(records, dict): + records = [records] + + if self.postcode_lookup_index: + for record in records: + if self.postcode_field_name in record: + _postcode = str(record[self.postcode_field_name]).replace(" ", "") + _data_col_row_idx = self.postcode_lookup_index.get(_postcode, -1) + + if _data_col_row_idx != -1: + _selected_row = self.loaded_csv_file_rows[_data_col_row_idx] + _lat, _long = str(_selected_row[7]).strip(), str(_selected_row[8]).strip() + try: + record[self.geolocation_field_name] = { + "lat": float(_lat), + "lon": float(_long) + } + except ValueError: + self.logger.debug(f"invalid lat/long values for postcode {_postcode}: {_lat}, {_long}") + error_records.append(record) + valid_records.append(record) + else: + raise FileNotFoundError("geolocation lookup datafile is not available and data was not loaded, " \ + "please check URLs") + + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["mime.type"] = "application/json" + + results: list[FlowFileTransformResult] = [] + + if valid_records: + results.append( + FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(valid_records).encode("utf-8"), + ) + ) + + if error_records: + error_attrs = attributes.copy() + error_attrs["record.count.errors"] = str(len(error_records)) + results.append( + FlowFileTransformResult( + relationship="failure", + attributes=error_attrs, + contents=json.dumps(error_records).encode("utf-8"), + ) + ) + + return results + except Exception: + self.logger.error("Exception during flowfile processing:\n" + traceback.format_exc()) + return [ + FlowFileTransformResult( + 
relationship="failure", + contents=flowFile.getContentsAsBytes(), + attributes={"exception": "unhandled processing error"}, + ) + ] diff --git a/nifi/user-python-extensions/record_decompress_cerner_blob.py b/nifi/user-python-extensions/record_decompress_cerner_blob.py index 233c9c893..b94bd05bd 100644 --- a/nifi/user-python-extensions/record_decompress_cerner_blob.py +++ b/nifi/user-python-extensions/record_decompress_cerner_blob.py @@ -1,38 +1,34 @@ +import sys + +sys.path.insert(0, "/opt/nifi/user-scripts") + import base64 import json import sys import traceback -from logging import Logger -from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( ProcessContext, PropertyDescriptor, StandardValidators, ) from py4j.java_gateway import JavaObject, JVMView +from utils.cerner_blob import DecompressLzwCernerBlob +from utils.helpers.base_nifi_processor import BaseNiFiProcessor + -""" This script decompresses Cerner LZW compressed blobs from a JSON input stream. +class JsonRecordDecompressCernerBlob(BaseNiFiProcessor): + """ This script decompresses Cerner LZW compressed blobs from a JSON input stream. It expects a JSON array of records, each containing a field with the binary data. All RECORDS are expected to have the same fields, and presumably belonging to the same DOCUMENT. All the fields of these records should have the same field values, except for the binary data field. The binary data field is expected to be a base64 encoded string, which will be concatenated according to the blob_sequence_order_field_name field, preserving the order of the blobs and composing the whole document (supposedly). - The final base64 enncoded string will be decoded back to binary data, then decompressed using LZW algorithm. -""" + The final base64 encoded string will be decoded back to binary data, then decompressed using LZW algorithm. 
-# this script is using a custom utility for decompressing Cerner blobs -# from nifi/user-python-extensions/record_decompress_cerner_blob.py -# we need to add it to the sys imports -sys.path.insert(0, "/opt/nifi/user-scripts") - -from utils.cerner_blob import DecompressLzwCernerBlob # noqa: I001,E402 - - -class JsonRecordDecompressCernerBlob(FlowFileTransform): - identifier = None - logger: Logger = Logger(__qualname__) + """ class Java: @@ -42,11 +38,7 @@ class ProcessorDetails: version = '0.0.1' def __init__(self, jvm: JVMView): - """ - Args: - jvm (JVMView): Required, Store if you need to use Java classes later. - """ - self.jvm = jvm + super().__init__(jvm) self.operation_mode: str = "base64" self.binary_field_name: str = "binarydoc" @@ -104,25 +96,7 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - def getPropertyDescriptors(self) -> list[PropertyDescriptor]: - return self.descriptors - - def set_logger(self, logger: Logger): - self.logger = logger - - def set_properties(self, properties: dict): - """ Gets the properties from the processor's context and sets them as instance variables. - - Args: - properties (dict): dictionary containing property names and values. - """ - - for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) - - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ Transforms the input FlowFile by decompressing Cerner blob data from JSON records. @@ -136,13 +110,15 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr Raises: Exception: If any error occurs during processing or decompression. 
""" - output_contents = [] + + output_contents: list = [] + try: self.process_context = context self.set_properties(context.getProperties()) # read avro record - input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore + input_raw_bytes: bytes = flowFile.getContentsAsBytes() records = [] @@ -167,43 +143,43 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_merged_record = {} for record in records: - if self.blob_sequence_order_field_name in record.keys(): - concatenated_blob_sequence_order[int(record[self.blob_sequence_order_field_name])] = record[self.binary_field_name] + if self.blob_sequence_order_field_name in record: + concatenated_blob_sequence_order[int(record[self.blob_sequence_order_field_name])] = \ + record[self.binary_field_name] # take fields from the first record, doesn't matter which one, # as they are expected to be the same except for the binary data field for k, v in records[0].items(): - if k not in output_merged_record.keys() and k != self.binary_field_name: + if k not in output_merged_record and k != self.binary_field_name: output_merged_record[k] = v output_merged_record[self.binary_field_name] = b"" full_compressed_blob = bytearray() - for k, v in concatenated_blob_sequence_order.items(): + for k in sorted(concatenated_blob_sequence_order.keys()): + v = concatenated_blob_sequence_order[k] try: temporary_blob = v if self.binary_field_source_encoding == "base64": - temporary_blob: bytes = base64.b64decode(temporary_blob) + temporary_blob = base64.b64decode(temporary_blob) full_compressed_blob.extend(temporary_blob) except Exception as e: self.logger.error(f"Error decoding b64 blob part {k}: {str(e)}") try: decompress_blob = DecompressLzwCernerBlob() - decompress_blob.decompress(full_compressed_blob) # type: ignore + decompress_blob.decompress(full_compressed_blob) output_merged_record[self.binary_field_name] = decompress_blob.output_stream except Exception as exception: 
self.logger.error(f"Error decompressing cerner blob: {str(exception)} \n") - if self.output_mode == "base64": output_merged_record[self.binary_field_name] = \ base64.b64encode(output_merged_record[self.binary_field_name]).decode(self.output_charset) output_contents.append(output_merged_record) - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} attributes["document_id_field_name"] = str(self.document_id_field_name) attributes["binary_field"] = str(self.binary_field_name) attributes["output_text_field_name"] = str(self.output_text_field_name) diff --git a/nifi/user-python-extensions/sample_processor.py b/nifi/user-python-extensions/sample_processor.py new file mode 100644 index 000000000..ddee7e475 --- /dev/null +++ b/nifi/user-python-extensions/sample_processor.py @@ -0,0 +1,166 @@ +import sys +from typing import Any + +sys.path.insert(0, "/opt/nifi/user-scripts") + +import io +import json +import traceback +from logging import Logger + +from avro.datafile import DataFileReader, DataFileWriter +from avro.io import DatumReader, DatumWriter +from avro.schema import Schema +from nifiapi.flowfiletransform import FlowFileTransformResult +from nifiapi.properties import ( + ProcessContext, + PropertyDescriptor, + StandardValidators, +) +from nifiapi.relationship import Relationship +from py4j.java_gateway import JavaObject, JVMView +from utils.helpers.base_nifi_processor import BaseNiFiProcessor + + +class SampleTestProcessor(BaseNiFiProcessor): + + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileTransform'] + + class ProcessorDetails: + version = '0.0.1' + + def __init__(self, jvm: JVMView): + """ + Args: + jvm (JVMView): Required, Store if you need to use Java classes later. 
+ """ + super().__init__(jvm) + + + # Example properties β€” meant to be overridden or extended in subclasses + self.sample_property_one: bool = True + self.sample_property_two: str = "" + self.sample_property_three: str = "default_value_one" + + # this is directly mirrored to the UI + self._properties = [ + PropertyDescriptor(name="sample_property_one", + description="sample property one description", + default_value="true", + required=True, + validators=StandardValidators.BOOLEAN_VALIDATOR), + PropertyDescriptor(name="sample_property_two", + description="sample property two description", + required=False, + default_value="some_value"), + PropertyDescriptor(name="sample_property_three", + required=True, + description="sample property three description", + default_value="default_value_one", + allowable_values=["default_value_one", "default_value_two", "default_value_three"], + validators=StandardValidators.NON_EMPTY_VALIDATOR) + ] + + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def onScheduled(self, context: ProcessContext) -> None: + """ + Called automatically by NiFi once when the processor is scheduled to run + (i.e., enabled or started). This method is used for initializing and + allocating resources that should persist across multiple FlowFile + executions. + + Typical use cases include: + - Loading static data from disk (e.g., CSV lookup tables, configuration files) + - Establishing external connections (e.g., databases, APIs) + - Building in-memory caches or models used by onTrigger/transform() + + The resources created here remain in memory for the lifetime of the + processor and are shared by all concurrent FlowFile executions on this + node. 
They should be lightweight and thread-safe. To release or clean up + such resources, use the @OnStopped method, which NiFi calls when the + processor is disabled or stopped. + """ + pass + + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + + """ + NOTE: This is a sample method meant to be overridden and reimplemented by subclasses. + + Main processor logic. This example reads Avro records from the incoming flowfile, + and writes them back out to a new flowfile. It also adds the processor properties + to the flowfile attributes. IT IS A SAMPLE ONLY, + you are meant to use this as a starting point for other processors + + Args: + context (ProcessContext): The process context containing processor properties. + flowFile (JavaObject): The FlowFile object containing the input data. + + Raises: + Exception: If any error occurs during processing. + + Returns: + FlowFileTransformResult: The result containing the transformed contents and updated attributes. 
+ """ + + output_contents: list[Any] = [] + + try: + self.process_context: ProcessContext = context + self.set_properties(context.getProperties()) + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + self.logger.info("Successfully transformed Avro content for OCR") + + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + + # read avro record + self.logger.debug("Reading flowfile content as bytes") + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) + + # below is an example of how to handle avro records, each record + schema: Schema | None = reader.datum_reader.writers_schema + + for record in reader: + #do stuff + pass + + # streams need to be closed + input_byte_buffer.close() + reader.close() + + # Write them to a binary avro stre + output_byte_buffer = io.BytesIO() + writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema) + + writer.flush() + writer.close() + output_byte_buffer.seek(0) + + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["sample_property_one"] = str(self.sample_property_one) + attributes["sample_property_two"] = str(self.sample_property_two) + attributes["sample_property_three"] = str(self.sample_property_three) + + return FlowFileTransformResult(relationship="success", + attributes=attributes, + contents=json.dumps(output_contents)) + except Exception as exception: + self.logger.error("Exception during Avro processing: " + traceback.format_exc()) + raise exception diff --git a/nifi/user-python-extensions/test_processor.py b/nifi/user-python-extensions/test_processor.py deleted file mode 100644 index b8e4b4618..000000000 --- a/nifi/user-python-extensions/test_processor.py +++ /dev/null @@ -1,125 +0,0 @@ -import io -import json -import traceback -from logging import Logger - -from avro.datafile 
import DataFileReader -from avro.io import DatumReader -from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult -from nifiapi.properties import ( - ProcessContext, - PropertyDescriptor, - StandardValidators, -) -from py4j.java_gateway import JavaObject, JVMView - - -class SampleTestProcessor(FlowFileTransform): - identifier = None - logger: Logger = Logger(__qualname__) - - - class Java: - implements = ['org.apache.nifi.python.processor.FlowFileTransform'] - - class ProcessorDetails: - version = '0.0.1' - - def __init__(self, jvm: JVMView): - """ - Args: - jvm (JVMView): Required, Store if you need to use Java classes later. - """ - self.jvm = jvm - - self.sample_property_one: bool = True - self.sample_property_two: str = "" - self.sample_property_three: str = "default_value_one" - - # this is directly mirrored to the UI - self._properties = [ - PropertyDescriptor(name="sample_property_one", - description="sample property one description", - default_value="true", - required=True, - validators=StandardValidators.BOOLEAN_VALIDATOR), - PropertyDescriptor(name="sample_property_two", - description="sample property two description", - required=False, - default_value="some_value"), - PropertyDescriptor(name="sample_property_three", - required=True, - description="sample property three description", - default_value="default_value_one", - allowable_values=["default_value_one", "default_value_two", "default_value_three"], - validators=StandardValidators.NON_EMPTY_VALIDATOR) - ] - - self.descriptors: list[PropertyDescriptor] = self._properties - - def getPropertyDescriptors(self) -> list[PropertyDescriptor]: - return self.descriptors - - def set_logger(self, logger: Logger): - self.logger = logger - - def set_properties(self, properties: dict): - """ Gets the properties from the processor's context and sets them as instance variables. - - Args: - properties (dict): dictionary containing property names and values. 
- """ - - for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) - - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore - """ Main processor logic. This example reads Avro records from the incoming flowfile, - and writes them back out to a new flowfile. It also adds the processor properties - to the flowfile attributes. IT IS A SAMPLE ONLY, - you are meant to use this as a starting point for other processors - """ - output_contents = [] - try: - self.process_context = context - self.set_properties(context.getProperties()) - - # read avro record - input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore - self.logger.debug("Reading flowfile content as bytes") - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) - - # below is an example of how to handle avro records, each records - # schema: Schema | None = reader.datum_reader.writers_schema - # for record in reader: - # do stuff - - input_byte_buffer.close() - reader.close() - - # Write them to a binary avro stream - # output_byte_buffer = io.BytesIO() - # writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema) - - # writer.flush() - # writer.close() - # output_byte_buffer.seek(0) - - - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore - attributes["sample_property_one"] = str(self.sample_property_one) - attributes["sample_property_two"] = str(self.sample_property_two) - attributes["sample_property_three"] = str(self.sample_property_three) - - self.logger.info("Successfully transformed Avro content for OCR") - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents)) - except Exception as exception: - 
self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception diff --git a/nifi/user-scripts/utils/generic.py b/nifi/user-scripts/utils/generic.py index 270c93a66..5d5be2ead 100644 --- a/nifi/user-scripts/utils/generic.py +++ b/nifi/user-scripts/utils/generic.py @@ -1,11 +1,46 @@ import json import logging import os +import shutil import ssl +import subprocess +import sys import traceback import urllib.request from collections import defaultdict -from typing import Union +from collections.abc import Callable, Iterable + +logger = logging.getLogger(__name__) + + +def safe_delete_paths(paths: Iterable[str | os.PathLike], chmod_mode: int = 755) -> None: + """ + Forcefully deletes a list of files or directories. + - Makes files and dirs writable before deletion. + - Removes both normal files and symlinks. + - Recursively deletes directories. + + Args: + paths: iterable of file or directory paths. + chmod_mode: permission mode to apply before deleting. + """ + + for path in paths: + if not os.path.exists(path): + continue + try: + # Handle symlinks separately + if os.path.islink(path) or os.path.isfile(path): + os.chmod(path, chmod_mode) + os.remove(path) + + if os.path.isdir(path): + # Make sure everything inside is writable + subprocess.run(["chmod", "-R", str(chmod_mode), path], check=False) + shutil.rmtree(path, ignore_errors=False) + logger.info(f"Deleted: {path}") + except Exception as e: + logger.error(f"Could not delete {path}: {e}") def chunk(input_list: list, num_slices: int): @@ -14,14 +49,13 @@ def chunk(input_list: list, num_slices: int): # function to convert a dictionary to json and write to file (d: dictionary, fn: string (filename)) -def dict2json_file(input_dict: dict, file_path: str): +def dict2json_file(input_dict: dict, file_path: str) -> None: # write the json file with open(file_path, "a+", encoding="utf-8") as outfile: json.dump(input_dict, outfile, ensure_ascii=False, indent=None, separators=(",", ":")) -def 
dict2json_truncate_add_to_file(input_dict: dict, file_path: str): - +def dict2json_truncate_add_to_file(input_dict: dict, file_path: str) -> None: if os.path.exists(file_path): with open(file_path, "a+") as outfile: outfile.seek(outfile.tell() - 1, os.SEEK_SET) @@ -36,7 +70,7 @@ def dict2json_truncate_add_to_file(input_dict: dict, file_path: str): dict2json_file(input_dict, file_path) -def dict2jsonl_file(input_dict: dict | defaultdict, file_path: str): +def dict2jsonl_file(input_dict: dict | defaultdict, file_path: str) -> None: with open(file_path, 'a', encoding='utf-8') as outfile: for k,v in input_dict.items(): o = {k: v} @@ -86,6 +120,7 @@ def download_file_from_url(url: str, output_path: str, ssl_verify: bool = False, except Exception as e: raise Exception(f"Failed to download file from {url} to {output_path}: {e}") from e + # ----------------------------------------------------------------------------------------------------------------- # Function for handling property parsing, used in NiFi processors mainly, but can beused elsewhere # ----------------------------------------------------------------------------------------------------------------- @@ -105,7 +140,7 @@ def parse_value(value: str) -> str|int|float|bool: # ----------------------------------------------------------------------------------------------------------------- # Safe execution wrapper with consistent error logging # ----------------------------------------------------------------------------------------------------------------- -def safe_execute(logger: logging.Logger, func, *args, **kwargs): +def safe_execute(logger: logging.Logger, func, *args, **kwargs) -> Callable: """ Executes a function safely, logging errors with full traceback. 
diff --git a/nifi/user-scripts/utils/helpers/base_nifi_processor.py b/nifi/user-scripts/utils/helpers/base_nifi_processor.py index a40523e07..634a05509 100644 --- a/nifi/user-scripts/utils/helpers/base_nifi_processor.py +++ b/nifi/user-scripts/utils/helpers/base_nifi_processor.py @@ -1,9 +1,8 @@ -import json import logging -import traceback from logging import Logger +from typing import Generic, TypeVar -from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.flowfiletransform import FlowFileTransform from nifiapi.properties import ( ProcessContext, PropertyDescriptor, @@ -65,7 +64,9 @@ class ProcessorDetails: return decorator -class BaseNiFiProcessor(FlowFileTransform): +T = TypeVar("T") + +class BaseNiFiProcessor(FlowFileTransform, Generic[T]): """Base class providing common NiFi Python processor utilities. NOTE: This is an example implementation meant to be reimplemented by processors inheriting it . For the moment all inherting processors must define @@ -89,28 +90,12 @@ def __init__(self, jvm: JVMView): self.logger: Logger = logging.getLogger(self.__class__.__name__) self.process_context: ProcessContext - # Example properties β€” meant to be overridden or extended in subclasses - self.sample_property_one: bool = True - self.sample_property_two: str = "" - self.sample_property_three: str = "default_value_one" - - # this is directly mirrored to the UI - self._properties = [ - PropertyDescriptor(name="sample_property_one", + self._properties: list = [ + PropertyDescriptor(name="sample_property_one", description="sample property one description", default_value="true", required=True, - validators=[StandardValidators.BOOLEAN_VALIDATOR]), - PropertyDescriptor(name="sample_property_two", - description="sample property two description", - required=False, - default_value="some_value"), - PropertyDescriptor(name="sample_property_three", - required=True, - description="sample property three description", - 
default_value="default_value_one", - allowable_values=["default_value_one", "default_value_two", "default_value_three"], - validators=[StandardValidators.NON_EMPTY_VALIDATOR]) + validators=StandardValidators.BOOLEAN_VALIDATOR), ] self._relationships = [ @@ -147,26 +132,25 @@ def set_properties(self, properties: dict) -> None: setattr(self, name, val) self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore - """ - NOTE: This is a sample method meant to be overridden and reimplemented by subclasses. - - Main processor logic. This example reads Avro records from the incoming flowfile, - and writes them back out to a new flowfile. It also adds the processor properties - to the flowfile attributes. IT IS A SAMPLE ONLY, - you are meant to use this as a starting point for other processors + def onScheduled(self, context: ProcessContext) -> None: """ - output_contents = [] - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore - self.logger.info("Successfully transformed Avro content for OCR") - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents)) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception + Called automatically by NiFi once when the processor is scheduled to run + (i.e., enabled or started). This method is used for initializing and + allocating resources that should persist across multiple FlowFile + executions. 
+ + Typical use cases include: + - Loading static data from disk (e.g., CSV lookup tables, configuration files) + - Establishing external connections (e.g., databases, APIs) + - Building in-memory caches or models used by onTrigger/transform() + + The resources created here remain in memory for the lifetime of the + processor and are shared by all concurrent FlowFile executions on this + node. They should be lightweight and thread-safe. To release or clean up + such resources, use the @OnStopped method, which NiFi calls when the + processor is disabled or stopped. + """ + pass + + def transform(self, context: ProcessContext, flowFile: JavaObject): + raise NotImplementedError diff --git a/nifi/user-templates/opensearch_docs_ingest_annotations_to_es.json b/nifi/user-templates/opensearch_docs_ingest_annotations_to_es.json new file mode 100644 index 000000000..848c06713 --- /dev/null +++ b/nifi/user-templates/opensearch_docs_ingest_annotations_to_es.json @@ -0,0 +1 @@ +{"flowContents":{"identifier":"4f6ef035-0199-1000-2908-9b0d750bc18c","instanceIdentifier":"2c99a7e3-98d1-37d9-2345-149e41abaa26","name":"opensearch_docs_ingest_annotations_to_es","comments":"","position":{"x":-488.0,"y":-720.0},"processGroups":[],"remoteProcessGroups":[],"processors":[{"identifier":"4f6ef03a-0199-1000-bf6b-34d8129e4932","instanceIdentifier":"bc394929-a745-3a6d-b88b-ba9292b7f54a","name":"PutElasticsearchJson","comments":"","position":{"x":-224.0,"y":-48.0},"type":"org.apache.nifi.processors.elasticsearch.PutElasticsearchJson","bundle":{"group":"org.apache.nifi","artifact":"nifi-elasticsearch-restapi-nar","version":"2.6.0"},"properties":{"put-es-json-dynamic_templates":null,"Max JSON Field String Length":"20 
MB","put-es-record-batch-size":"100","put-es-record-index-op":"index","put-es-output-error-responses":"false","el-rest-fetch-index":"encounters_annotations","put-es-json-scripted-upsert":"false","put-es-json-charset":"UTF-8","el-rest-client-service":"06280792-dc54-3d7a-9d35-76ce834dcec3","put-es-record-log-error-responses":"false","put-es-json-id-attr":"annotation_id","el-rest-type":null,"put-es-not_found-is-error":"true","put-es-json-script":null},"propertyDescriptors":{"put-es-json-dynamic_templates":{"name":"put-es-json-dynamic_templates","displayName":"Dynamic Templates","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Max JSON Field String Length":{"name":"Max JSON Field String Length","displayName":"Max JSON Field String Length","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-record-batch-size":{"name":"put-es-record-batch-size","displayName":"Batch Size","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-record-index-op":{"name":"put-es-record-index-op","displayName":"Index Operation","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-output-error-responses":{"name":"put-es-output-error-responses","displayName":"Output Error Responses","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-fetch-index":{"name":"el-rest-fetch-index","displayName":"Index","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-json-scripted-upsert":{"name":"put-es-json-scripted-upsert","displayName":"Scripted Upsert","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-json-charset":{"name":"put-es-json-charset","displayName":"Character Set","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-client-service":{"name":"el-rest-client-service","displayName":"Client 
Service","identifiesControllerService":true,"sensitive":false,"dynamic":false},"put-es-record-log-error-responses":{"name":"put-es-record-log-error-responses","displayName":"Log Error Responses","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-json-id-attr":{"name":"put-es-json-id-attr","displayName":"Identifier Attribute","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-type":{"name":"el-rest-type","displayName":"Type","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-not_found-is-error":{"name":"put-es-not_found-is-error","displayName":"Treat \"Not Found\" as Success","identifiesControllerService":false,"sensitive":false,"dynamic":false},"put-es-json-script":{"name":"put-es-json-script","displayName":"Script","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"style":{},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"DEBUG","runDurationMillis":0,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":["successful"],"scheduledState":"ENABLED","retryCount":10,"retriedRelationships":[],"backoffMechanism":"PENALIZE_FLOWFILE","maxBackoffPeriod":"10 mins","componentType":"PROCESSOR","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"7824060e-49ad-3398-8b25-28be95944992","instanceIdentifier":"64bac204-019a-1000-0de5-7317fa1baece","name":"EvaluateJsonPath-create_annotation_id_store_as_attritbute","comments":"","position":{"x":408.0,"y":-48.0},"type":"org.apache.nifi.processors.standard.EvaluateJsonPath","bundle":{"group":"org.apache.nifi","artifact":"nifi-standard-nar","version":"2.6.0"},"properties":{"annotation_id":"$.annotation_id","Destination":"flowfile-attribute","Max String Length":"20 MB","Return Type":"auto-detect","Null Value Representation":"empty string","Path Not Found 
Behavior":"ignore"},"propertyDescriptors":{"annotation_id":{"name":"annotation_id","displayName":"annotation_id","identifiesControllerService":false,"sensitive":false,"dynamic":true},"Destination":{"name":"Destination","displayName":"Destination","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Max String Length":{"name":"Max String Length","displayName":"Max String Length","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Return Type":{"name":"Return Type","displayName":"Return Type","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Null Value Representation":{"name":"Null Value Representation","displayName":"Null Value Representation","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Path Not Found Behavior":{"name":"Path Not Found Behavior","displayName":"Path Not Found Behavior","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"style":{},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","runDurationMillis":0,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":[],"scheduledState":"ENABLED","retryCount":10,"retriedRelationships":[],"backoffMechanism":"PENALIZE_FLOWFILE","maxBackoffPeriod":"10 
mins","componentType":"PROCESSOR","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef039-0199-1000-a2bf-35cc59e5c5d4","instanceIdentifier":"ba54edee-cf06-3323-e8fc-2319dbb87cfd","name":"PrepareRecordForNlp","comments":"","position":{"x":-224.0,"y":-688.0},"type":"PrepareRecordForNlp","bundle":{"group":"org.apache.nifi","artifact":"python-extensions","version":"0.0.1"},"properties":{"document_text_field_name":"text","process_flow_file_type":"json","document_id_field_name":"id"},"propertyDescriptors":{"document_text_field_name":{"name":"document_text_field_name","displayName":"document_text_field_name","identifiesControllerService":false,"sensitive":false,"dynamic":false},"process_flow_file_type":{"name":"process_flow_file_type","displayName":"process_flow_file_type","identifiesControllerService":false,"sensitive":false,"dynamic":false},"document_id_field_name":{"name":"document_id_field_name","displayName":"document_id_field_name","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"style":{},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"INFO","runDurationMillis":25,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":["original"],"scheduledState":"ENABLED","retryCount":10,"retriedRelationships":[],"backoffMechanism":"PENALIZE_FLOWFILE","maxBackoffPeriod":"10 
mins","componentType":"PROCESSOR","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef038-0199-1000-a0ed-ac10e18267f9","instanceIdentifier":"a2c6ca7c-9418-3108-59af-12d17238643d","name":"SearchElasticsearch","comments":"","position":{"x":-224.0,"y":-888.0},"type":"org.apache.nifi.processors.elasticsearch.SearchElasticsearch","bundle":{"group":"org.apache.nifi","artifact":"nifi-elasticsearch-restapi-nar","version":"2.6.0"},"properties":{"el-rest-pagination-type":"pagination-scroll","es-rest-size":null,"es-rest-query-sort":null,"Max JSON Field String Length":"20 MB","el-rest-query-clause":null,"el-rest-query-definition-style":"full","es-rest-query-script-fields":null,"es-rest-query-aggs":null,"el-rest-pagination-keep-alive":"10 mins","el-rest-format-hits":"SOURCE_ONLY","el-rest-split-up-hits":"splitUp-no","el-rest-query":"{ \"_source\": [\"id\", \"text\"],\n \"query\": {\n \"bool\": {\n \"must\": [\n { \"exists\": { \"field\": \"text\" } }\n ]\n }\n }\n}","el-rest-split-up-aggregations":"splitUp-no","el-rest-format-aggregations":"BUCKETS_ONLY","el-rest-fetch-index":"encounters","el-rest-output-no-hits":"false","el-rest-client-service":"06280792-dc54-3d7a-9d35-76ce834dcec3","el-rest-type":null,"restart-on-finish":"false","es-rest-query-fields":null,"el-query-attribute":null},"propertyDescriptors":{"el-rest-pagination-type":{"name":"el-rest-pagination-type","displayName":"Pagination Type","identifiesControllerService":false,"sensitive":false,"dynamic":false},"es-rest-size":{"name":"es-rest-size","displayName":"Size","identifiesControllerService":false,"sensitive":false,"dynamic":false},"es-rest-query-sort":{"name":"es-rest-query-sort","displayName":"Sort","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Max JSON Field String Length":{"name":"Max JSON Field String Length","displayName":"Max JSON Field String 
Length","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-query-clause":{"name":"el-rest-query-clause","displayName":"Query Clause","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-query-definition-style":{"name":"el-rest-query-definition-style","displayName":"Query Definition Style","identifiesControllerService":false,"sensitive":false,"dynamic":false},"es-rest-query-script-fields":{"name":"es-rest-query-script-fields","displayName":"Script Fields","identifiesControllerService":false,"sensitive":false,"dynamic":false},"es-rest-query-aggs":{"name":"es-rest-query-aggs","displayName":"Aggregations","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-pagination-keep-alive":{"name":"el-rest-pagination-keep-alive","displayName":"Pagination Keep Alive","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-format-hits":{"name":"el-rest-format-hits","displayName":"Search Results Format","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-split-up-hits":{"name":"el-rest-split-up-hits","displayName":"Search Results Split","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-query":{"name":"el-rest-query","displayName":"Query","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-split-up-aggregations":{"name":"el-rest-split-up-aggregations","displayName":"Aggregation Results Split","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-format-aggregations":{"name":"el-rest-format-aggregations","displayName":"Aggregation Results Format","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-fetch-index":{"name":"el-rest-fetch-index","displayName":"Index","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-output-no-hits":{"name":"el-rest-output-no-hits","displayName":"Output No 
Hits","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-rest-client-service":{"name":"el-rest-client-service","displayName":"Client Service","identifiesControllerService":true,"sensitive":false,"dynamic":false},"el-rest-type":{"name":"el-rest-type","displayName":"Type","identifiesControllerService":false,"sensitive":false,"dynamic":false},"restart-on-finish":{"name":"restart-on-finish","displayName":"Restart On Finish?","identifiesControllerService":false,"sensitive":false,"dynamic":false},"es-rest-query-fields":{"name":"es-rest-query-fields","displayName":"Fields","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-query-attribute":{"name":"el-query-attribute","displayName":"Query Attribute","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"style":{},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"PRIMARY","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","runDurationMillis":0,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":[],"scheduledState":"ENABLED","retryCount":10,"retriedRelationships":[],"backoffMechanism":"PENALIZE_FLOWFILE","maxBackoffPeriod":"10 mins","componentType":"PROCESSOR","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef036-0199-1000-0f81-2e8b77aa5c73","instanceIdentifier":"dcd2d3ab-d6ec-3d18-ccc8-2c96b049b7e5","name":"SplitJson","comments":"","position":{"x":408.0,"y":-264.0},"type":"org.apache.nifi.processors.standard.SplitJson","bundle":{"group":"org.apache.nifi","artifact":"nifi-standard-nar","version":"2.6.0"},"properties":{"Max String Length":"20 MB","Null Value Representation":"empty string","JsonPath Expression":"$.*"},"propertyDescriptors":{"Max String Length":{"name":"Max String Length","displayName":"Max String Length","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Null Value Representation":{"name":"Null Value 
Representation","displayName":"Null Value Representation","identifiesControllerService":false,"sensitive":false,"dynamic":false},"JsonPath Expression":{"name":"JsonPath Expression","displayName":"JsonPath Expression","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"style":{},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","runDurationMillis":0,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":["original"],"scheduledState":"ENABLED","retryCount":10,"retriedRelationships":[],"backoffMechanism":"PENALIZE_FLOWFILE","maxBackoffPeriod":"10 mins","componentType":"PROCESSOR","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef037-0199-1000-3a77-cb27fc945e04","instanceIdentifier":"7a00c8f1-408d-3f43-293b-1fee1c451654","name":"ParseCogStackServiceResult","comments":"","position":{"x":-224.0,"y":-264.0},"type":"ParseCogStackServiceResult","bundle":{"group":"org.apache.nifi","artifact":"python-extensions","version":"0.0.1"},"properties":{"document_text_field_name":"text","medcat_output_mode":"not_set","service_message_type":"medcat","output_text_field_name":"text","document_id_field_name":"id","medcat_deid_keep_annotations":"false"},"propertyDescriptors":{"document_text_field_name":{"name":"document_text_field_name","displayName":"document_text_field_name","identifiesControllerService":false,"sensitive":false,"dynamic":false},"medcat_output_mode":{"name":"medcat_output_mode","displayName":"medcat_output_mode","identifiesControllerService":false,"sensitive":false,"dynamic":false},"service_message_type":{"name":"service_message_type","displayName":"service_message_type","identifiesControllerService":false,"sensitive":false,"dynamic":false},"output_text_field_name":{"name":"output_text_field_name","displayName":"output_text_field_name","identifiesControllerService":false,"sensitive":false,"dynamic":false},
"document_id_field_name":{"name":"document_id_field_name","displayName":"document_id_field_name","identifiesControllerService":false,"sensitive":false,"dynamic":false},"medcat_deid_keep_annotations":{"name":"medcat_deid_keep_annotations","displayName":"medcat_deid_keep_annotations","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"style":{},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","runDurationMillis":25,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":["original"],"scheduledState":"ENABLED","retryCount":10,"retriedRelationships":[],"backoffMechanism":"PENALIZE_FLOWFILE","maxBackoffPeriod":"10 mins","componentType":"PROCESSOR","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef03b-0199-1000-ce55-a83a235f6052","instanceIdentifier":"7f451015-89a1-3dab-0a9f-6a6f9b33ec6b","name":"InvokeHTTP","comments":"","position":{"x":-224.0,"y":-472.0},"type":"org.apache.nifi.processors.standard.InvokeHTTP","bundle":{"group":"org.apache.nifi","artifact":"nifi-standard-nar","version":"2.6.0"},"properties":{"Request Content-Encoding":"DISABLED","proxy-configuration-service":null,"Request Multipart Form-Data Filename Enabled":"true","Request Chunked Transfer-Encoding Enabled":"false","Response Header Request Attributes Prefix":null,"HTTP/2 Disabled":"True","Connection Timeout":"5 secs","Response Cookie Strategy":"DISABLED","Socket Read Timeout":"15 secs","Socket Idle Connections":"5","Request Body Enabled":"true","OAuth2 Access Token Refresh Strategy":"ON_TOKEN_EXPIRATION","HTTP URL":"http://cogstack-medcat-service-production:5000/api/process_bulk","Request OAuth2 Access Token Provider":null,"Socket Idle Timeout":"5 mins","Response Redirects Enabled":"True","Socket Write Timeout":"15 secs","Request Header Attributes Pattern":null,"Response FlowFile Naming Strategy":"RANDOM","Response Cache 
Enabled":"false","Request Date Header Enabled":"True","Request Failure Penalization Enabled":"false","Response Body Attribute Size":"256","SSL Context Service":null,"Response Generation Required":"false","Request User-Agent":null,"Response Header Request Attributes Enabled":"false","HTTP Method":"POST","Request Username":null,"Request Content-Type":"${mime.type}","Response Body Attribute Name":null,"Request Digest Authentication Enabled":"false","Request Multipart Form-Data Name":null,"Response Cache Size":"10MB","Response Body Ignored":"false"},"propertyDescriptors":{"Request Content-Encoding":{"name":"Request Content-Encoding","displayName":"Request Content-Encoding","identifiesControllerService":false,"sensitive":false,"dynamic":false},"proxy-configuration-service":{"name":"proxy-configuration-service","displayName":"Proxy Configuration Service","identifiesControllerService":true,"sensitive":false,"dynamic":false},"Request Multipart Form-Data Filename Enabled":{"name":"Request Multipart Form-Data Filename Enabled","displayName":"Request Multipart Form-Data Filename Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Chunked Transfer-Encoding Enabled":{"name":"Request Chunked Transfer-Encoding Enabled","displayName":"Request Chunked Transfer-Encoding Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Header Request Attributes Prefix":{"name":"Response Header Request Attributes Prefix","displayName":"Response Header Request Attributes Prefix","identifiesControllerService":false,"sensitive":false,"dynamic":false},"HTTP/2 Disabled":{"name":"HTTP/2 Disabled","displayName":"HTTP/2 Disabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Connection Timeout":{"name":"Connection Timeout","displayName":"Connection Timeout","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Cookie Strategy":{"name":"Response Cookie 
Strategy","displayName":"Response Cookie Strategy","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Password":{"name":"Request Password","displayName":"Request Password","identifiesControllerService":false,"sensitive":true,"dynamic":false},"Socket Read Timeout":{"name":"Socket Read Timeout","displayName":"Socket Read Timeout","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Socket Idle Connections":{"name":"Socket Idle Connections","displayName":"Socket Idle Connections","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Body Enabled":{"name":"Request Body Enabled","displayName":"Request Body Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"OAuth2 Access Token Refresh Strategy":{"name":"OAuth2 Access Token Refresh Strategy","displayName":"OAuth2 Access Token Refresh Strategy","identifiesControllerService":false,"sensitive":false,"dynamic":false},"HTTP URL":{"name":"HTTP URL","displayName":"HTTP URL","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request OAuth2 Access Token Provider":{"name":"Request OAuth2 Access Token Provider","displayName":"Request OAuth2 Access Token Provider","identifiesControllerService":true,"sensitive":false,"dynamic":false},"Socket Idle Timeout":{"name":"Socket Idle Timeout","displayName":"Socket Idle Timeout","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Redirects Enabled":{"name":"Response Redirects Enabled","displayName":"Response Redirects Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Socket Write Timeout":{"name":"Socket Write Timeout","displayName":"Socket Write Timeout","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Header Attributes Pattern":{"name":"Request Header Attributes Pattern","displayName":"Request Header Attributes 
Pattern","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response FlowFile Naming Strategy":{"name":"Response FlowFile Naming Strategy","displayName":"Response FlowFile Naming Strategy","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Cache Enabled":{"name":"Response Cache Enabled","displayName":"Response Cache Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Date Header Enabled":{"name":"Request Date Header Enabled","displayName":"Request Date Header Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Failure Penalization Enabled":{"name":"Request Failure Penalization Enabled","displayName":"Request Failure Penalization Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Body Attribute Size":{"name":"Response Body Attribute Size","displayName":"Response Body Attribute Size","identifiesControllerService":false,"sensitive":false,"dynamic":false},"SSL Context Service":{"name":"SSL Context Service","displayName":"SSL Context Service","identifiesControllerService":true,"sensitive":false,"dynamic":false},"Response Generation Required":{"name":"Response Generation Required","displayName":"Response Generation Required","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request User-Agent":{"name":"Request User-Agent","displayName":"Request User-Agent","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Header Request Attributes Enabled":{"name":"Response Header Request Attributes Enabled","displayName":"Response Header Request Attributes Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"HTTP Method":{"name":"HTTP Method","displayName":"HTTP Method","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Username":{"name":"Request Username","displayName":"Request 
Username","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Content-Type":{"name":"Request Content-Type","displayName":"Request Content-Type","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Body Attribute Name":{"name":"Response Body Attribute Name","displayName":"Response Body Attribute Name","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Digest Authentication Enabled":{"name":"Request Digest Authentication Enabled","displayName":"Request Digest Authentication Enabled","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Request Multipart Form-Data Name":{"name":"Request Multipart Form-Data Name","displayName":"Request Multipart Form-Data Name","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Cache Size":{"name":"Response Cache Size","displayName":"Response Cache Size","identifiesControllerService":false,"sensitive":false,"dynamic":false},"Response Body Ignored":{"name":"Response Body Ignored","displayName":"Response Body Ignored","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"style":{},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","runDurationMillis":0,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":["Original"],"scheduledState":"ENABLED","retryCount":10,"retriedRelationships":[],"backoffMechanism":"PENALIZE_FLOWFILE","maxBackoffPeriod":"10 
mins","componentType":"PROCESSOR","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"}],"inputPorts":[],"outputPorts":[],"connections":[{"identifier":"4f6ef051-0199-1000-540c-72aa95c7f5aa","instanceIdentifier":"b01f1d84-1072-3080-891a-40fd8a2430ed","name":"","source":{"id":"4f6ef036-0199-1000-0f81-2e8b77aa5c73","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"SplitJson","comments":"","instanceIdentifier":"dcd2d3ab-d6ec-3d18-ccc8-2c96b049b7e5"},"destination":{"id":"4f6ef03c-0199-1000-fada-c641cccb88e0","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"4f5dcd68-d034-310a-f613-9ea0f05b05b7"},"labelIndex":0,"zIndex":16,"selectedRelationships":["failure"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef048-0199-1000-e3e1-ed0581d1e39b","instanceIdentifier":"9e2f6834-11d1-3152-b6fe-af3f4d8efc8e","name":"","source":{"id":"4f6ef038-0199-1000-a0ed-ac10e18267f9","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"SearchElasticsearch","comments":"","instanceIdentifier":"a2c6ca7c-9418-3108-59af-12d17238643d"},"destination":{"id":"4f6ef039-0199-1000-a2bf-35cc59e5c5d4","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"PrepareRecordForNlp","comments":"","instanceIdentifier":"ba54edee-cf06-3323-e8fc-2319dbb87cfd"},"labelIndex":0,"zIndex":1,"selectedRelationships":["hits","aggregations"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef04a-0199-1000-fa89-a31050551c4c","instanceIdentifier":"d92a0fa2-42aa-3074-eb81-c5ace7c44e5f","name":"","source":{"id":"4f6ef03b-0199-1000-ce55-a83a235f6052","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"InvokeHTTP","comments":"","instanceIdentifier":"7f451015-89a1-3dab-0a9f-6a6f9b33ec6b"},"destination":{"id":"4f6ef041-0199-1000-414e-f0d8b090d0fc","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"2e3fbd7b-19b7-36ed-5a15-675a54f93739"},"labelIndex":0,"zIndex":6,"selectedRelationships":["No Retry","Retry","Failure"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"d25b7cf5-d43f-3bed-b1dc-c57fa67c3085","instanceIdentifier":"64bbf311-019a-1000-3d1e-4853dd41daa4","name":"","source":{"id":"7824060e-49ad-3398-8b25-28be95944992","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"EvaluateJsonPath-create_annotation_id_store_as_attritbute","comments":"","instanceIdentifier":"64bac204-019a-1000-0de5-7317fa1baece"},"destination":{"id":"9235994b-7e87-3c4c-9367-3120d9ae1701","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"64bbe06a-019a-1000-1db4-6451cafadd67"},"labelIndex":0,"zIndex":20,"selectedRelationships":["unmatched"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef050-0199-1000-ed55-315999e71dd3","instanceIdentifier":"504a3379-407b-3eba-6324-ecd181d29315","name":"","source":{"id":"4f6ef03a-0199-1000-bf6b-34d8129e4932","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"PutElasticsearchJson","comments":"","instanceIdentifier":"bc394929-a745-3a6d-b88b-ba9292b7f54a"},"destination":{"id":"4f6ef040-0199-1000-58f9-341dd241015a","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"751b9f99-488f-3eb9-8f73-df4f56d4e911"},"labelIndex":0,"zIndex":14,"selectedRelationships":["original"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef04b-0199-1000-8171-7f5508365aa5","instanceIdentifier":"7d02fcf7-37a0-3094-82ed-a16710d9334f","name":"","source":{"id":"4f6ef038-0199-1000-a0ed-ac10e18267f9","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"SearchElasticsearch","comments":"","instanceIdentifier":"a2c6ca7c-9418-3108-59af-12d17238643d"},"destination":{"id":"4f6ef03f-0199-1000-e999-88a80c4dc736","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"c560755b-7141-35a8-1563-458a06bcd081"},"labelIndex":0,"zIndex":3,"selectedRelationships":["failure","retry"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef047-0199-1000-833f-d750174bb0e8","instanceIdentifier":"347d8b64-9e08-340e-4efc-62b7ca03b298","name":"","source":{"id":"4f6ef039-0199-1000-a2bf-35cc59e5c5d4","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"PrepareRecordForNlp","comments":"","instanceIdentifier":"ba54edee-cf06-3323-e8fc-2319dbb87cfd"},"destination":{"id":"4f6ef03b-0199-1000-ce55-a83a235f6052","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"InvokeHTTP","comments":"","instanceIdentifier":"7f451015-89a1-3dab-0a9f-6a6f9b33ec6b"},"labelIndex":0,"zIndex":2,"selectedRelationships":["success"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef049-0199-1000-3b5e-ae207d2827df","instanceIdentifier":"8a081b73-52ed-3f2f-9ce7-cc0fa30c8edb","name":"","source":{"id":"4f6ef039-0199-1000-a2bf-35cc59e5c5d4","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"PrepareRecordForNlp","comments":"","instanceIdentifier":"ba54edee-cf06-3323-e8fc-2319dbb87cfd"},"destination":{"id":"4f6ef03e-0199-1000-8d30-641327254f9a","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"e0a72e5c-2089-3b15-0516-5d3a1a541e49"},"labelIndex":0,"zIndex":4,"selectedRelationships":["failure"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef04e-0199-1000-cf82-d9da33757a0d","instanceIdentifier":"f55c2b0e-b583-3232-0fe7-41826d97088e","name":"","source":{"id":"4f6ef03a-0199-1000-bf6b-34d8129e4932","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"PutElasticsearchJson","comments":"","instanceIdentifier":"bc394929-a745-3a6d-b88b-ba9292b7f54a"},"destination":{"id":"4f6ef042-0199-1000-a1ff-821b7e239448","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"25106100-86d1-3c48-9790-eea9dc409a10"},"labelIndex":0,"zIndex":13,"selectedRelationships":["failure","retry","errors"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef04d-0199-1000-69b3-4d8ec67ea6f2","instanceIdentifier":"e2b80e81-9954-35d1-54c3-8dff40ff28a3","name":"","source":{"id":"4f6ef036-0199-1000-0f81-2e8b77aa5c73","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"SplitJson","comments":"","instanceIdentifier":"dcd2d3ab-d6ec-3d18-ccc8-2c96b049b7e5"},"destination":{"id":"7824060e-49ad-3398-8b25-28be95944992","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"EvaluateJsonPath-create_annotation_id_store_as_attritbute","comments":"","instanceIdentifier":"64bac204-019a-1000-0de5-7317fa1baece"},"labelIndex":0,"zIndex":17,"selectedRelationships":["split"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"15b0139a-cd12-333c-9ceb-0b05b63d3de6","instanceIdentifier":"64bbb5de-019a-1000-705d-f03c3904fc2c","name":"","source":{"id":"7824060e-49ad-3398-8b25-28be95944992","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"EvaluateJsonPath-create_annotation_id_store_as_attritbute","comments":"","instanceIdentifier":"64bac204-019a-1000-0de5-7317fa1baece"},"destination":{"id":"bc6e34fd-b83a-38dd-8814-cae922476dd2","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"64bba566-019a-1000-bf0c-7ce9d37de92a"},"labelIndex":0,"zIndex":19,"selectedRelationships":["failure"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef04c-0199-1000-b3c2-d23a834b7ddf","instanceIdentifier":"ded8d131-3043-318d-873d-89c370743fed","name":"","source":{"id":"4f6ef037-0199-1000-3a77-cb27fc945e04","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"ParseCogStackServiceResult","comments":"","instanceIdentifier":"7a00c8f1-408d-3f43-293b-1fee1c451654"},"destination":{"id":"4f6ef036-0199-1000-0f81-2e8b77aa5c73","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"SplitJson","comments":"","instanceIdentifier":"dcd2d3ab-d6ec-3d18-ccc8-2c96b049b7e5"},"labelIndex":0,"zIndex":11,"selectedRelationships":["success"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"e5aa33e5-2d74-3311-9119-13ac3b46cdaa","instanceIdentifier":"64bb96c1-019a-1000-5298-db2f99471abb","name":"","source":{"id":"7824060e-49ad-3398-8b25-28be95944992","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"EvaluateJsonPath-create_annotation_id_store_as_attritbute","comments":"","instanceIdentifier":"64bac204-019a-1000-0de5-7317fa1baece"},"destination":{"id":"4f6ef03a-0199-1000-bf6b-34d8129e4932","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"PutElasticsearchJson","comments":"","instanceIdentifier":"bc394929-a745-3a6d-b88b-ba9292b7f54a"},"labelIndex":0,"zIndex":18,"selectedRelationships":["matched"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef04f-0199-1000-bad6-ec3062cabd16","instanceIdentifier":"500e7104-aa4c-3ba3-c851-3a1fbf25e661","name":"","source":{"id":"4f6ef03b-0199-1000-ce55-a83a235f6052","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"InvokeHTTP","comments":"","instanceIdentifier":"7f451015-89a1-3dab-0a9f-6a6f9b33ec6b"},"destination":{"id":"4f6ef037-0199-1000-3a77-cb27fc945e04","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"ParseCogStackServiceResult","comments":"","instanceIdentifier":"7a00c8f1-408d-3f43-293b-1fee1c451654"},"labelIndex":0,"zIndex":5,"selectedRelationships":["Response"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef046-0199-1000-dd75-b063e0d1543d","instanceIdentifier":"b7cbd425-06e9-352a-3c13-47a5c9949af9","name":"","source":{"id":"4f6ef037-0199-1000-3a77-cb27fc945e04","type":"PROCESSOR","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"ParseCogStackServiceResult","comments":"","instanceIdentifier":"7a00c8f1-408d-3f43-293b-1fee1c451654"},"destination":{"id":"4f6ef03d-0199-1000-f2d3-036a48f6cab5","type":"FUNNEL","groupId":"4f6ef035-0199-1000-2908-9b0d750bc18c","name":"Funnel","comments":"","instanceIdentifier":"b4e621de-d47c-3259-e197-74e0a7095bcd"},"labelIndex":0,"zIndex":12,"selectedRelationships":["failure"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"}],"labels":[{"identifier":"4f6ef045-0199-1000-7730-eb9d736c9d6e","instanceIdentifier":"d676761e-61b1-3331-19be-15f6461be323","position":{"x":136.0,"y":-888.0},"label":"IMPORTANT: if you don't want for the processor to go on forever please set the 'Restart On Finish?' 
to 'false.\n\n","zIndex":1,"width":176.0,"height":128.0,"style":{"font-size":"12px"},"componentType":"LABEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"64d5a523-019a-1000-cc5b-0a7cdbb9ab09","instanceIdentifier":"fe821160-92f7-33f9-3135-5c5844df2221","position":{"x":-224.0,"y":-968.0},"label":"IMPORTANT: the query pulls only the id and the text fields (where the field is present).\n\n\n","zIndex":1,"width":352.0,"height":72.0,"style":{"font-size":"12px"},"componentType":"LABEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef043-0199-1000-68ba-f8082b95bf3b","instanceIdentifier":"9b0edd57-8a63-3891-81de-fe2a812a5fa3","position":{"x":136.0,"y":-472.0},"label":"IMPORTANT: make sure the 'Request Body Enabled' is set to 'true', which means the file will get sent as a json rather than a binary.","zIndex":1,"width":176.0,"height":128.0,"style":{"font-size":"12px"},"componentType":"LABEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef044-0199-1000-7659-6d80322a5725","instanceIdentifier":"479cbbea-3933-3b6f-7e55-fa7d3531d245","position":{"x":136.0,"y":-688.0},"label":"IMPORTANT: make sure the url is correct, if you have medcat service on an other server, the default DEID instance port is 5556","zIndex":1,"width":176.0,"height":128.0,"style":{"font-size":"12px"},"componentType":"LABEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"e8de00c2-e3ac-3481-b7ca-f3e46d5066c4","instanceIdentifier":"64dd8ee7-019a-1000-1fea-00c963dd77a1","position":{"x":768.0,"y":56.0},"label":"Here we get the id of the annotation and put it as an attribute for use with ElasticSearch for unique record 
indexing.","zIndex":2,"width":376.0,"height":152.0,"style":{"font-size":"12px"},"componentType":"LABEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"}],"funnels":[{"identifier":"4f6ef041-0199-1000-414e-f0d8b090d0fc","instanceIdentifier":"2e3fbd7b-19b7-36ed-5a15-675a54f93739","position":{"x":-552.0,"y":-432.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"9235994b-7e87-3c4c-9367-3120d9ae1701","instanceIdentifier":"64bbe06a-019a-1000-1db4-6451cafadd67","position":{"x":560.0,"y":168.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef03d-0199-1000-f2d3-036a48f6cab5","instanceIdentifier":"b4e621de-d47c-3259-e197-74e0a7095bcd","position":{"x":-552.0,"y":-216.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef042-0199-1000-a1ff-821b7e239448","instanceIdentifier":"25106100-86d1-3c48-9790-eea9dc409a10","position":{"x":-552.0,"y":-8.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef03c-0199-1000-fada-c641cccb88e0","instanceIdentifier":"4f5dcd68-d034-310a-f613-9ea0f05b05b7","position":{"x":560.0,"y":-392.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef03f-0199-1000-e999-88a80c4dc736","instanceIdentifier":"c560755b-7141-35a8-1563-458a06bcd081","position":{"x":-552.0,"y":-848.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"bc6e34fd-b83a-38dd-8814-cae922476dd2","instanceIdentifier":"64bba566-019a-1000-bf0c-7ce9d37de92a","position":{"x":1104.0,"y":-8.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"},{"identifier":"4f6ef040-0199-1000-58f9-341dd241015a","instanceIdentifier":"751b9f99-488f-3eb9-8f73-df4f56d4e911","position":{"x":-72.0,"y":168.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b
0d750bc18c"},{"identifier":"4f6ef03e-0199-1000-8d30-641327254f9a","instanceIdentifier":"e0a72e5c-2089-3b15-0516-5d3a1a541e49","position":{"x":-552.0,"y":-640.0},"componentType":"FUNNEL","groupIdentifier":"4f6ef035-0199-1000-2908-9b0d750bc18c"}],"controllerServices":[{"identifier":"06280792-dc54-3d7a-9d35-76ce834dcec3","instanceIdentifier":"6cf1af26-0198-1000-53a1-750027cad544","name":"ElasticSearchClientServiceImpl","comments":"","type":"org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl","bundle":{"group":"org.apache.nifi","artifact":"nifi-elasticsearch-client-service-nar","version":"2.6.0"},"properties":{"el-cs-suppress-nulls":"always-suppress","el-cs-strict-deprecation":"false","el-cs-http-hosts":"https://elasticsearch-1:9200,https://elasticsearch-2:9200","el-cs-path-prefix":null,"el-cs-run-as-user":null,"proxy-configuration-service":null,"el-cs-charset":"UTF-8","api-key-id":null,"el-cs-send-meta-header":"true","el-cs-username":"admin","el-cs-enable-compression":"false","el-cs-sniff-cluster-nodes":"false","el-cs-sniffer-interval":"5 mins","el-cs-sniffer-failure-delay":"1 min","el-cs-node-selector":"ANY","authorization-scheme":"BASIC","el-cs-sniff-failure":"false","el-cs-sniffer-request-timeout":"1 sec","el-cs-connect-timeout":"5000","el-cs-socket-timeout":"60000","el-cs-ssl-context-service":"1d435682-6896-31ac-9c97-82b4a4462858","el-cs-oauth2-token-provider":null},"propertyDescriptors":{"el-cs-suppress-nulls":{"name":"el-cs-suppress-nulls","displayName":"Suppress Null/Empty Values","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-strict-deprecation":{"name":"el-cs-strict-deprecation","displayName":"Strict Deprecation","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-http-hosts":{"name":"el-cs-http-hosts","displayName":"HTTP Hosts","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-run-as-user":{"name":"el-cs-run-as-user","displayName":"Run As 
User","identifiesControllerService":false,"sensitive":false,"dynamic":false},"proxy-configuration-service":{"name":"proxy-configuration-service","displayName":"Proxy Configuration Service","identifiesControllerService":true,"sensitive":false,"dynamic":false},"el-cs-charset":{"name":"el-cs-charset","displayName":"Charset","identifiesControllerService":false,"sensitive":false,"dynamic":false},"jwt-shared-secret":{"name":"jwt-shared-secret","displayName":"JWT Shared Secret","identifiesControllerService":false,"sensitive":true,"dynamic":false},"el-cs-send-meta-header":{"name":"el-cs-send-meta-header","displayName":"Send Meta Header","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-username":{"name":"el-cs-username","displayName":"Username","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-enable-compression":{"name":"el-cs-enable-compression","displayName":"Enable Compression","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-sniff-cluster-nodes":{"name":"el-cs-sniff-cluster-nodes","displayName":"Sniff Cluster Nodes","identifiesControllerService":false,"sensitive":false,"dynamic":false},"api-key":{"name":"api-key","displayName":"API Key","identifiesControllerService":false,"sensitive":true,"dynamic":false},"el-cs-node-selector":{"name":"el-cs-node-selector","displayName":"Node Selector","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-sniff-failure":{"name":"el-cs-sniff-failure","displayName":"Sniff on Failure","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-sniffer-request-timeout":{"name":"el-cs-sniffer-request-timeout","displayName":"Sniffer Request Timeout","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-ssl-context-service":{"name":"el-cs-ssl-context-service","displayName":"SSL Context 
Service","identifiesControllerService":true,"sensitive":false,"dynamic":false},"el-cs-oauth2-token-provider":{"name":"el-cs-oauth2-token-provider","displayName":"OAuth2 Access Token Provider","identifiesControllerService":true,"sensitive":false,"dynamic":false},"el-cs-path-prefix":{"name":"el-cs-path-prefix","displayName":"Path Prefix","identifiesControllerService":false,"sensitive":false,"dynamic":false},"api-key-id":{"name":"api-key-id","displayName":"API Key ID","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-sniffer-interval":{"name":"el-cs-sniffer-interval","displayName":"Sniffer Interval","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-sniffer-failure-delay":{"name":"el-cs-sniffer-failure-delay","displayName":"Sniffer Failure Delay","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-password":{"name":"el-cs-password","displayName":"Password","identifiesControllerService":false,"sensitive":true,"dynamic":false},"authorization-scheme":{"name":"authorization-scheme","displayName":"Authorization Scheme","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-connect-timeout":{"name":"el-cs-connect-timeout","displayName":"Connect timeout","identifiesControllerService":false,"sensitive":false,"dynamic":false},"el-cs-socket-timeout":{"name":"el-cs-socket-timeout","displayName":"Read timeout","identifiesControllerService":false,"sensitive":false,"dynamic":false}},"controllerServiceApis":[{"type":"org.apache.nifi.elasticsearch.ElasticSearchClientService","bundle":{"group":"org.apache.nifi","artifact":"nifi-elasticsearch-client-service-api-nar","version":"2.6.0"}}],"scheduledState":"DISABLED","bulletinLevel":"WARN","componentType":"CONTROLLER_SERVICE","groupIdentifier":"2c99a7e3-98d1-37d9-2345-149e41abaa26"}],"defaultFlowFileExpiration":"0 sec","defaultBackPressureObjectThreshold":10000,"defaultBackPressureDataSizeThreshold":"1 
GB","scheduledState":"ENABLED","executionEngine":"INHERITED","maxConcurrentTasks":1,"statelessFlowTimeout":"1 min","flowFileConcurrency":"UNBOUNDED","flowFileOutboundPolicy":"STREAM_WHEN_AVAILABLE","componentType":"PROCESS_GROUP"},"externalControllerServices":{},"parameterContexts":{},"flowEncodingVersion":"1.0","parameterProviders":{},"latest":false} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8b058ce6a..2f17afdc7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,3 @@ -# pyproject.toml - [tool.ruff] line-length = 120 exclude = ["nifi/user-scripts/legacy_scripts"] @@ -29,6 +27,8 @@ fixable = ["ALL"] plugins = ["pydantic.mypy"] ignore_missing_imports = true strict = false +files = "." +mypy_path = "./typings/" [tool.isort] line_length = 120 diff --git a/typings/nifiapi/flowfiletransform.pyi b/typings/nifiapi/flowfiletransform.pyi new file mode 100644 index 000000000..32345142d --- /dev/null +++ b/typings/nifiapi/flowfiletransform.pyi @@ -0,0 +1,17 @@ +from typing import Any, Protocol + +from nifiapi.properties import ProcessContext +from py4j.java_gateway import JavaObject + +class FlowFileTransformResult: + def __init__( + self, + relationship: str, + attributes: dict[str, str], + contents: bytes | str | None = None + ) -> None : ... + + + +class FlowFileTransform(Protocol): + def transform(self, context: ProcessContext, flowFile: JavaObject) -> Any: ... diff --git a/typings/py4j/java_gateway.pyi b/typings/py4j/java_gateway.pyi new file mode 100644 index 000000000..80bce30e5 --- /dev/null +++ b/typings/py4j/java_gateway.pyi @@ -0,0 +1,10 @@ +from typing import Any, Protocol + +class JavaObject(Protocol): + """Stub for Py4J JavaObject with the methods we use in NiFi""" + def getContentsAsBytes(self) -> bytes: ... + def getAttributes(self) -> dict[str, Any]: ... + +class JVMView(Protocol): + """Stub JVM view""" + pass \ No newline at end of file