Welcome to the In-House Data Lake project! This data lake solution lets organizations manage and process data in near real time. It captures Change Data Capture (CDC) records from PostgreSQL using Debezium, streams them to Apache Kafka with Schema Registry, and performs incremental processing with Apache Hudi. Processed data is stored in MinIO S3, table metadata is managed by Hive Metastore, and interactive SQL queries are served by Trino (Presto). The entire stack is containerized with Docker for easy deployment; Docker and Apache Spark 3.4 are the only prerequisites.
Editable source for this architecture diagram: `screenshots/diagram-v2.mmd`

```mermaid
flowchart LR
  subgraph Source[Source Database]
    PG[(PostgreSQL\n`hudidb`)]
  end
  subgraph CDC[CDC Layer]
    DBZ[Debezium Connect\n:8083]
    KAFKA[(Kafka)]
    SR[Schema Registry\n:8081]
  end
  subgraph Lake[Data Lake on MinIO]
    SPARK[Hudi Streamer\nSpark 3.4]
    MINIO[(MinIO S3\nwarehouse bucket)]
    HMS[Hive Metastore\n:9083]
  end
  subgraph Query[Query Layer]
    TRINO[Trino / Presto\n:8080]
  end
  USER[[User / SQL Client]]

  PG -- WAL/CDC --> DBZ
  DBZ --> KAFKA
  DBZ --> SR
  SPARK -->|read CDC| KAFKA
  SPARK -->|schema| SR
  SPARK -->|write Hudi table| MINIO
  SPARK -->|sync table metadata| HMS
  TRINO -->|read metadata| HMS
  TRINO -->|read Hudi files| MINIO
  USER -->|run SQL| TRINO
```
Before you begin, ensure you have the following prerequisites installed on your system:
- Docker
- Apache Spark 3.4
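To quickly confirm both are available on your PATH, you can check their versions (output will vary by installation):

```sh
docker --version
docker-compose --version
spark-submit --version   # should report Spark 3.4.x
```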
To get started with this project, follow these steps:
- Clone the repository:

  ```sh
  git clone git@github.com:Rahul7794/datalake.git
  cd datalake
  ```

- Build and run the Docker containers:

  ```sh
  cd hudi-datalake
  docker-compose up -d
  ```

- Once the Docker containers are up and running, you can view them with:

  ```sh
  docker ps
  ```
Now it's time to run the Debezium Postgres connector, which dumps CDC changes to Kafka topics:

```sh
curl -H 'Content-Type: application/json' localhost:8083/connectors --data '
{
  "name": "transactions-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "hudidb",
    "plugin.name": "pgoutput",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "root123",
    "database.dbname": "dev",
    "topic.prefix": "test",
    "database.server.name": "test1",
    "schema.include.list": "v1"
  }
}'
```
| Property | Description | Example Value |
|---|---|---|
| name | Name of the connector, unique within Kafka Connect. | transactions-connector |
| connector.class | Class of the connector to be used (Debezium connector for PostgreSQL). | io.debezium.connector.postgresql.PostgresConnector |
| database.hostname | Hostname of the PostgreSQL database server. | hudidb |
| plugin.name | Name of the PostgreSQL plugin to be used for capturing changes. | pgoutput |
| database.port | Port on which the PostgreSQL database is listening. | 5432 |
| database.user | Username to connect to the PostgreSQL database. | postgres |
| database.password | Password for the PostgreSQL user. | root123 |
| database.dbname | Name of the PostgreSQL database from which change data should be captured. | dev |
| topic.prefix | Prefix for Kafka topics to which change data will be streamed. | test |
| database.server.name | Unique name for this database server (used as a namespace for Kafka topics). | test1 |
| schema.include.list | Schema(s) within the database to monitor for changes. | v1 |
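After submitting the connector, you can confirm it was created and is running via the Kafka Connect REST API (standard Connect endpoints on the port exposed above):

```sh
# List registered connectors -- expect ["transactions-connector"]
curl -s localhost:8083/connectors

# Inspect the connector and task status -- the state should be RUNNING
curl -s localhost:8083/connectors/transactions-connector/status
```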
Now verify the topic data by running the command below:

```sh
docker run --tty \
  --network psql-kafka_default \
  confluentinc/cp-kafkacat \
  kafkacat -b kafka:9092 -C \
  -s key=s -s value=avro \
  -r http://schema-registry:8081 \
  -t test1.v1.retail_transactions
```
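You can also check that an Avro schema was registered for the topic by querying Schema Registry directly (standard Schema Registry REST endpoints):

```sh
# List subjects -- expect test1.v1.retail_transactions-value among them
curl -s http://localhost:8081/subjects

# Fetch the latest value schema for the topic
curl -s http://localhost:8081/subjects/test1.v1.retail_transactions-value/versions/latest
```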
Once the data is in the Kafka topic, we can run the Hudi DeltaStreamer, which consumes CDC changes from Kafka, performs continuous incremental processing, and writes the processed data to the configured location (local folder or S3).

After verifying the Spark version and downloading the Hudi utilities JAR into `utilities-jar/` (the download URL is listed in `utilities-jar/jar.txt`), run:

```sh
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 \
--properties-file config/spark-config.properties \
--master 'local[*]' \
--executor-memory 1g \
utilities-jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
--table-type COPY_ON_WRITE \
--target-base-path file:///Users/rahul/PythonWorkSpace/datalake/hudidb/ \
--target-table retail_transactions \
--source-ordering-field tran_date \
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
--op UPSERT \
--continuous \
--source-limit 4000000 \
--min-sync-interval-seconds 20 \
--hoodie-conf bootstrap.servers=localhost:9092 \
--hoodie-conf schema.registry.url=http://localhost:8081 \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/test1.v1.retail_transactions-value/versions/latest \
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=test1.v1.retail_transactions \
--hoodie-conf auto.offset.reset=earliest \
--hoodie-conf hoodie.datasource.write.recordkey.field=tran_id \
--hoodie-conf hoodie.datasource.write.partitionpath.field=store_city \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.precombine.field=tran_date
```

Command explanation:
| Property | Description | Example Value |
|---|---|---|
| --class | Main class to be executed with Spark. | org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer |
| --packages | Comma-separated list of packages to be used by Spark. | org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 |
| --properties-file | Path to the properties file containing Spark configuration. | config/spark-config.properties |
| --master | Specifies the Spark master URL. | 'local[*]' |
| --executor-memory | Amount of memory to allocate per executor. | 1g |
| <utilities-jar> | Path to the Hudi utilities JAR file. | utilities-jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar |
| --table-type | Type of the Hudi table (COPY_ON_WRITE or MERGE_ON_READ). | COPY_ON_WRITE |
| --target-base-path | Base path where the Hudi dataset will be stored. | file:///Users/rahul/PythonWorkSpace/datalake/hudidb/ |
| --target-table | Name of the target Hudi table. | retail_transactions |
| --source-ordering-field | Field used for ordering incoming records. | tran_date |
| --source-class | Source class for reading data (e.g., Debezium source). | org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource |
| --payload-class | Class for parsing the payload (e.g., Debezium Avro payload). | org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload |
| --op | Operation type (e.g., UPSERT). | UPSERT |
| --continuous | Indicates continuous streaming mode. | (Flag, no value) |
| --source-limit | Maximum number of records to read from the source. | 4000000 |
| --min-sync-interval-seconds | Minimum sync interval in seconds. | 20 |
| --hoodie-conf | Various Hudi configuration properties. Multiple flags can be used with different properties. | (Multiple properties, see examples below) |
| bootstrap.servers | Kafka bootstrap servers for connecting to Kafka. | localhost:9092 |
| schema.registry.url | URL for the Avro Schema Registry. | http://localhost:8081 |
| hoodie.deltastreamer.schemaprovider.registry.url | URL for the schema provider in DeltaStreamer. | http://localhost:8081/subjects/test1.v1.retail_transactions-value/versions/latest |
| hoodie.deltastreamer.source.kafka.value.deserializer.class | Kafka Avro deserializer class. | io.confluent.kafka.serializers.KafkaAvroDeserializer |
| hoodie.deltastreamer.source.kafka.topic | Kafka topic for source data. | test1.v1.retail_transactions |
| auto.offset.reset | Kafka offset reset strategy. | earliest |
| hoodie.datasource.write.recordkey.field | Field in the record used as the record key. | tran_id |
| hoodie.datasource.write.partitionpath.field | Field used for partitioning the data. | store_city |
| hoodie.datasource.write.keygenerator.class | Key generator class for Hudi. | org.apache.hudi.keygen.SimpleKeyGenerator |
| hoodie.datasource.write.hive_style_partitioning | Enable Hive-style partitioning. | true |
| hoodie.datasource.write.precombine.field | Field for pre-combining records. | tran_date |
The above command writes the data to the local folder specified by `--target-base-path`.
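To confirm commits are landing locally, you can inspect the target base path; a Hudi Copy-on-Write table keeps its timeline under a `.hoodie` directory and writes hive-style partition folders (the path below matches the example command, so adjust it to your own `--target-base-path`):

```sh
# Partition folders (e.g. store_city=BOSTON) plus the .hoodie metadata directory
ls /Users/rahul/PythonWorkSpace/datalake/hudidb/

# Completed commits appear as *.commit files in the Hudi timeline
ls /Users/rahul/PythonWorkSpace/datalake/hudidb/.hoodie/
```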
If we want to write records to S3 (MinIO) instead, we need to pass a few extra properties (see `config/spark-config-s3.properties`), and the spark-submit command changes slightly:
```sh
spark-submit \
--class org.apache.hudi.utilities.streamer.HoodieStreamer \
--packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \
--repositories 'https://repo.maven.apache.org/maven2' \
--properties-file config/spark-config-s3.properties \
--master 'local[*]' \
--executor-memory 1g \
utilities-jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
--table-type COPY_ON_WRITE \
--target-base-path s3a://warehouse/retail_transactions/ \
--target-table retail_transactions \
--source-ordering-field tran_date \
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
--op UPSERT \
--continuous \
--source-limit 4000000 \
--min-sync-interval-seconds 20 \
--hoodie-conf bootstrap.servers=localhost:9092 \
--hoodie-conf schema.registry.url=http://localhost:8081 \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/test1.v1.retail_transactions-value/versions/latest \
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=test1.v1.retail_transactions \
--hoodie-conf auto.offset.reset=earliest \
--hoodie-conf hoodie.datasource.write.recordkey.field=tran_id \
--hoodie-conf hoodie.datasource.write.partitionpath.field=store_city \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.precombine.field=tran_date
```

Monitor the data flow and storage under the path specified in `--target-base-path`.
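To verify the S3 target, you can list the objects written to the `warehouse` bucket, for example with the AWS CLI pointed at the local MinIO endpoint (the endpoint and credentials below are assumptions; use the values from `config/spark-config-s3.properties` / `.env`):

```sh
# Credentials and endpoint are illustrative -- substitute your MinIO settings
AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin \
aws --endpoint-url http://localhost:9000 s3 ls --recursive s3://warehouse/retail_transactions/
```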
To query Hudi data in MinIO with Trino, this repo now includes:

- Hive Metastore service (`hive-metastore`) for table metadata
- Trino coordinator (`trino`) with a Hudi catalog
- Metastore DB (`metastore-db`)

```sh
cd hudi-datalake
docker-compose up -d
```

This starts Trino at http://localhost:8080 and Hive Metastore at thrift://localhost:9083.
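To open an interactive SQL session, one option is to use the Trino CLI bundled in the coordinator container (assuming the service/container is named `trino` as above):

```sh
# Launch the Trino CLI inside the coordinator, defaulting to the hudi catalog
docker exec -it trino trino --server localhost:8080 --catalog hudi --schema default
```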
If you want to run the Spark Hudi job in a container (instead of local Spark), use:

```sh
cd hudi-datalake
docker-compose -f docker-compose.spark.yml run --rm spark-hudi-streamer
```

Notes:

- This uses the same command from `commands.txt` (S3 target + Hive sync) with container-friendly endpoints (`kafka`, `schema-registry`, `minio`, `hive-metastore`).
- Keep the main stack up first via `docker-compose up -d`.
- Ensure `utilities-jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar` exists before running (see the snippet after this list for one way to fetch it).
- The Docker run uses `config/spark-config-s3-docker.properties` (endpoint `http://minio:9000`).
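If the JAR is not present yet, one way to fetch it is to download it from the URL recorded in `utilities-jar/jar.txt` (this assumes the file contains just the download URL on a single line):

```sh
# Download the Hudi utilities slim bundle to the expected location
curl -L -o utilities-jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar "$(cat utilities-jar/jar.txt)"
```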
The Spark container command already runs with `--continuous`, so keep it running in one terminal:

```sh
cd hudi-datalake
docker-compose -f docker-compose.spark.yml run --rm spark-hudi-streamer
```

Cleanup applied for continuous mode:

- Removed unnecessary `--source-limit` from the containerized continuous job
- Removed redundant inline `spark.hadoop.fs.s3a.*` overrides (already in `config/spark-config-s3-docker.properties`)
- Switched deprecated keys to `hoodie.streamer.source.kafka.*` (see the example after this list)
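For reference, the rename only changes the key prefix; the values stay the same. A sketch of the substitution for the Kafka topic key, shown as spark-submit flag fragments (the other `hoodie.deltastreamer.source.kafka.*` keys follow the same pattern):

```sh
# Deprecated key, as used in the local spark-submit commands above
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=test1.v1.retail_transactions

# Renamed key used by the containerized job
--hoodie-conf hoodie.streamer.source.kafka.topic=test1.v1.retail_transactions
```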
When writing Hudi data to MinIO (`s3a://warehouse/retail_transactions/`), include the Hive sync properties:

```sh
--enable-hive-sync \
--hoodie-conf hoodie.datasource.hive_sync.enable=true \
--hoodie-conf hoodie.datasource.hive_sync.mode=hms \
--hoodie-conf hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083 \
--hoodie-conf hoodie.datasource.hive_sync.database=default \
--hoodie-conf hoodie.datasource.hive_sync.table=retail_transactions \
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=store_city \
--hoodie-conf hoodie.datasource.hive_sync.support_timestamp=true
```

This registers/updates the Hudi table metadata in Hive Metastore so Trino can discover it.
Example checks:

```sql
SHOW CATALOGS;
SHOW SCHEMAS FROM hudi;
SHOW TABLES FROM hudi.default;

SELECT tran_id, tran_date, store_city, total
FROM hudi.default.retail_transactions
ORDER BY tran_id;
```

Run these against Postgres while the streamer is in continuous mode:
```sh
docker exec -it hudidb psql -U postgres -d dev -c "SET search_path TO v1; INSERT INTO retail_transactions VALUES (1001, CURRENT_DATE, 11, 'BOSTON', 'MA', 2, 44.50);"
docker exec -it hudidb psql -U postgres -d dev -c "SET search_path TO v1; UPDATE retail_transactions SET quantity = quantity + 1, total = total + 10 WHERE tran_id = 2;"
docker exec -it hudidb psql -U postgres -d dev -c "SET search_path TO v1; DELETE FROM retail_transactions WHERE tran_id = 3;"
```

If UPDATE/DELETE fails with a replica identity error on an existing DB, run once:

```sh
docker exec -it hudidb psql -U postgres -d dev -c "ALTER TABLE v1.retail_transactions REPLICA IDENTITY FULL;"
```

(`init.sh` now sets this automatically for fresh environments.)
Then query from Trino:

```sql
SELECT tran_id, store_city, quantity, total
FROM hudi.default.retail_transactions
ORDER BY tran_id;
```

You can also use the ready-made SQL file: `hudi-datalake/sql/retail_transactions_cdc.sql`.
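One way to run that file end-to-end is to pipe it into the Trino CLI from the host (this relies on the CLI's non-interactive stdin mode and the `trino` container name used above):

```sh
# Pipe the ready-made SQL file into the Trino CLI inside the coordinator container
docker exec -i trino trino --catalog hudi --schema default < hudi-datalake/sql/retail_transactions_cdc.sql
```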
Changed files:

- `hudi-datalake/docker-compose.yml`
  - Added `metastore-db`, `hive-metastore`, `trino` services
  - Parameterized MinIO/Postgres credentials through `.env`
- Added `hudi-datalake/.env`
  - Centralized local credentials for the stack (a hypothetical sketch follows below)
- `hudi-datalake/trino/etc/*`
  - Added Trino coordinator + Hudi catalog configuration for MinIO + HMS
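The exact variable names in `.env` are defined by the repo; a minimal sketch of what such a file typically centralizes (all names and values below are assumptions, not the repo's actual contents):

```sh
# Hypothetical .env sketch -- variable names and values are illustrative only
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=minioadmin
POSTGRES_USER=postgres
POSTGRES_PASSWORD=root123
POSTGRES_DB=dev
```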
```
├── commands.txt
├── config
│   ├── spark-config-s3.properties
│   └── spark-config.properties
├── hudi-datalake
│   ├── connector.json
│   ├── .env
│   ├── docker-compose.yml
│   ├── docker-compose.spark.yml
│   ├── dockerfile
│   ├── hive
│   │   └── conf
│   │       └── core-site.xml
│   ├── init.sh
│   ├── sql
│   │   └── retail_transactions_cdc.sql
│   └── trino
│       └── etc
│           ├── catalog
│           │   └── hudi.properties
│           ├── config.properties
│           ├── jvm.config
│           └── node.properties
├── readme.md
├── screenshots
│   └── dockerps.png
└── utilities-jar
    └── jar.txt
```

