This project is an end-to-end data engineering pipeline that processes raw, noisy vessel data from a CSV file to produce a clean and comprehensive set of "golden records". It identifies unique vessels and consolidates their attributes, creating a single source of truth that is stored in a PostgreSQL database.
The pipeline is fully containerized using Docker and Docker Compose, making it portable and easy to run.
The application runs as a multi-container setup managed by Docker Compose, consisting of two main services:
- postgres-db: A PostgreSQL database container to store the final, cleaned data.
- spark-app: A container that runs a PySpark script to perform the core data processing.
[source CSV] -> spark-app (PySpark) -> postgres-db (golden_records table)
- Docker
- Docker Compose
Build and Run the Pipeline
From the root of the project directory, run the following command:
docker-compose up --build
This command will:
- Build the Docker image for the spark-app service.
- Start the postgres-db container.
- Once the database is ready, start the spark-app container.
- Run the Spark job automatically, which processes the data and saves the results to the golden_records table in the database.
- Exit automatically once the Spark job is complete.
Verify the Output
Once the job has finished, you can query the PostgreSQL database to see the results. The following command will connect to the database and show the first 10 golden records:
docker-compose exec postgres-db psql -U sparkuser -d vessels -c "SELECT imo, name_history, builtYear, destination FROM golden_records LIMIT 10;"
To inspect a single vessel, such as the most frequently occurring IMO from the earlier example, add the -x flag, which prints wide tables in a more readable, expanded format:
docker-compose exec postgres-db psql -x -U sparkuser -d vessels -c "SELECT * FROM golden_records WHERE imo = '9710749';"
The Spark script performs the data processing in several key stages:
Ingestion and Cleaning
- The raw CSV data is read into a Spark DataFrame.
- A custom function cleans the CSV header, creating unique names for columns that appear more than once (e.g., draught and draught_1).
- The imo column is cast to an integer type to ensure data quality.
- Rows with invalid IMO numbers are filtered out using a checksum validation algorithm.
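The two cleaning steps above can be sketched in plain Python. This is an illustration only, not the project's actual code: the function names `dedupe_header` and `is_valid_imo` are hypothetical, and the check assumes the pipeline uses the standard IMO check digit (the first six digits weighted 7 down to 2, with the last digit of the sum equal to the seventh digit).

```python
def dedupe_header(names):
    """Return column names with repeats suffixed: draught, draught_1, ..."""
    used = set()
    unique = []
    for raw in names:
        base = raw.strip()
        candidate, suffix = base, 0
        # Bump the suffix until the name is not already taken.
        while candidate in used:
            suffix += 1
            candidate = f"{base}_{suffix}"
        used.add(candidate)
        unique.append(candidate)
    return unique


def is_valid_imo(imo):
    """Validate a 7-digit IMO number against its check digit."""
    digits = str(imo)
    if len(digits) != 7 or not (digits.isascii() and digits.isdigit()):
        return False
    # Weights 7, 6, 5, 4, 3, 2 apply to the first six digits.
    weighted_sum = sum(int(d) * w for d, w in zip(digits[:6], range(7, 1, -1)))
    return weighted_sum % 10 == int(digits[6])


print(dedupe_header(["imo", "draught", "draught"]))
print(is_valid_imo(9710749), is_valid_imo(9710748))
```

In a Spark job, a check like `is_valid_imo` would typically be applied as a filter on the imo column, either as a UDF or rewritten with built-in column expressions.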
Golden Record Aggregation
To create a single, comprehensive record for each vessel (imo), a three-part aggregation strategy is used:
- Static Attributes (Mode): For attributes that should be stable (e.g., builtYear, vessel_type, length), the script calculates the most frequently occurring value (mode) across all records for that vessel. This makes the result resilient to data entry errors in older records.
- Dynamic Attributes (Latest): For attributes that change with each voyage (e.g., destination, eta, last_position_speed), the script selects the single most recent value by using a Window function partitioned by imo and ordered by UpdateDate.
- Evolving Attributes (History): For key identifiers that can change over time (e.g., name, flag, mmsi), the script collects a complete set of unique historical values using collect_set. This ensures no historical data is lost.
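To make the three strategies concrete, here is a small plain-Python sketch of the same logic on a handful of in-memory rows. It stands in for the Spark aggregations rather than reproducing them: the sample rows, the `golden_records` function, and the column groupings are made up for illustration, and UpdateDate is assumed to be an ISO-formatted string so that string comparison matches chronological order.

```python
from collections import Counter, defaultdict

# Hypothetical sample rows; column names follow the examples in the text.
rows = [
    {"imo": 9710749, "UpdateDate": "2023-01-01", "builtYear": 2015,
     "destination": "ROTTERDAM", "name": "OLD NAME"},
    {"imo": 9710749, "UpdateDate": "2023-03-01", "builtYear": 2015,
     "destination": "HAMBURG", "name": "NEW NAME"},
    {"imo": 9710749, "UpdateDate": "2023-02-01", "builtYear": 2051,  # typo in source data
     "destination": "ANTWERP", "name": "NEW NAME"},
]

STATIC, DYNAMIC, EVOLVING = ["builtYear"], ["destination"], ["name"]


def golden_records(rows):
    """Build one consolidated record per imo from raw rows."""
    by_imo = defaultdict(list)
    for row in rows:
        by_imo[row["imo"]].append(row)

    result = {}
    for imo, group in by_imo.items():
        record = {"imo": imo}
        # Static: most frequent non-null value (mode).
        for col in STATIC:
            values = [r[col] for r in group if r.get(col) is not None]
            record[col] = Counter(values).most_common(1)[0][0] if values else None
        # Dynamic: values taken from the row with the greatest UpdateDate.
        latest = max(group, key=lambda r: r["UpdateDate"])
        for col in DYNAMIC:
            record[col] = latest.get(col)
        # Evolving: every distinct non-null value ever seen.
        for col in EVOLVING:
            record[f"{col}_history"] = {r[col] for r in group if r.get(col) is not None}
        result[imo] = record
    return result


print(golden_records(rows)[9710749])
```

Note how the mistyped builtYear (2051) is outvoted by the mode, while the destination comes from the most recent row even though that row is not last in the input.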
Loading
- The three aggregated datasets (static, dynamic, and historical) are joined together on the imo key.
- The historical array columns are converted to clean, comma-separated strings.
- The final, complete DataFrame is written to the golden_records table in the PostgreSQL database using a JDBC connection.
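The join and string-conversion steps can be sketched in plain Python as follows; the JDBC write is left out because it depends on the running database. The function name `join_on_imo`, the inner-join behaviour, the ", " separator, and the sorting of history values are all assumptions made for this illustration, not details confirmed by the project.

```python
def join_on_imo(static, dynamic, history):
    """Join three {imo: {column: value}} mappings and flatten history sets."""
    joined = []
    # Keep only vessels present in all three datasets (inner join).
    for imo in sorted(static.keys() & dynamic.keys() & history.keys()):
        row = {"imo": imo, **static[imo], **dynamic[imo]}
        for col, values in history[imo].items():
            # Sort so the output string is deterministic; set order is not.
            row[col] = ", ".join(sorted(str(v) for v in values))
        joined.append(row)
    return joined


final_rows = join_on_imo(
    {9710749: {"builtYear": 2015}},
    {9710749: {"destination": "HAMBURG"}},
    {9710749: {"name_history": {"OLD NAME", "NEW NAME"}}},
)
print(final_rows)
```

Sorting before joining is worth considering in the Spark version as well, since collect_set does not guarantee element order and unsorted strings can differ between runs.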