Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
.vscode
.venv
.ruff_cache
venv
**__pycache__
**/venv

# keys and certificates
*.pem
Expand Down
9 changes: 8 additions & 1 deletion deploy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set -a && source ./export_env_vars.sh;
endef

# utility commands

git-freeze-security:
@../scripts/git_freeze_security.sh

Expand All @@ -19,8 +20,14 @@ git-unfreeze-security:
git-update-submodules:
@../scripts/git_update_submodules_in_repo.sh

load-env:
$(WITH_ENV) echo "Environment variables loaded."

show-env:
${WITH_ENV} >/dev/null 2>&1; printenv | sort

# start services
#

start-nifi:
$(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow

Expand Down
100 changes: 94 additions & 6 deletions docs/deploy/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,92 @@ This design allows each service to be:
- NiFi-specific configuration (properties, custom processors, drivers, Python scripts, etc.) is under:
[`./nifi`](https://github.com/CogStack/CogStack-NiFi/tree/main/nifi/)

## 🧰 Makefile Command Overview

A concise reference for controlling the full CogStack deployment stack (NiFi, Elasticsearch, JupyterHub, MedCAT, OCR-service, GitEA, Beats, DB, etc.).
All commands automatically load environment variables via `export_env_vars.sh`.

---

### 🔧 Utilities

| Command | Description |
|------------------------|---------------------------------------------|
| `make load-env` | Load all environment variables |
| `make show-env` | Print environment variables (sorted) |
| `make git-freeze-security` | Freeze all security submodules (read-only) |
| `make git-unfreeze-security` | Unfreeze security submodules |
| `make git-update-submodules` | Update all submodules |

---

### 🚀 Start Services

| Command | Description |
|---------------------------------|-------------|
| `make start-nifi` | Start NiFi, NiFi-Nginx, NiFi Registry |
| `make start-elastic` | Start ES-1, ES-2, Kibana |
| `make start-elastic-cluster` | Start ES-1, ES-2, ES-3 |
| `make start-elastic-1/2/3` | Start individual Elasticsearch nodes |
| `make start-metricbeat-1/2/3` | Start Metricbeat agents |
| `make start-filebeat-1/2/3` | Start Filebeat agents |
| `make start-kibana` | Start Kibana only |
| `make start-samples` | Start samples DB |
| `make start-jupyter` | Start JupyterHub (prod config) |
| `make start-medcat-service` | Start MedCAT service |
| `make start-medcat-service-deid`| Start DE-ID MedCAT service |
| `make start-medcat-trainer` | Start MedCAT Trainer + Solr + Nginx |
| `make start-ocr-services` | Start OCR-service (full + text-only) |
| `make start-git-ea` | Start GitEA |
| `make start-production-db` | Start Databank DB |
| **`make start-data-infra`** | Start NiFi + Elastic + Samples DB |
| **`make start-all`** | Full stack: data infra + NLP + JupyterHub + OCR |

---

### 🛑 Stop Services

| Command | Description |
|---------------------------------|-------------|
| `make stop-nifi` | Stop NiFi stack |
| `make stop-elastic` | Stop ES-1, ES-2, Kibana |
| `make stop-elastic-cluster` | Stop ES-1, ES-2 |
| `make stop-elastic-1/2/3` | Stop individual ES nodes |
| `make stop-metricbeat-1/2/3` | Stop Metricbeat agents |
| `make stop-filebeat-1/2/3` | Stop Filebeat agents |
| `make stop-kibana` | Stop Kibana |
| `make stop-samples` | Stop samples DB |
| `make stop-jupyter` | Stop JupyterHub |
| `make stop-medcat-service` | Stop MedCAT service |
| `make stop-medcat-service-deid` | Stop DE-ID MedCAT service |
| `make stop-medcat-trainer` | Stop MedCAT Trainer stack |
| `make stop-ocr-services` | Stop OCR-service stack |
| `make stop-git-ea` | Stop GitEA |
| `make stop-production-db` | Stop Databank DB |
| **`make stop-data-infra`** | Stop NiFi + Elastic + Samples |
| **`make stop-all`** | Stop entire stack |

---

### 🧹 Cleanup

| Command | Description |
|------------------|---------------------------------------------|
| `make down-all` | Docker Compose `down` for all core services |
| `make cleanup` | Full teardown, including volumes |

---

### 📝 Notes

- All `start-*` commands use `docker compose -f services.yml` unless referencing a specific service’s Dockerfile.
- `start-all` and `stop-all` act as the top-level orchestration entry points.
- Environment variables are **always sourced** using the integrated `WITH_ENV` macro.

---

If you want, I can also generate a **minimal cheat sheet**, or an **ASCII tree diagram** that shows how `start-all` expands into all services.

## 🚀 Starting the Services

All core services defined in `services.yml` can be started using the Makefile in the `deploy/` directory.
Expand All @@ -69,7 +155,7 @@ This is useful for:

---

#### 🧩 Core NiFi Services
### 🧩 Core NiFi Services

```bash
make start-nifi
Expand Down Expand Up @@ -105,6 +191,8 @@ Ideal for running ingestion pipelines and ETL workflows.

#### 🛢️ Elasticsearch / OpenSearch Services

Please note that to switch from OpenSearch (Amazon open-source fork) to ElasticSearch you will need to change some environment variables, see the [configuration](./configuration.md) section.

```bash
make start-elastic
```
Expand Down Expand Up @@ -137,7 +225,7 @@ Starts Kibana for inspecting logs, checking index mappings, monitoring ES health

---

#### 🗄️ Databases
### 🗄️ Databases

```bash
make start-samples
Expand All @@ -155,7 +243,7 @@ Use when testing SQL ingestion or verifying DB-driven NiFi flows.

---

#### 📚 JupyterHub
### 📚 JupyterHub

```bash
make start-jupyter
Expand All @@ -165,7 +253,7 @@ Starts the CogStack JupyterHub instance. Used for notebooks, analysis, model tes

---

#### 🧠 NLP Services (MedCAT & Trainer)
### 🧠 NLP Services (MedCAT Service & Trainer)

```bash
make start-medcat-service
Expand All @@ -187,7 +275,7 @@ Starts the full MedCAT Trainer stack (Trainer UI + Solr + NGINX). Useful for ann

---

#### 📝 OCR Services
### 📝 OCR Services

```bash
make start-ocr-services
Expand All @@ -202,7 +290,7 @@ Use for PDF ingestion, OCR debugging, and pipeline validation.

---

#### 🛠️ Miscellaneous Services (GIT EA)'
### 🛠️ Miscellaneous Services (GIT EA)

```bash
make start-git-ea
Expand Down
4 changes: 2 additions & 2 deletions docs/deploy/main.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Execute the following commands in the root directory of the repo:

1. `git-lfs pull`
2. (OPTIONAL, if you already have the software in [this section installed](#-software-requirements-linuxmacos))`sudo bash ./scripts/installation_utils/install_docker_and_utils.sh` , and wait for it to finish, it may take a while to get all the packages..
3. `sudo bash ./scripts/git_update_submodules_in_repo.sh`
3. `cd deploy && git-update-submodules`
4. check that docker works correctly : `docker pull hello-world`
5. if no errors, run: `docker run --rm hello-world`, it should run without issues
6. if there are any issues check the below warning section
Expand All @@ -54,7 +54,7 @@ IMPORTANT NOTE: Do a `git-lfs pull` so that you have everything downloaded from

:::{warning}
Ensure all Git submodules are initialized and updated:
`sudo bash ./scripts/git_update_submodules_in_repo.sh`
`cd deploy && git-update-submodules`
:::

:::{warning}
Expand Down
69 changes: 27 additions & 42 deletions nifi/user-python-extensions/convert_avro_binary_field_to_base64.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,44 @@
import sys

sys.path.insert(0, "/opt/nifi/user-scripts")

import base64
import copy
import io
import json
import sys
import traceback
from logging import Logger

from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro.schema import RecordSchema, Schema, parse
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.flowfiletransform import FlowFileTransformResult
from nifiapi.properties import (
ProcessContext,
PropertyDescriptor,
StandardValidators,
)
from nifiapi.relationship import Relationship
from overrides import override
from py4j.java_gateway import JavaObject, JVMView
from utils.helpers.base_nifi_processor import BaseNiFiProcessor

# we need to add it to the sys imports
sys.path.insert(0, "/opt/nifi/user-scripts")

from utils.generic import parse_value # noqa: I001,E402
class ConvertAvroBinaryRecordFieldToBase64(BaseNiFiProcessor):
"""NiFi Python processor to convert a binary field in Avro records to base64-encoded string.

Reads each FlowFile as Avro, locates the configured binary_field_name, and rewrites the Avro schema,
so that field becomes a nullable string, preventing NiFi’s JSON
converters from turning raw bytes into integer arrays.

class ConvertAvroBinaryRecordFieldToBase64(FlowFileTransform):
identifier = None
logger: Logger = Logger(__qualname__)
Streams every record through a new Avro writer, base64-encoding the binary payload when operation_mode=base64
(or leaving bytes untouched for raw), then reattaching the remaining fields
so downstream processors still see the original record structure.

Emits the updated Avro binary along success with attributes capturing document ID field,
binary field, mode, and MIME type application/avro-binary;

Exception routes to failure after logging a stack trace.
"""

class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
Expand All @@ -36,11 +47,7 @@ class ProcessorDetails:
version = '0.0.1'

def __init__(self, jvm: JVMView):
"""
Args:
jvm (JVMView): Required, Store if you need to use Java classes later.
"""
self.jvm = jvm
super().__init__(jvm)

self.operation_mode: str = "base64"
self.binary_field_name: str = "binarydoc"
Expand Down Expand Up @@ -78,31 +85,9 @@ def __init__(self, jvm: JVMView):

self.descriptors: list[PropertyDescriptor] = self._properties
self.relationships: list[Relationship] = self._relationships

def getRelationships(self) -> list[Relationship]:
return self.relationships

def getPropertyDescriptors(self) -> list[PropertyDescriptor]:
return self.descriptors

def set_logger(self, logger: Logger):
self.logger = logger

def set_properties(self, properties: dict) -> None:
""" Gets the properties from the processor's context and sets them as instance variables.

Args:
properties (dict): dictionary containing property names and values.
"""

for k, v in properties.items():
name = k.name if hasattr(k, "name") else str(k)
val = parse_value(v)
if hasattr(self, name):
setattr(self, name, val)
self.logger.debug(f"property set '{name}' -> {val!r} (type={type(val).__name__})")

def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: # type: ignore
@override
def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""
Transforms an Avro flow file by converting a specified binary field to a base64-encoded string.

Expand All @@ -115,14 +100,15 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
Exception: For any other errors during Avro processing.

Returns:
FlowFileTransformResult: The result containing the transformed flow file, updated attributes, and relationship.
FlowFileTransformResult: The result containing the transformed flow file, updated attributes,
and relationship.
"""
try:
self.process_context = context
self.set_properties(context.getProperties())

# read avro record
input_raw_bytes: bytearray = flowFile.getContentsAsBytes() # type: ignore
input_raw_bytes: bytes = flowFile.getContentsAsBytes()
input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader())

Expand Down Expand Up @@ -171,8 +157,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
writer.flush()
output_byte_buffer.seek(0)

# add properties to flowfile attributes
attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} # type: ignore
attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
attributes["document_id_field_name"] = str(self.document_id_field_name)
attributes["binary_field"] = str(self.binary_field_name)
attributes["operation_mode"] = str(self.operation_mode)
Expand Down
Loading
Loading