From 3d409f58da053e1cfa686281e8ddd25aacd25457 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Thu, 30 Oct 2025 15:18:01 +0000 Subject: [PATCH 01/13] NiFi: Updated processors. --- .../convert_avro_binary_field_to_base64.py | 34 ++++++++++++++++--- .../convert_json_record_schema.py | 2 +- .../parse_service_response.py | 34 ++++++++++++++++--- .../prepare_record_for_ocr.py | 29 +++++++++++++--- 4 files changed, 83 insertions(+), 16 deletions(-) diff --git a/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py b/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py index 5dec18b7f..01a3fdf57 100644 --- a/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py +++ b/nifi/user-python-extensions/convert_avro_binary_field_to_base64.py @@ -2,6 +2,7 @@ import copy import io import json +import sys import traceback from logging import Logger @@ -14,8 +15,14 @@ PropertyDescriptor, StandardValidators, ) +from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView +# we need to add it to the sys imports +sys.path.insert(0, "/opt/nifi/user-scripts") + +from utils.generic import parse_value # noqa: I001,E402 + class ConvertAvroBinaryRecordFieldToBase64(FlowFileTransform): identifier = None @@ -58,7 +65,22 @@ def __init__(self, jvm: JVMView): validators=[StandardValidators.NON_EMPTY_VALIDATOR]), ] + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def getRelationships(self) -> list[Relationship]: + return self.relationships def getPropertyDescriptors(self) -> list[PropertyDescriptor]: return self.descriptors @@ -66,17 +88,19 @@ def getPropertyDescriptors(self) -> list[PropertyDescriptor]: def set_logger(self, logger: Logger): self.logger = logger - def set_properties(self, properties: dict): + def set_properties(self, properties: dict) -> None: """ Gets the properties from the processor's context and sets them as instance variables. Args: properties (dict): dictionary containing property names and values. """ - for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) + for k, v in properties.items(): + name = k.name if hasattr(k, "name") else str(k) + val = parse_value(v) + if hasattr(self, name): + setattr(self, name, val) + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore """ diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index 65cf24da6..cc719ae14 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -17,6 +17,7 @@ from utils.generic import parse_value # noqa: I001,E402 + class ConvertJsonRecordSchema(FlowFileTransform): identifier = None logger: Logger = Logger(__qualname__) @@ -122,7 +123,6 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: return new_record - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore output_contents: list = [] try: diff --git a/nifi/user-python-extensions/parse_service_response.py b/nifi/user-python-extensions/parse_service_response.py index e1e8ed552..7030c1bc8 100644 --- a/nifi/user-python-extensions/parse_service_response.py +++ b/nifi/user-python-extensions/parse_service_response.py @@ -1,4 +1,5 @@ import json +import sys import traceback from logging import Logger @@ -8,8 +9,14 @@ PropertyDescriptor, StandardValidators, ) +from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView +# we need to add it to the sys imports +sys.path.insert(0, "/opt/nifi/user-scripts") + +from utils.generic import parse_value # noqa: I001,E402 + class ParseCogStackServiceResult(FlowFileTransform): identifier = None @@ -74,7 +81,22 @@ def __init__(self, jvm: JVMView): ) ] + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def getRelationships(self) -> list[Relationship]: + return self.relationships def getPropertyDescriptors(self) -> list[PropertyDescriptor]: return self.descriptors @@ -82,17 +104,19 @@ def getPropertyDescriptors(self) -> list[PropertyDescriptor]: def set_logger(self, logger: Logger): self.logger = logger - def set_properties(self, properties: dict): + def set_properties(self, properties: dict) -> None: """ Gets the properties from the processor's context and sets them as instance variables. Args: properties (dict): dictionary containing property names and values. """ - for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) + for k, v in properties.items(): + name = k.name if hasattr(k, "name") else str(k) + val = parse_value(v) + if hasattr(self, name): + setattr(self, name, val) + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore """ diff --git a/nifi/user-python-extensions/prepare_record_for_ocr.py b/nifi/user-python-extensions/prepare_record_for_ocr.py index b9a19f6b9..01a5924a2 100644 --- a/nifi/user-python-extensions/prepare_record_for_ocr.py +++ b/nifi/user-python-extensions/prepare_record_for_ocr.py @@ -14,12 +14,14 @@ PropertyDescriptor, StandardValidators, ) +from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView # we need to add it to the sys imports sys.path.insert(0, "/opt/nifi/user-scripts") from utils.helpers.avro_json_encoder import AvroJSONEncoder # noqa: I001,E402 +from utils.generic import parse_value # noqa: I001,E402 class PrepareRecordForOcr(FlowFileTransform): @@ -73,7 +75,22 @@ def __init__(self, jvm: JVMView): allowable_values=["avro", "json"]), ] + self._relationships = [ + Relationship( + name="success", + description="All FlowFiles processed successfully." + ), + Relationship( + name="failure", + description="FlowFiles that failed processing." + ) + ] + self.descriptors: list[PropertyDescriptor] = self._properties + self.relationships: list[Relationship] = self._relationships + + def getRelationships(self) -> list[Relationship]: + return self.relationships def getPropertyDescriptors(self) -> list[PropertyDescriptor]: return self.descriptors @@ -81,17 +98,19 @@ def getPropertyDescriptors(self) -> list[PropertyDescriptor]: def set_logger(self, logger: Logger): self.logger = logger - def set_properties(self, properties: dict): + def set_properties(self, properties: dict) -> None: """ Gets the properties from the processor's context and sets them as instance variables. Args: properties (dict): dictionary containing property names and values. """ - for k, v in list(properties.items()): - self.logger.debug(f"property set '{k.name}' with value '{v}'") - if hasattr(self, k.name): - setattr(self, k.name, v) + for k, v in properties.items(): + name = k.name if hasattr(k, "name") else str(k) + val = parse_value(v) + if hasattr(self, name): + setattr(self, name, val) + self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore output_contents = [] From 365928fbfeda2bb6b63cd46c8a3825915938e359 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Thu, 30 Oct 2025 15:36:18 +0000 Subject: [PATCH 02/13] NiFi: Updated json schema mapping script(allow for duplicate val->key maps) --- .../convert_json_record_schema.py | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index cc719ae14..2f85f56e8 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -1,7 +1,9 @@ import json import sys import traceback +from collections import defaultdict from logging import Logger +from typing import Any from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult from nifiapi.properties import ( @@ -109,22 +111,30 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: new_record: dict = {} # reverse the json_mapper_schema to map old_field -> new_field - json_mapper_schema_reverse: dict = {v: k for k, v in json_mapper_schema.items() if v} + json_mapper_schema_reverse = defaultdict(list) + for new_field, old_field in json_mapper_schema.items(): + if old_field: # skip nulls / preset-only + json_mapper_schema_reverse[old_field].append(new_field) + # Iterate through existing record fields for curr_field_name, curr_field_value in record.items(): if curr_field_name in json_mapper_schema_reverse: - new_field_name = json_mapper_schema_reverse[curr_field_name] - # check if the mapping is not a dict (nested field) - if isinstance(new_field_name, str): - new_record.update({new_field_name: curr_field_value}) - + # multiple new fields can receive same source value + for new_field_name in json_mapper_schema_reverse[curr_field_name]: + new_record[new_field_name] = curr_field_value elif self.preserve_non_mapped_fields: - new_record.update({curr_field_name: curr_field_value}) + # preserve original fields not defined in mapping + new_record[curr_field_name] = curr_field_value + + # Add preset fields defined with null in schema + for new_field, old_field in json_mapper_schema.items(): + if old_field is None: + new_record.setdefault(new_field, None) return new_record def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore - output_contents: list = [] + output_contents: list[dict[Any, Any]] = [] try: self.process_context: ProcessContext = context self.set_properties(context.getProperties()) From 8f676d9f629e329242db8652c2aa30d3fa88cfcf Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Thu, 30 Oct 2025 20:20:49 +0000 Subject: [PATCH 03/13] NiFi: scripts - convert record schema support nested fields. --- .../convert_json_record_schema.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index 2f85f56e8..ced44df79 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -113,7 +113,8 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: # reverse the json_mapper_schema to map old_field -> new_field json_mapper_schema_reverse = defaultdict(list) for new_field, old_field in json_mapper_schema.items(): - if old_field: # skip nulls / preset-only + # skip nulls & composite fields + if isinstance(old_field, str) and old_field: json_mapper_schema_reverse[old_field].append(new_field) # Iterate through existing record fields @@ -130,6 +131,13 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: for new_field, old_field in json_mapper_schema.items(): if old_field is None: new_record.setdefault(new_field, None) + elif isinstance(old_field, list): + parts = [] + for sub_field in old_field: + val = record.get(sub_field) + if val is not None and val != "": + parts.append(str(val)) + new_record[new_field] = "\n".join(parts) if parts else None return new_record From f736ce2991f1e051559179924f496face886a2b7 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Fri, 31 Oct 2025 11:23:07 +0000 Subject: [PATCH 04/13] Docs: updated security section + lint settings. --- docs/.markdownlint.yaml | 15 ++++++++++++ docs/security/services.md | 50 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 docs/.markdownlint.yaml diff --git a/docs/.markdownlint.yaml b/docs/.markdownlint.yaml new file mode 100644 index 000000000..a890a148f --- /dev/null +++ b/docs/.markdownlint.yaml @@ -0,0 +1,15 @@ +# Disable rules that conflict with Sphinx/MyST conventions +default: true + +MD013: false # Line length — Sphinx often has long code blocks / directives +MD033: false # Inline HTML — needed for raw HTML and directives +MD041: false # First line should be a heading — breaks for included partials +MD024: false # Multiple headings with same content — Sphinx pages can reuse titles +MD034: false # Bare URL — Sphinx cross-refs handle this +MD043: false # Required heading structure — not needed for Sphinx TOC + +# Optional fine-tuning +MD007: + indent: 2 # Consistent list indentation +MD004: + style: dash # Consistent unordered list marker (-) diff --git a/docs/security/services.md b/docs/security/services.md index e69de29bb..dde2e082b 100644 --- a/docs/security/services.md +++ b/docs/security/services.md @@ -0,0 +1,50 @@ +# Service Security + +## General services TLS configuration + +All services in cogstack that are not listed in `deploy/services.yml`and use TLS will be described on this page. For services that are still part of the stack but in `dervices/` (again, service not also present in `deploy/services.yml`) the TLS setup is handled differently, and the setup is described in each service's README.md. Generally, most should just use the root-ca certs from `security/certificates/root/`. + +## Gitea TLS Configuration + +This section describes how **Gitea** is secured using the shared **Root Certificate Authority (CA)** generated by `create_root_ca_cert.sh`. + +Unlike other services (such as NiFi or Elastic), **Gitea does not require its own dedicated certificate pair** or an NGINX reverse proxy. +It operates directly with the **Root CA** to provide HTTPS encryption and mutual trust within the CogStack-NiFi stack. + +--- + +### 📁 Certificate source + +All certificates used by Gitea originate from: + +```text +security/certificates/root/ +``` + +| File | Purpose | +|------|----------| +| `root-ca.pem` | Public CA certificate used by Gitea for HTTPS trust | +| `root-ca.key` | Root CA private key (used only when generating new certificates) | +| `root-ca.p12` | Optional PKCS#12 keystore (not required by Gitea) | + + +### 🧠 Notes + +- The **Root CA** (`root-ca.pem`) is shared across all CogStack services for internal TLS trust. +- You do **not** need to create a new `gitea.crt` or `gitea.key`; the Root CA cert/key pair is sufficient. +- Ensure `root-ca.key` remains private and is not committed to version control. +- The same CA also secures NiFi, Elasticsearch, OpenSearch, Kibana, and JupyterHub. + +--- + +### ✅ Verification + +To confirm Gitea is serving HTTPS correctly: + +```bash +curl -vk --cacert ./security/certificates/root/root-ca.pem https://gitea.local:2222/ +``` + +You should see a valid TLS handshake and an HTTP 200 response. + +--- From c2b8fba67cac095cdf9184e5c3e711be8decf811 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Fri, 31 Oct 2025 11:25:14 +0000 Subject: [PATCH 05/13] Docs: added es/os security section. --- docs/security/elasticsearch_opensearch.md | 220 ++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 docs/security/elasticsearch_opensearch.md diff --git a/docs/security/elasticsearch_opensearch.md b/docs/security/elasticsearch_opensearch.md new file mode 100644 index 000000000..10c4d64bf --- /dev/null +++ b/docs/security/elasticsearch_opensearch.md @@ -0,0 +1,220 @@ +# Elasticsearch / OpenSearch Security + +This section describes how to secure both **Elasticsearch (native)** and **OpenSearch** clusters used in the CogStack-NiFi stack, including certificate setup, user management, and role configuration. + +All related certificates are stored in `security/certificates/elastic/`, and are generated from the shared **Root CA** created via [`create_root_ca_cert.sh`](certificates.md). + +--- + +## 🔒 Overview + +Both **Elasticsearch** and **OpenSearch** deployments require: + +- TLS certificates for all nodes and HTTPS endpoints, +- secure credentials for built-in and custom users, +- properly configured roles and role mappings. + +Certificates and credentials are generated using the scripts provided in `security/scripts/` and are controlled through the `.env` files under `security/env/`. + +--- + +## 📄 Environment files used + +All scripts reference the following environment configuration files: + +| File | Purpose | +|------|----------| +| `certificates_elasticsearch.env` | Hostnames, instance names, and certificate parameters for ES / OpenSearch nodes | +| `certificates_general.env` | Root CA configuration | +| `elasticsearch_users.env` | Internal user credentials | + +Reload them before running any security-related script: + +```bash +cd ../deploy +source export_env_vars.sh +cd ../security +``` + +--- + +## 🧩 Common certificate layout + +Certificate naming and folder structure are consistent across both ES and OpenSearch: + +```text +security/certificates/elastic/ +├── elasticsearch/ +│ ├── elastic-stack-ca.crt.pem +│ ├── elastic-stack-ca.key.pem +│ ├── elasticsearch/ +│ │ ├── elasticsearch-{1,2,3}/ +│ │ │ ├── http-elasticsearch-*.crt +│ │ │ ├── http-elasticsearch-*.key +│ │ │ ├── http-elasticsearch-*.p12 +│ │ │ ├── elasticsearch-*.crt +│ │ │ ├── elasticsearch-*.key +│ │ │ └── elasticsearch-*.p12 +│ └── kibana/ +│ ├── sample-kibana.yml +│ └── README.txt +└── opensearch/ + ├── admin.*, es_kibana_client.*, root-ca.* + └── elasticsearch/{1,2,3}/... +``` + +Each version has its own generation scripts, but they all depend on the same `.env` configuration and naming patterns. + +--- + +## 🏗️ Generating certificates + +### Elasticsearch (native) + +To generate certificates for Elasticsearch: + +```bash +bash ./create_es_native_certs.sh +``` + +This script creates all required node and HTTP certificates under: + +```text +security/certificates/elastic/elasticsearch/elasticsearch-{1,2,3}/ +``` + +The script uses variables such as: + +- `ES_INSTANCE_NAME_*` — Node names (match `ELASTICSEARCH_NODE_*_NAME` in `/deploy/elasticsearch.env`) +- `ES_INSTANCE_ALTERNATIVE_*_NAME` — Alternative hostnames +- `ES_HOSTNAMES` — List of all node hostnames +- `ES_CLIENT_SUBJ_ALT_NAMES` / `ES_NODE_SUBJ_ALT_NAMES` — Additional domain aliases for SAN fields + +Make sure the environment variables are set correctly before running the script. + +--- + +### OpenSearch + +For OpenSearch nodes: + +```bash +bash ./create_opensearch_node_cert.sh elasticsearch-1 elasticsearch-2 elasticsearch-3 +``` + +Then generate the admin and client certificates: + +```bash +bash ./create_opensearch_client_admin_certs.sh +``` + +This produces: + +| File | Purpose | +|------|----------| +| `admin.pem`, `admin-key.pem` | Admin dashboard certificate | +| `es_kibana_client.pem`, `es_kibana_client.key` | Client certificate for Kibana/OpenDashboard | +| `*.jks` | Node keystores/truststores for HTTPS and inter-node encryption | + +The resulting certificates are placed in: + +```text +security/certificates/elastic/opensearch/ +``` + +--- + +## ⚙️ Version variable + +Set the ES/OS version in `deploy/elasticsearch.env` before launching containers: + +```bash +ELASTICSEARCH_VERSION=opensearch +# or +ELASTICSEARCH_VERSION=elasticsearch +``` + +This ensures the correct certificate directory (`elasticsearch` or `opensearch`) is mounted into containers. + +--- + +## 📁 Kibana / OpenDashboard certificates + +| Platform | Required Certificates | Source Folder | +|-----------|----------------------|----------------| +| **Kibana** | `elasticsearch-{1,2,3}.crt`, `elasticsearch-{1,2,3}.key`, `elastic-stack-ca.crt.pem` | `security/certificates/elastic/elasticsearch/` | +| **OpenDashboard (OpenSearch)** | `admin.pem`, `admin-key.pem`, `es_kibana_client.pem`, `es_kibana_client.key` | `security/certificates/elastic/opensearch/` | + +All certificate references in `services/kibana/config/kibana_opensearch.yml` or `services.yml` must point to these locations. + +--- + +## 🔐 Users and roles + +### OpenSearch + +1. Edit `security/es_roles/opensearch/internal_users.yml` to define users. +2. Optionally generate password hashes: + + ```bash + bash ./create_opensearch_internal_passwords.sh + ``` + +3. Apply changes by recreating containers: + + ```bash + docker compose down -v + docker compose up -d + ``` + +4. Use `create_opensearch_users.sh` to populate roles and user mappings. + +OpenSearch includes default roles (`admin`, `kibanaserver`, `readall`, `snapshotrestore`, etc.) — always change their passwords after first run. + +--- + +### Elasticsearch (native) + +Run after containers start: + +```bash +bash ./create_es_native_credentials.sh +``` + +This script creates system users, roles, and a service account token for Kibana. + +You can modify credentials in `security/env/elasticsearch_users.env`. + +**New roles** created: + +- `ingest` — for NiFi and pipeline ingestion (`cogstack_*`, `nifi_*` indices) +- `cogstack_access` — read-only access to `cogstack_*` and `nifi_*` + +**New users**: + +- `nifi` → `ingest` +- `cogstack_user` → `cogstack_access` + +--- + +## ⚠️ Notes + +- The `security/certificates/` folder is also **mounted inside NiFi** so NiFi processors can access ES/OS securely without restarting. +- For OpenSearch role details, see the [OpenSearch Security Plugin documentation](https://opensearch.org/docs/latest/security-plugin/index/). +- For Elasticsearch, refer to the [official Elastic Security docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/configuring-security.html). + +--- + +## ✅ Verification + +To verify HTTPS access and trust: + +```bash +curl -vk --cacert ./root-ca.pem https://elasticsearch-1:9200 +``` + +To check inter-node encryption (inside a container): + +```bash +openssl s_client -connect elasticsearch-2:9300 -CAfile ./root-ca.pem +``` From e9ee98e37634b3dad06ee08089d08db847b356ec Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Fri, 31 Oct 2025 11:25:50 +0000 Subject: [PATCH 06/13] NiFi: updated dockerfile groovy version. --- nifi/Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nifi/Dockerfile b/nifi/Dockerfile index 03df83bea..66ad8fc89 100644 --- a/nifi/Dockerfile +++ b/nifi/Dockerfile @@ -98,10 +98,10 @@ WORKDIR /opt/nifi/nifi-current/lib/ RUN mkdir -p /opt/nifi/groovy WORKDIR /opt/nifi/groovy/ -RUN curl https://archive.apache.org/dist/groovy/5.0.0/distribution/apache-groovy-binary-5.0.0.zip --output apache-groovy-binary-5.0.0.zip --max-time 3600 && \ - unzip apache-groovy-binary-5.0.0.zip && \ - rm apache-groovy-binary-5.0.0.zip -ENV GROOVY_BIN=/opt/nifi/groovy/groovy-5.0.0/bin +RUN curl https://archive.apache.org/dist/groovy/5.0.2/distribution/apache-groovy-binary-5.0.2.zip --output apache-groovy-binary-5.0.2.zip --max-time 3600 && \ + unzip apache-groovy-binary-5.0.2.zip && \ + rm apache-groovy-binary-5.0.2.zip +ENV GROOVY_BIN=/opt/nifi/groovy/groovy-5.0.2/bin RUN $GROOVY_BIN/grape -V install org.apache.avro avro 1.12.0 # copy configuration files From 5c2b02c6e75644ad69d94ba837e2526fcb6424ed Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Fri, 31 Oct 2025 11:53:02 +0000 Subject: [PATCH 07/13] NiFi: restructured user-schema folder. --- nifi/user-schemas/avro/.keep | 0 nifi/user-schemas/elasticsearch/base_index_settings.json | 3 +-- nifi/user-schemas/elasticsearch/indices/.keep | 0 nifi/user-schemas/elasticsearch/templates/.keep | 0 nifi/user-schemas/json/.keep | 0 ...ack_common_schema_elasticsearch_index_mapping_template.json | 0 .../{ => legacy}/cogstack_common_schema_mapping.json | 0 7 files changed, 1 insertion(+), 2 deletions(-) create mode 100644 nifi/user-schemas/avro/.keep create mode 100644 nifi/user-schemas/elasticsearch/indices/.keep create mode 100644 nifi/user-schemas/elasticsearch/templates/.keep create mode 100644 nifi/user-schemas/json/.keep rename nifi/user-schemas/{ => legacy}/cogstack_common_schema_elasticsearch_index_mapping_template.json (100%) rename nifi/user-schemas/{ => legacy}/cogstack_common_schema_mapping.json (100%) diff --git a/nifi/user-schemas/avro/.keep b/nifi/user-schemas/avro/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/nifi/user-schemas/elasticsearch/base_index_settings.json b/nifi/user-schemas/elasticsearch/base_index_settings.json index b150ec474..9f8a2a59a 100644 --- a/nifi/user-schemas/elasticsearch/base_index_settings.json +++ b/nifi/user-schemas/elasticsearch/base_index_settings.json @@ -11,7 +11,6 @@ "dynamic_date_formats": [ "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd", - "epoch_millis", "basic_date", "date_hour", "date_hour_minute", @@ -36,7 +35,7 @@ "dates": { "match_mapping_type": "string", "match_pattern": "regex", - "match": "(?i).*(date|time|when|dt|dttm|timestamp|created|updated|modified|inserted|recorded|logged|entered|performed|signed|cosigned|completed|admit|discharge|visit|appointment|service|start|end|effective|expiry|validfrom|validto|close)$", + "match": "(?i).*(date|_datetime|_dt|_dttm|_dat|_datime|_time|_ts|_timestamp|time|when|dt|dttm|timestamp|created|updated|modified|inserted|recorded|logged|entered|performed|signed|cosigned|completed|admit|discharge|visit|appointment|service|start|end|effective|expiry|validfrom|validto|close)$", "mapping": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||basic_date||date_hour_minute_second" diff --git a/nifi/user-schemas/elasticsearch/indices/.keep b/nifi/user-schemas/elasticsearch/indices/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/nifi/user-schemas/elasticsearch/templates/.keep b/nifi/user-schemas/elasticsearch/templates/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/nifi/user-schemas/json/.keep b/nifi/user-schemas/json/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/nifi/user-schemas/cogstack_common_schema_elasticsearch_index_mapping_template.json b/nifi/user-schemas/legacy/cogstack_common_schema_elasticsearch_index_mapping_template.json similarity index 100% rename from nifi/user-schemas/cogstack_common_schema_elasticsearch_index_mapping_template.json rename to nifi/user-schemas/legacy/cogstack_common_schema_elasticsearch_index_mapping_template.json diff --git a/nifi/user-schemas/cogstack_common_schema_mapping.json b/nifi/user-schemas/legacy/cogstack_common_schema_mapping.json similarity index 100% rename from nifi/user-schemas/cogstack_common_schema_mapping.json rename to nifi/user-schemas/legacy/cogstack_common_schema_mapping.json From c0428110f13afee7d68cd713e31f220d2ee53537 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Fri, 31 Oct 2025 11:54:26 +0000 Subject: [PATCH 08/13] NiFi: updated json schema record converter refs to new field remapping paths. --- nifi/user-python-extensions/convert_json_record_schema.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi/user-python-extensions/convert_json_record_schema.py b/nifi/user-python-extensions/convert_json_record_schema.py index ced44df79..649be8316 100644 --- a/nifi/user-python-extensions/convert_json_record_schema.py +++ b/nifi/user-python-extensions/convert_json_record_schema.py @@ -38,7 +38,7 @@ def __init__(self, jvm: JVMView): """ self.jvm = jvm - self.json_mapper_schema_path: str = "/opt/nifi/user-schemas/cogstack_common_schema_mapping.json" + self.json_mapper_schema_path: str = "/opt/nifi/user-schemas/json/cogstack_common_schema_mapping.json" self.preserve_non_mapped_fields: bool = True # this is directly mirrored to the UI @@ -47,7 +47,7 @@ def __init__(self, jvm: JVMView): description="The path to the json schema mapping file, " \ "the schema directory is mounted as a volume in" \ " the nifi container in the /opt/nifi/user-schemas/ folder", - default_value="/opt/nifi/user-schemas/cogstack_common_schema_mapping.json", + default_value="/opt/nifi/user-schemas/json/cogstack_common_schema_mapping.json", required=True, validators=[StandardValidators.NON_EMPTY_VALIDATOR]), PropertyDescriptor(name="preserve_non_mapped_fields", From 276fd68b6b0b39d732fd4c07b16150ee23386e9f Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Fri, 31 Oct 2025 11:54:56 +0000 Subject: [PATCH 09/13] NiFi: docker image recreation script updt. --- nifi/recreate_nifi_docker_image.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi/recreate_nifi_docker_image.sh b/nifi/recreate_nifi_docker_image.sh index 30f5f5d7a..9bfeb4043 100644 --- a/nifi/recreate_nifi_docker_image.sh +++ b/nifi/recreate_nifi_docker_image.sh @@ -13,4 +13,4 @@ if [[ $NIFI_GID == 1000 ]]; then NIFI_GID=$(id -g) fi -docker build --build-arg GID=${NIFI_GID} --build-arg UID=${NIFI_UID} -t cogstacksystems/cogstack-nifi:latest -f Dockerfile . \ No newline at end of file +docker build --build-arg GID=${NIFI_GID} --build-arg UID=${NIFI_UID} -t cogstacksystems/cogstack-nifi:latest -f Dockerfile . From ef50ad4a6fb5ba9b985dacfebaac361829b48360 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Tue, 4 Nov 2025 17:36:40 +0000 Subject: [PATCH 10/13] NiFi: updated user-scripts (refactored nifi base processor) + added file download util. --- nifi/user-scripts/utils/generic.py | 26 ++++- .../utils/helpers/base_nifi_processor.py | 95 +++++++++++++++---- 2 files changed, 99 insertions(+), 22 deletions(-) diff --git a/nifi/user-scripts/utils/generic.py b/nifi/user-scripts/utils/generic.py index 0f17e3bfc..270c93a66 100644 --- a/nifi/user-scripts/utils/generic.py +++ b/nifi/user-scripts/utils/generic.py @@ -1,7 +1,9 @@ import json import logging import os +import ssl import traceback +import urllib.request from collections import defaultdict from typing import Union @@ -34,7 +36,7 @@ def dict2json_truncate_add_to_file(input_dict: dict, file_path: str): dict2json_file(input_dict, file_path) -def dict2jsonl_file(input_dict: Union[dict| defaultdict], file_path: str): +def dict2jsonl_file(input_dict: dict | defaultdict, file_path: str): with open(file_path, 'a', encoding='utf-8') as outfile: for k,v in input_dict.items(): o = {k: v} @@ -61,6 +63,28 @@ def get_logger(name: str) -> logging.Logger: logger.propagate = False return logger +def download_file_from_url(url: str, output_path: str, ssl_verify: bool = False, chunk_size: int = 8192) -> None: + """Download a file from a URL to a local destination. + + Args: + url (str): The URL of the file to download. + output_path (str): The local file path to save the downloaded file. + ssl_verify (bool): Whether to verify SSL certificates. Defaults to False. + chunk_size (int): Size of data chunks to read at a time. Defaults to 8192 bytes. + + Raises: + Exception: If the download fails. + """ + + try: + context = ssl.create_default_context() if ssl_verify else ssl._create_unverified_context() + + with urllib.request.urlopen(url, context=context) as response, open(output_path, 'wb') as out_file: + while chunk := response.read(chunk_size): + out_file.write(chunk) + + except Exception as e: + raise Exception(f"Failed to download file from {url} to {output_path}: {e}") from e # ----------------------------------------------------------------------------------------------------------------- # Function for handling property parsing, used in NiFi processors mainly, but can beused elsewhere diff --git a/nifi/user-scripts/utils/helpers/base_nifi_processor.py b/nifi/user-scripts/utils/helpers/base_nifi_processor.py index 986cf3ea8..a40523e07 100644 --- a/nifi/user-scripts/utils/helpers/base_nifi_processor.py +++ b/nifi/user-scripts/utils/helpers/base_nifi_processor.py @@ -1,7 +1,5 @@ -import io import json import logging -import sys import traceback from logging import Logger @@ -13,30 +11,77 @@ ) from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject, JVMView - -# this script is using a custom utility for decompressing Cerner blobs -# from nifi/user-python-extensions/record_decompress_cerner_blob.py -# we need to add it to the sys imports -sys.path.insert(0, "/opt/nifi/user-scripts") - -from utils.generic import parse_value # noqa: I001,E402 - +from utils.generic import parse_value + + +def _make_wrapper_method(name): + """Return a function that delegates to the base's implementation on self.""" + def wrapper(self, *args, **kwargs): + # call Base class implementation + base_impl = getattr(super(self.__class__, self), name, None) + if base_impl is None: + raise AttributeError(f"Base class missing {name}") + return base_impl(*args, **kwargs) + wrapper.__name__ = name + return wrapper + +def nifi_processor(*, processor_details: dict | None = None): + """ + NOTE (4-11-2025): at the moment this decorator is a bit useless as the curre + NiFi version does not support automatic discovery of processor details from Python processors + it only scans for the Java nested class "ProcessorDetails" and stops there, limited + discovery capabilities for now. Hopefully in future versions this can be used. + + Class decorator that injects: + - class Java with implements set + - class ProcessorDetails (optional) + - thin wrappers for getPropertyDescriptors, getRelationships, transform + Use like: + @nifi_processor(processor_details={"version":"0.1.0"}) + class MyProc(BaseNiFiProcessor): ... + """ + def decorator(cls): + # Inject Java if missing (exact nested-class syntax NiFi looks for) + if not hasattr(cls, "Java"): + class Java: + implements = ["org.apache.nifi.python.processor.FlowFileTransform"] + cls.Java = Java + + # Inject ProcessorDetails if provided and missing + if processor_details and not hasattr(cls, "ProcessorDetails"): + class ProcessorDetails: + pass + for k, v in processor_details.items(): + setattr(ProcessorDetails, k, v) + cls.ProcessorDetails = ProcessorDetails + + # Ensure NiFi-visible methods exist on the class itself: + # If subclass hasn't defined them, create a thin delegating wrapper + for method_name in ("getPropertyDescriptors", "getRelationships", "transform"): + if not hasattr(cls, method_name): + setattr(cls, method_name, _make_wrapper_method(method_name)) + + return cls + return decorator + class BaseNiFiProcessor(FlowFileTransform): - """Base class providing common NiFi Python processor utilities.""" + """Base class providing common NiFi Python processor utilities. + NOTE: This is an example implementation meant to be reimplemented by processors inheriting it . + For the moment all inherting processors must define + their own Java and ProcessorDetails nested classes unfortunately, until dynamic. + """ identifier = None logger: Logger = Logger(__qualname__) - - class Java: - implements = ['org.apache.nifi.python.processor.FlowFileTransform'] - - class ProcessorDetails: - version = '0.0.1' - def __init__(self, jvm: JVMView): """ + This is the base class for all NiFi Python processors. It provides common functionality + such as property handling, relationship definitions, and logging. + + This is an example implementation meant to be reimplemented by processors inheriting it . + Args: jvm (JVMView): Required, Store if you need to use Java classes later. """ @@ -44,6 +89,7 @@ def __init__(self, jvm: JVMView): self.logger: Logger = logging.getLogger(self.__class__.__name__) self.process_context: ProcessContext + # Example properties — meant to be overridden or extended in subclasses self.sample_property_one: bool = True self.sample_property_two: str = "" self.sample_property_three: str = "default_value_one" @@ -54,7 +100,7 @@ def __init__(self, jvm: JVMView): description="sample property one description", default_value="true", required=True, - validators=StandardValidators.BOOLEAN_VALIDATOR), + validators=[StandardValidators.BOOLEAN_VALIDATOR]), PropertyDescriptor(name="sample_property_two", description="sample property two description", required=False, @@ -64,7 +110,7 @@ def __init__(self, jvm: JVMView): description="sample property three description", default_value="default_value_one", allowable_values=["default_value_one", "default_value_two", "default_value_three"], - validators=StandardValidators.NON_EMPTY_VALIDATOR) + validators=[StandardValidators.NON_EMPTY_VALIDATOR]) ] self._relationships = [ @@ -81,6 +127,8 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships + self.logger.info(f"Initialized {self.__class__.__name__} processor.") + def getRelationships(self) -> list[Relationship]: return self.relationships @@ -100,13 +148,18 @@ def set_properties(self, properties: dict) -> None: self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})") def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore - """ Main processor logic. This example reads Avro records from the incoming flowfile, + """ + NOTE: This is a sample method meant to be overridden and reimplemented by subclasses. + + Main processor logic. This example reads Avro records from the incoming flowfile, and writes them back out to a new flowfile. It also adds the processor properties to the flowfile attributes. IT IS A SAMPLE ONLY, you are meant to use this as a starting point for other processors """ output_contents = [] try: + self.process_context: ProcessContext = context + self.set_properties(context.getProperties()) # add properties to flowfile attributes attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore self.logger.info("Successfully transformed Avro content for OCR") From 44d6d2738e2c45be283d7e17537b78b7087f867d Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 5 Nov 2025 10:55:29 +0000 Subject: [PATCH 11/13] Docs update (added workflow status). --- .github/workflows/doc-build.yml | 2 +- README.md | 4 ++++ docs/index.rst | 8 ++++++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/.github/workflows/doc-build.yml b/.github/workflows/doc-build.yml index ecf1f86fc..24b29f05f 100644 --- a/.github/workflows/doc-build.yml +++ b/.github/workflows/doc-build.yml @@ -1,4 +1,4 @@ -name: sphinx-doc-build +name: doc-build on: push: diff --git a/README.md b/README.md index 31c2aafa3..0fb963b42 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # CogStack-NiFi +[![nifi](https://github.com/CogStack/CogStack-NiFi/actions/workflows/docker-nifi.yml/badge.svg?branch=main)](https://github.com/CogStack/CogStack-NiFi/actions/workflows/docker-nifi.yml) +[![doc-build](https://github.com/CogStack/CogStack-NiFi/actions/workflows/doc-build.yml/badge.svg?branch=main)](https://github.com/CogStack/CogStack-NiFi/actions/workflows/doc-build.yml) +[![elasticsearch-stack](https://github.com/CogStack/CogStack-NiFi/actions/workflows/docker-elasticsearch-stack.yml/badge.svg?branch=main)](https://github.com/CogStack/CogStack-NiFi/actions/workflows/docker-elasticsearch-stack.yml) + ## Introduction This repository proposes a possible next step in the evolution of free-text data processing originally implemented in [CogStack-Pipeline](https://github.com/CogStack/CogStack-Pipeline), moving towards a more modular, Platform-as-a-Service (PaaS) approach. diff --git a/docs/index.rst b/docs/index.rst index 9a9286256..c2ee0db63 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -11,12 +11,16 @@ Welcome to CogStack-Nifi's documentation! :caption: Contents: main.md + news.md nifi/main.md + security/main.md + security/certificates.md + security/elasticsearch_opensearch.md + security/nifi.md + security/services.yml deploy/main.md deploy/services.md deploy/workflows.md - security.md - news.md Indices and tables From 5209b7c3132878c9e7f58d27726b536e70972279 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 5 Nov 2025 11:13:13 +0000 Subject: [PATCH 12/13] NiFi: added python processor logging opts. --- nifi/conf/logback.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nifi/conf/logback.xml b/nifi/conf/logback.xml index b2e7914eb..6d0a61dcf 100644 --- a/nifi/conf/logback.xml +++ b/nifi/conf/logback.xml @@ -177,6 +177,14 @@ + + + + + + + + From 304dae0f99cc18daa08fa033c1fb550dcab27b57 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 5 Nov 2025 16:14:06 +0000 Subject: [PATCH 13/13] Deploy: Makefile update (j-hub compose file ref). --- deploy/Makefile | 2 +- deploy/export_env_vars.sh | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/deploy/Makefile b/deploy/Makefile index 529d7ee88..83e4e2de2 100644 --- a/deploy/Makefile +++ b/deploy/Makefile @@ -64,7 +64,7 @@ start-samples: $(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) samples-db start-jupyter: - $(WITH_ENV) docker compose -f ../services/cogstack-jupyter-hub/docker/docker-compose.yml $(DC_START_CMD) cogstack-jupyter-hub + $(WITH_ENV) docker compose -f ../services/cogstack-jupyter-hub/docker/docker-compose.base.yml -f ../services/cogstack-jupyter-hub/docker/docker-compose.prod.yml $(DC_START_CMD) cogstack-jupyter-hub start-medcat-service: $(WITH_ENV) docker compose -f ../services/cogstack-nlp/medcat-service/docker/docker-compose.yml $(DC_START_CMD) nlp-medcat-service-production diff --git a/deploy/export_env_vars.sh b/deploy/export_env_vars.sh index 6405afd23..ea8266095 100755 --- a/deploy/export_env_vars.sh +++ b/deploy/export_env_vars.sh @@ -8,8 +8,6 @@ echo "🔧 Running $(basename "${BASH_SOURCE[0]}")..." set -a -current_dir=$(pwd) - SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" DEPLOY_DIR="$SCRIPT_DIR" SECURITY_DIR="$SCRIPT_DIR/../security/env"