GIZELLYPY/airflow_processing_data


ETL Workflow for Employee Data Analysis Using Apache Airflow and PySpark

ETL for Airflow Data Processing - Employees Analysis Workflow

This repository contains an Apache Airflow DAG (Directed Acyclic Graph) designed for data processing workflows using PySpark. The DAG is orchestrated to load, process, and transform data from CSV files into a structured format for further analysis and storage.

Data Flow

Data is initially read from CSV files, processed through various transformations in the Silver layer, and ultimately aggregated into structured tables in the Golden layer. Each task within the DAG handles a specific part of this pipeline, ensuring data is methodically transformed and stored at each stage.

Quality Checks Using Great Expectations

At the Silver layer, quality checks are implemented using the Great Expectations library to ensure data integrity before it is loaded into the Gold layer. These checks validate data quality aspects such as:

  • Correctness of data formats.
  • Non-duplication of key fields.
  • Completeness and accuracy of the data.
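The repository wires these checks through Great Expectations; as a rough plain-Python stand-in for what the suite validates (the field names `employee_id`, `name`, and `hire_date` are hypothetical, not taken from the repo), the three checks amount to:

```python
import re

def run_silver_quality_checks(rows):
    """Simplified stand-in for the Great Expectations suite: validates a
    list of row dicts before they are promoted to the Gold layer."""
    failures = []

    # Correctness of data formats: dates must look like YYYY-MM-DD.
    date_pattern = re.compile(r"^\d{4}-\d{2}-\d{2}$")
    for row in rows:
        if not date_pattern.match(row.get("hire_date", "")):
            failures.append(f"bad date format: {row.get('hire_date')!r}")

    # Non-duplication of key fields: employee IDs must be unique.
    ids = [row["employee_id"] for row in rows]
    if len(ids) != len(set(ids)):
        failures.append("duplicate employee_id values")

    # Completeness: no required field may be missing or empty.
    for row in rows:
        for field in ("employee_id", "name", "hire_date"):
            if not row.get(field):
                failures.append(f"missing {field}")

    return failures  # an empty list means the batch passes

rows = [
    {"employee_id": 1, "name": "Ana", "hire_date": "2020-01-15"},
    {"employee_id": 2, "name": "Bruno", "hire_date": "2021-06-01"},
]
print(run_silver_quality_checks(rows))  # []
```

In Great Expectations these would correspond to expectations such as column-values-match-regex, column-values-unique, and column-values-not-null, collected into a suite that runs as a validation step.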

Repository Structure

The codebase is structured into various directories, mainly focusing on the scripts under src/dag/tasks, which include the individual tasks that make up the DAG.

Overview of dag.py

The main DAG definition can be found in src/dag/dag.py. This script sets up the DAG with the following tasks:

  • Start the workflow.
  • Process data from multiple CSV inputs.
  • Load processed data into a staging area (the Silver layer).
  • Aggregate transformed data into the final Gold layer for analytics.

Each task is implemented as a Python function executed by a PythonOperator in Airflow.
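The task wiring described above might look like the following sketch (assuming Airflow 2.x; the `dag_id`, `task_id`s, and placeholder callables are illustrative, not the repo's actual names):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def start():
    print("starting workflow")

def placeholder(name):
    # Stand-in for the real task entry points under src/dag/tasks.
    def _run():
        print(f"running {name}")
    return _run

with DAG(
    dag_id="employees_etl",          # dag_id is an assumption
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,          # triggered manually
    catchup=False,
) as dag:
    start_task = PythonOperator(task_id="start", python_callable=start)

    # One Silver-layer task per CSV input; they can run in parallel.
    silver = [
        PythonOperator(task_id=f"silver_{name}", python_callable=placeholder(name))
        for name in ("base_cargos", "base_cep", "base_cliente", "base_funcionarios")
    ]

    # The Gold-layer aggregation waits for every Silver task to finish.
    gold = PythonOperator(task_id="input_golden", python_callable=placeholder("golden"))

    start_task >> silver >> gold
```

Fanning the Silver tasks out as a list lets Airflow schedule them concurrently while still gating the Gold aggregation on all of them.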

Task Implementations

Here is a brief overview of some key tasks:

input_base_cargos.py

Purpose: Processes job position data from BaseCargos.csv. Functions:

  • treat_deslocated_columns: Normalizes and cleans the DataFrame.
  • rename_columns: Maps Portuguese column names to English.
  • input_on_silver_base_cargos: Main function to process the data and save it.
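A `rename_columns` helper of this kind typically applies a Portuguese-to-English mapping. The plain-Python sketch below illustrates the idea; the column names in `CARGOS_COLUMN_MAP` are hypothetical, and the repo's implementation presumably operates on a PySpark DataFrame (e.g. via `withColumnRenamed`):

```python
# Hypothetical Portuguese-to-English mapping for BaseCargos columns.
CARGOS_COLUMN_MAP = {
    "cargo": "job_title",
    "nivel": "level",
    "salario_base": "base_salary",
}

def rename_columns(columns, mapping=CARGOS_COLUMN_MAP):
    """Return the column list with every mapped Portuguese name replaced
    by its English equivalent; unmapped names pass through unchanged."""
    return [mapping.get(col, col) for col in columns]

print(rename_columns(["cargo", "nivel", "id"]))  # ['job_title', 'level', 'id']
```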

input_base_cep.py

Purpose: Handles postal code data transformation from BaseCEP.csv. Functions:

  • read_base_cep: Reads and formats the CSV to a DataFrame.
  • rename_columns: Renames columns to standardized English names.
  • input_on_silver_base_cep: Entry point for processing and storing postal code data.
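The exact formatting rules inside `read_base_cep` are not shown in this README. As a plausible stdlib sketch, normalizing a Brazilian postal code (CEP) usually means restoring a leading zero lost by a numeric CSV read and emitting the canonical `NNNNN-NNN` form (both assumptions here):

```python
def normalize_cep(raw):
    """Normalize a Brazilian postal code (CEP) to the canonical
    'NNNNN-NNN' form; returns None for values that are not 8 digits."""
    digits = "".join(ch for ch in str(raw) if ch.isdigit())
    if len(digits) == 7:      # leading zero dropped by a numeric CSV read
        digits = "0" + digits
    if len(digits) != 8:
        return None
    return f"{digits[:5]}-{digits[5:]}"

print(normalize_cep("1310100"))    # 01310-100
print(normalize_cep("04538-133"))  # 04538-133
```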

input_base_cliente.py

Purpose: Processes client data from BaseClientes.csv. Functions:

  • Various preprocessing functions to clean and transform data fields.
  • input_on_silver_base_cliente: Integrates all preprocessing steps and saves the output.

input_base_funcionarios.py

Purpose: Manages employee data from BaseFuncionarios.csv. Functions:

  • drop_unnecessary_columns: Removes columns not needed.
  • process_date_columns: Converts date formats.
  • input_on_silver_base_funcionarios: Processes and anonymizes employee data before saving.
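A `process_date_columns` conversion of this sort can be sketched with the stdlib, assuming the source files use the Brazilian DD/MM/YYYY format (an assumption; the repo likely does the equivalent with PySpark date functions):

```python
from datetime import datetime

def to_iso_date(value, source_format="%d/%m/%Y"):
    """Convert a date string from the source format (assumed here to be
    Brazilian DD/MM/YYYY) to ISO 8601; returns None when unparseable."""
    try:
        return datetime.strptime(value, source_format).strftime("%Y-%m-%d")
    except (TypeError, ValueError):
        return None

print(to_iso_date("15/01/2020"))  # 2020-01-15
print(to_iso_date("not a date"))  # None
```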

utils.py

Contains utility functions like read_csv_to_df and save_data_agg to assist with file handling and data storage operations.
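As a rough stdlib analogue of `read_csv_to_df` (the repo's version returns a PySpark DataFrame; the `;` delimiter is an assumption, though it is common in Brazilian CSV exports):

```python
import csv
import io

def read_csv_to_rows(source, delimiter=";"):
    """Stdlib sketch of a read_csv_to_df helper: parses delimited text
    into a list of dicts keyed by the header row."""
    return list(csv.DictReader(io.StringIO(source), delimiter=delimiter))

sample = "id;nome\n1;Ana\n2;Bruno\n"
rows = read_csv_to_rows(sample)
print(rows)  # [{'id': '1', 'nome': 'Ana'}, {'id': '2', 'nome': 'Bruno'}]
```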

input_golden.py

Purpose: Aggregates data from the Silver layer to the Golden layer. Functions:

  • join_and_group_by_region: Combines datasets and groups by region.
  • calculate_average_time_on_job: Calculates average employment duration.
  • process_and_input_on_golden: Executes all steps to process data into the Golden layer.
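The Gold-layer aggregation can be sketched in plain Python as below; the field names `region` and `years_on_job` are hypothetical, and the actual implementation presumably uses PySpark joins and `groupBy` over the Silver tables:

```python
from collections import defaultdict

def average_time_on_job_by_region(employees):
    """Sketch of the Gold-layer aggregation: group employee records by
    region and compute the average time on the job (in years)."""
    totals = defaultdict(lambda: [0.0, 0])  # region -> [sum, count]
    for emp in employees:
        acc = totals[emp["region"]]
        acc[0] += emp["years_on_job"]
        acc[1] += 1
    return {region: total / count for region, (total, count) in totals.items()}

employees = [
    {"region": "Sudeste", "years_on_job": 2.0},
    {"region": "Sudeste", "years_on_job": 4.0},
    {"region": "Sul", "years_on_job": 3.0},
]
print(average_time_on_job_by_region(employees))
# {'Sudeste': 3.0, 'Sul': 3.0}
```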

Data Visualization

