
Cadence v2.0 (Flink-Python CSV Processor with SQLite Integration)

This project is a hybrid data processing pipeline that leverages Apache Flink (via PyFlink) and native Python to process large CSV files, clean and chunk them, and store them in a local SQLite database.

Project Overview

The pipeline performs the following steps:

  1. Flink Job Execution:

    • Reads large CSV files in chunks using pandas.
    • Cleans and formats the data rows.
    • Splits them into smaller chunk_*.csv files for manageable processing.
  2. Python-SQLite Integration:

    • Reads each chunked file.
    • Inserts cleaned data into a local hsi.db SQLite database.

This setup is containerized using Docker and deploys Flink in Session Cluster mode via Docker Compose.


Folder Structure

cadence/
├── data/
│   ├── uploads/               # Input CSV files
│   ├── chunks/                # Output files written by Flink for SQLite ingestion
│   └── hsi.db                 # SQLite database (auto-created)
├── flink_pipeline.py          # Main processing script
├── Dockerfile                 # Builds the PyFlink client container
├── docker-compose.yml         # Spins up Flink session cluster (JobManager + TaskManager)
├── requirements.txt           # Python dependencies

Requirements

  • Docker & Docker Compose
  • Python 3.10 (used in container)
  • CSV input files in data/uploads/ directory

Setup & Usage

1. Start Flink Session Cluster

From the root directory:

docker compose up -d

This starts:

  • JobManager on port 8081 (Web UI)
  • TaskManager connected to the same Flink session

Access the Flink dashboard: http://localhost:8081
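
For reference, a minimal session-cluster docker-compose.yml has roughly this shape; the image tag, slot count, and network name are assumptions, and the repo's own file is authoritative:

services:
  jobmanager:
    image: flink:1.17
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    networks:
      - flink-net

  taskmanager:
    image: flink:1.17
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    networks:
      - flink-net

networks:
  flink-net:

With a Compose project named cadence, the flink-net network above is what Docker exposes as cadence_flink-net in the run step below.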


2. Build the PyFlink Client

docker build -t pyflink-client -f Dockerfile .
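
The repo's Dockerfile is the source of truth; a minimal client image along these lines would work, assuming a python:3.10 base and noting that PyFlink needs a local JRE:

FROM python:3.10-slim

# Install a JRE compatible with your Flink version; PyFlink requires one locally.
RUN apt-get update && \
    apt-get install -y --no-install-recommends default-jre-headless && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY flink_pipeline.py .

CMD ["python", "flink_pipeline.py"]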

3. Run the Processing Job

docker run --rm \
  --network=cadence_flink-net \
  -v $(pwd)/data:/app/data \
  pyflink-client

Replace cadence_flink-net with the network name Docker Compose created on your machine (list networks with docker network ls).

This will:

  • Process files inside /app/data/uploads
  • Output chunked chunk_*.csv files into /app/data/chunks
  • Populate the hsi.db SQLite database

What the Job Does

flink_pipeline.py

  1. Preprocesses Input Files

    • Cleans columns like Index, Open, CloseUSD, etc.
    • Converts rows into CSV string format.
  2. Writes Chunked Files

    • Each chunk contains 10,000 rows by default.
    • Files are saved to /app/data/chunks/chunk_N.csv.
  3. SQLite Insertion (schema and a sketch follow below)

    • Loads each chunk_*.csv
    • Inserts rows into the SQLite table hsi_data
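
A minimal sketch of steps 1 and 2, assuming the hypothetical helper name split_csv_to_chunks and purely illustrative cleaning; the actual flink_pipeline.py may differ:

import os

import pandas as pd

CHUNK_SIZE = 10_000  # default rows per chunk, per step 2 above

def split_csv_to_chunks(input_path: str, out_dir: str) -> list[str]:
    """Stream a large CSV in CHUNK_SIZE pieces and write cleaned chunk_N.csv files."""
    os.makedirs(out_dir, exist_ok=True)
    written = []
    for n, chunk in enumerate(pd.read_csv(input_path, chunksize=CHUNK_SIZE)):
        chunk.columns = [c.strip() for c in chunk.columns]  # illustrative cleanup
        chunk = chunk.dropna()
        out_path = os.path.join(out_dir, f"chunk_{n}.csv")
        chunk.to_csv(out_path, index=False)
        written.append(out_path)
    return written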

Sample Schema: hsi_data

CREATE TABLE hsi_data (
    index_name TEXT,
    date TEXT,
    open REAL,
    high REAL,
    low REAL,
    close REAL,
    adj_close REAL,
    volume INTEGER,
    close_usd REAL
);
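
Given that schema, the insertion step can be sketched as follows; the column order and the helper name load_chunks_into_sqlite are assumptions based on the schema above, not the exact code in flink_pipeline.py:

import glob
import sqlite3

import pandas as pd

COLUMNS = ["index_name", "date", "open", "high", "low",
           "close", "adj_close", "volume", "close_usd"]

def load_chunks_into_sqlite(chunks_dir: str, db_path: str) -> None:
    conn = sqlite3.connect(db_path)
    # Create the table up front so SQLite keeps the typed schema above,
    # rather than letting pandas infer one.
    conn.execute("""CREATE TABLE IF NOT EXISTS hsi_data (
        index_name TEXT, date TEXT, open REAL, high REAL, low REAL,
        close REAL, adj_close REAL, volume INTEGER, close_usd REAL)""")
    for path in sorted(glob.glob(f"{chunks_dir}/chunk_*.csv")):
        df = pd.read_csv(path, header=0, names=COLUMNS)  # assumes this column order
        df.to_sql("hsi_data", conn, if_exists="append", index=False)
    conn.commit()
    conn.close()

load_chunks_into_sqlite("/app/data/chunks", "/app/data/hsi.db")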

Python Dependencies

Listed in requirements.txt:

apache-flink
pyflink
pandas

These are installed inside the PyFlink client Docker image.


Graceful Shutdown

To stop the Flink cluster:

docker compose down

To clear generated data:

rm -rf data/chunks/*
rm -f data/hsi.db

Future Improvements

  • Stream to Kafka instead of writing to disk
  • Add SQL-based filtering or aggregation
  • Use Flink Table API for advanced processing
  • Extend to output to PostgreSQL or Cloud DB

Author

Built by Bethvour Chike. Cadence is a hybrid Flink and native-Python data engineering project for fast, local CSV ingestion and analysis.
