Last update: January 26, 2026
This repo is part of a larger GraphRAG application project that demonstrates how the GraphRAG pattern works.
Part 1 is a data pipeline built with Dagster. It orchestrates data ingestion from multiple sources to build a knowledge graph (in a Neo4j graph database) and text embeddings (in a ChromaDB vector database) for a RAG system. The app is ready to be deployed to the cloud in a Docker container.
This pipeline is specifically tuned for the Electronic Music domain. It captures the rich, interconnected history of electronic artists, from early pioneers to contemporary producers. The dataset encompasses a wide range of sub-genres—including Techno, House, Ambient, IDM, and Drum & Bass—modeling the complex relationships between artists, their releases, and the evolving taxonomy of electronic musical styles.
Figure 1. Graph query and data model visualization for the artist Depeche Mode.
This data pipeline orchestrates data ingestion from multiple sources to build a rich dataset of music artists:
- Structured Data
  - Wikidata API (Artists & Genres, via SPARQL & Action API)
  - Last.fm API (Similar Artists & Tags)
  - MusicBrainz API (Releases & Tracks)
- Unstructured Data
  - Wikipedia API (Articles about Artists and Genres)
The goal is to take unstructured data (Wikipedia articles about musicians, bands, and artists) and split it into chunks enriched with structured metadata, preparing the data for a hybrid search approach:
- Semantic Search: Preparing text chunks for vectorization.
- Deterministic Search: Using Neo4j (Graph Database).
We leverage Polars for high-performance data transformation, Pydantic for rigorous data validation, and Dagster Resources for clean infrastructure management.
- Orchestration: Dagster (Assets, Resources, Partitions, Asset Checks, I/O Managers)
- Databases: Neo4j (Graph), ChromaDB (Vector)
- Data Engineering: Polars (manipulation), Msgspec (serialization), Ftfy (text cleaning)
- Data Validation & Config: Pydantic, Pydantic Settings
- AI & ML: PyTorch, Transformers, Sentence Transformers, Nomic (Embeddings), LangChain (Text Splitters), Einops
- Networking & Utils: curl-cffi (Async HTTP with browser impersonation), Structlog, Tqdm
- Cloud: Dagster GCP (Google Cloud Platform integration)
- Language: Python 3.13+
- Tooling and Testing: uv, Ruff, Ty, Bandit
This project implements a strict separation of concerns following Dagster's philosophy, dividing the codebase into distinct layers with clear responsibilities.
┌──────────────────────────────────────────────────────────────────────────────┐
│ DAGSTER DEFINITIONS LAYER │
│ "The How" - Infrastructure & Configuration │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────────┐ │
│ │ definitions.py │ │ resources.py │ │ io_managers.py │ │
│ │ (Entry Point) │ │ (Factories/DI) │ │ (Parquet/JSONL Persistence) │ │
│ └────────┬────────┘ └────────┬────────┘ └────────────┬────────────────┘ │
│ │ │ │ │
│ ┌────────┴────────┐ ┌────────┴────────┐ ┌────────────┴──────────┐ │
│ │ checks.py │ │ partitions.py │ │ settings.py │ │
│ │ (Quality Gates) │ │ (By Decade) │ │ (pydantic-settings) │ │
│ └─────────────────┘ └─────────────────┘ └───────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ ASSET LAYER (defs/assets/) │
│ "The What" - Business Logic & Transformation │
│ ┌────────────────┐ ┌──────────────────┐ ┌───────────────────────┐ │
│ │ build_artist │ │ extract_artists │ │ extract_releases │ │
│ │ _index.py │──▶│ .py │──▶│ .py │ │
│ │ (Wikidata) │ │ (Wikidata+Last) │ │ (MusicBrainz) │ │
│ └────────────────┘ └────────┬─────────┘ └───────────┬───────────┘ │
│ │ │ │
│ ┌─────────────────────────────┼─────────────────────────┤ │
│ │ ▼ ▼ │
│ │ ┌──────────────────┐ ┌────────────────┐ ┌─────────────────────┐ │
│ │ │ extract_genres │ │ extract_tracks │ │ extract_countries │ │
│ │ │ .py │ │ .py │ │ .py │ │
│ │ └──────────────────┘ └────────────────┘ └─────────────────────┘ │
│ │ │
│ │ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ │ ┌─────────────────────┐ │ │
│ │ │ │ extract_artists │──┐ │ │
│ │ │ │ _articles.py │ │ ┌─────────────────────┐ │ │
│ │ │ └─────────────────────┘ ├──▶│ merge_wikipedia │ │ │
│ │ │ ┌─────────────────────┐ │ │ _articles.py │ │ │
│ │ │ │ extract_genres │──┘ └─────────┬───────────┘ │ │
│ │ │ │ _articles.py │ │ │ │
│ │ │ └─────────────────────┘ ▼ │ │
│ │ │ ┌───────────────────┐ │ │
│ │ │ │ ingest_vector_db │ │ │
│ │ │ │ .py (ChromaDB) │ │ │
│ │ │ └───────────────────┘ │ │
│ │ │ │ │
│ │ │ ┌───────────────────┐ │ │
│ │ │ │ ingest_graph_db │◀── artists, releases, tracks, │ │
│ │ │ │ .py (Neo4j) │ genres, countries │ │
│ │ │ └───────────────────┘ │ │
│ │ └────────────────────────────────────────────────────────────────────┘ │
│ └──────────────────────────────────────────────────────────────────────────│
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────────────┐
│ UTILITIES LAYER (utils/) │
│ Domain-Agnostic, Reusable Components │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ NETWORK & I/O PRIMITIVES │ │
│ │ ┌─────────────────────┐ ┌────────────────────────────────────┐ │ │
│ │ │ network_helpers.py │ │ io_helpers.py │ │ │
│ │ │ (HTTP retries, │ │ (JSON/text files, │ │ │
│ │ │ concurrency) │ │ cache key generation) │ │ │
│ │ └─────────────────────┘ └────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ DOMAIN ADAPTERS (API Clients) │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ wikidata_ │ │ musicbrainz_ │ │ lastfm_ │ │ │
│ │ │ helpers.py │ │ helpers.py │ │ helpers.py │ │ │
│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ wikipedia_ │ │ neo4j_ │ │ chroma_ │ │ │
│ │ │ helpers.py │ │ helpers.py │ │ helpers.py │ │ │
│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DATA TRANSFORMATION │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ data_transformation_helpers.py │ │ │
│ │ │ (Text normalization, deduplication, Unicode fixing) │ │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────────────┘
Figure 2. Dagster Assets Catalog, showing the Data Pipeline with all Assets materialized.
The architecture strictly separates "The What" (business logic) from "The How" (infrastructure), following Dagster best practices.
This folder contains the Data Definition Graph.
| Aspect | Description |
|---|---|
| Role | Defines what data exists, how it is computed, and its dependencies |
| Content | Pure transformation logic. Takes data in (as parameters) and returns data out (as return values) |
| Change Frequency | High. This is where you edit code when business requirements change |
| Dagster Rule | Assets are unaware of where they run or where data is stored. They "ask" for a resource and "return" a dataframe |
Key Design Patterns:
- Assets return `pl.LazyFrame`, `list[Model]`, or `MaterializeResult`; they never write files directly
- Domain-specific constants (e.g., `WIKIDATA_PROP_GENRE = "P136"`) belong in assets, not utils
- Validation logic and business rules are implemented within assets
- Assets delegate parsing/filtering to helper modules for reusability (a minimal sketch of these conventions follows)
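A minimal sketch of these conventions; the asset name, column names, and I/O manager key below are illustrative, not the pipeline's actual code:

```python
import dagster as dg
import polars as pl


@dg.asset(io_manager_key="polars_parquet_io_manager")  # key name is an assumption
def active_artists(artist_index: pl.LazyFrame) -> pl.LazyFrame:
    """Hypothetical asset: data in as a parameter, data out as a return value."""
    # No file I/O here; persistence (Parquet path, sink) is handled by the I/O manager.
    return artist_index.filter(pl.col("name").is_not_null()).unique(subset=["id"])
```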
These files define the Execution Environment.
| File | Role | Change Frequency |
|---|---|---|
| `resources.py` | Connection factories (Neo4j, HTTP clients, ChromaDB). Handles secrets, timeouts, connection pooling | Low |
| `io_managers.py` | Bridge between Python memory and file system/cloud storage. Handles serialization (JSONL vs Parquet) and path organization | Low |
| `partitions.py` | Slicing strategy. Defines the "shape" of pipeline execution (by decade) | Low |
| `checks.py` | Quality contracts. Defines rules the data must obey after materialization | Medium |
Key Design Patterns:
- Explicit Resource Factories: Resources expose `get_client()` context managers rather than implicit lifecycle hooks
- Secrets via EnvVar: `resources.py` uses `EnvVar("NEO4J_PASSWORD")` for secrets, not `settings.py` (see the wiring sketch below)
- Streaming I/O: `io_managers.py` uses `sink_parquet()` for LazyFrames (O(1) memory)
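For illustration, a resource wired with `EnvVar` might look like the following sketch; the exact `Definitions` wiring in this repo may differ:

```python
import dagster as dg

from data_pipeline.defs.resources import Neo4jResource  # fields shown later in this README

defs = dg.Definitions(
    resources={
        "neo4j": Neo4jResource(
            uri=dg.EnvVar("NEO4J_URI"),
            username=dg.EnvVar("NEO4J_USERNAME"),
            password=dg.EnvVar("NEO4J_PASSWORD"),  # resolved from the environment at runtime
        ),
    },
)
```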
Contains domain-agnostic, reusable helpers that can be used across different projects.
| Module | Responsibility | Key Functions |
|---|---|---|
| `network_helpers.py` | HTTP requests with exponential backoff, concurrency control, async generators | `make_async_request_with_retries()`, `run_tasks_concurrently()`, `yield_batches_concurrently()` |
| `io_helpers.py` | JSON/text file I/O, cache key generation, async file operations | `async_read_json_file()`, `async_write_json_file()`, `generate_cache_key()` |
| `data_transformation_helpers.py` | Text normalization, Unicode fixing (ftfy), Polars expressions | `normalize_and_clean_text()`, `deduplicate_by_priority()` |
| `wikidata_helpers.py` | Wikidata SPARQL & Action API adapter with caching | `run_extraction_pipeline()`, `async_fetch_wikidata_entities_batch()`, `extract_wikidata_*()` |
| `wikipedia_helpers.py` | Wikipedia API adapter with section parsing | `async_fetch_wikipedia_article()`, `parse_wikipedia_sections()` |
| `musicbrainz_helpers.py` | MusicBrainz API adapter with pagination and filtering | `fetch_artist_release_groups_async()`, `filter_release_groups()`, `select_best_release()`, `parse_release_year()` |
| `lastfm_helpers.py` | Last.fm API adapter with response parsing | `async_fetch_lastfm_data_with_cache()`, `parse_lastfm_artist_response()` |
| `neo4j_helpers.py` | Generic Cypher execution with retry logic | `execute_cypher()`, `clear_database()` |
| `chroma_helpers.py` | ChromaDB embedding utilities | `NomicEmbeddingFunction`, `get_device()`, `generate_doc_id()` |
Design Rules for Utils:
- No global config: Utils never import `settings.py` directly; configuration is passed as arguments
- Dependency injection: API keys, paths, URLs, and timeouts are passed as function parameters (see the sketch below)
- No domain logic: Schema definitions (like Neo4j indexes) belong in assets, not utils
- 100% reusable: All helpers can be used across different projects without modification
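As an illustration of the dependency-injection rule, a helper in this style receives its session, API key, and timeout from the caller. The function below is hypothetical and not part of the repo:

```python
from curl_cffi.requests import AsyncSession


async def fetch_artist_tags(
    session: AsyncSession,   # connection built by a Dagster resource, injected here
    api_key: str,            # passed in by the caller, never read from settings.py
    artist_name: str,
    timeout: float = 30.0,
) -> list[str]:
    """Hypothetical helper showing how configuration arrives as arguments."""
    resp = await session.get(
        "https://ws.audioscrobbler.com/2.0/",
        params={
            "method": "artist.gettoptags",
            "artist": artist_name,
            "api_key": api_key,
            "format": "json",
        },
        timeout=timeout,
    )
    data = resp.json()
    return [tag["name"] for tag in data.get("toptags", {}).get("tag", [])]
```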
┌─────────────────────────────────────────────────────────────────────┐
│ 1. RESOURCES (defs/resources.py) │
│ Provide raw connections (AsyncClient, Neo4j Driver, ChromaDB) │
│ │ │
│ ▼ │
│ 2. UTILS (utils/*.py) │
│ Use connections to perform specific actions │
│ (fetch_sparql_query, parse_lastfm_artist_response) │
│ │ │
│ ▼ │
│ 3. ASSETS (defs/assets/*.py) │
│ Orchestrate Utils to achieve business goals │
│ (Extract Artists, Build Knowledge Graph, Ingest Vector DB) │
│ │ │
│ ▼ │
│ 4. I/O MANAGERS (defs/io_managers.py) │
│ Persist asset outputs to storage (Parquet, JSONL) │
│ │ │
│ ▼ │
│ 5. CHECKS (defs/checks.py) │
│ Verify final output quality and data trust │
└─────────────────────────────────────────────────────────────────────┘
The pipeline transforms raw data from external APIs into two optimized formats: a Knowledge Graph (for structural queries) and Vector-Ready Text Chunks (for semantic search).
Figure 3. Dagster Assets Lineage, showing the Data Pipeline components and dependencies.
graph TD
subgraph "Stage 1: Artist Discovery"
A[Wikidata SPARQL] -->|Partitioned by Decade| B(build_artist_index_by_decade)
B -->|AllPartitionMapping| C[artist_index]
end
subgraph "Stage 2: Data Enrichment"
C -->|Wikidata + Last.fm| D[extract_artists]
D --> E[extract_releases]
E --> F[extract_tracks]
D --> G[extract_genres]
D --> H[extract_countries]
end
subgraph "Stage 3: Knowledge Graph"
D & E & F & G & H --> I[ingest_graph_db]
I -->|Nodes & Relationships| J[(Neo4j Aura)]
end
subgraph "Stage 4: RAG Preparation"
D -->|Wikipedia URLs| K1[extract_artists_articles]
G -->|Wikipedia URLs| K2[extract_genres_articles]
K1 -->|Clean & Chunk| L1[Text Splitter]
K2 -->|Clean & Chunk| L2[Text Splitter]
L1 & L2 -->|Merge| M[merge_wikipedia_articles]
M --> N[wikipedia_articles.jsonl]
end
subgraph "Stage 5: Vector Ingestion"
N -->|Nomic Embeddings| P[ingest_vector_db]
P -->|Upsert| Q[(ChromaDB)]
end
style A fill:#e1f5fe
style J fill:#c8e6c9
style Q fill:#fff3e0
| Asset | Input Dependencies | Output Type | I/O Manager |
|---|---|---|---|
| `build_artist_index_by_decade` | None (Wikidata SPARQL) | `pl.LazyFrame` | Parquet |
| `artist_index` | `build_artist_index_by_decade` (all partitions) | `pl.LazyFrame` | Parquet |
| `artists` | `artist_index` | `list[Artist]` | Parquet |
| `genres` | `artists` | `pl.LazyFrame` | Parquet |
| `releases` | `artists` | `list[Release]` | Parquet |
| `tracks` | `releases` | `list[Track]` | Parquet |
| `countries` | `artists` | `list[Country]` | Parquet |
| `artists_articles` | `artists` | `list[Article]` | JSONL |
| `genres_articles` | `genres` | `list[Article]` | JSONL |
| `wikipedia_articles` | `artists_articles`, `genres_articles` | `pl.LazyFrame` | JSONL |
| `graph_db` | `artists`, `releases`, `tracks`, `genres`, `countries` | `MaterializeResult` | None (sink) |
| `vector_db` | `wikipedia_articles` | `MaterializeResult` | None (sink) |
The pipeline uses decade-based partitioning for the initial artist discovery phase:
DECADES_TO_EXTRACT = {
"1930s": (1930, 1939),
"1940s": (1940, 1949),
"1950s": (1950, 1959),
"1960s": (1960, 1969),
"1970s": (1970, 1979),
"1980s": (1980, 1989),
"1990s": (1990, 1999),
"2000s": (2000, 2009),
"2010s": (2010, 2019),
"2020s": (2020, 2029),
}

Benefits:
- Parallelizes SPARQL queries across decades (10 concurrent runs)
- Provides natural checkpointing (failed decade can be retried independently)
- Keeps memory usage constant O(1) regardless of total dataset size
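A minimal sketch of how such decade partitions can be declared and consumed; the names are assumptions based on this README, not the exact contents of partitions.py:

```python
import dagster as dg
import polars as pl

decade_partitions = dg.StaticPartitionsDefinition(
    ["1930s", "1940s", "1950s", "1960s", "1970s",
     "1980s", "1990s", "2000s", "2010s", "2020s"]
)


@dg.asset(partitions_def=decade_partitions)
def build_artist_index_by_decade(context: dg.AssetExecutionContext) -> pl.LazyFrame:
    decade = context.partition_key              # e.g. "1990s"
    start_year = int(decade[:4])
    end_year = start_year + 9
    # ... run the decade-bounded SPARQL query here ...
    return pl.LazyFrame({"decade": [decade], "start": [start_year], "end": [end_year]})
```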
We process Wikipedia articles to create a high-quality corpus for Retrieval-Augmented Generation (RAG).
- Ingestion: Fetches full articles for every valid artist found in the Knowledge Graph.
- Cleaning: Removes Wiki-markup, references, and non-text elements (using excluded headers).
- Chunking: Splits text into 2048-token windows with 512-token overlap using a recursive character splitter, offloaded to threads for performance (sketched below).
- Enrichment: Each chunk is "stamped" with global metadata (Genre, Year, Artist Name) to allow for hybrid filtering during retrieval (e.g., "Find chunks about 'Techno' from the '90s").
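A sketch of the chunking and enrichment step, assuming the splitter measures length with the embedding model's tokenizer; the pipeline's actual wiring may differ:

```python
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("nomic-ai/nomic-embed-text-v1.5")
splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
    tokenizer, chunk_size=2048, chunk_overlap=512
)

article_text = "Kraftwerk is a German electronic band formed in Düsseldorf in 1970 ..."
chunks = splitter.split_text(article_text)

# Each chunk is "stamped" with global metadata so retrieval can filter by artist, genre, year, etc.
enriched = [
    {
        "article": f"search_document: {chunk}",
        "metadata": {"artist_name": "Kraftwerk", "genres": ["Electronic"], "chunk_index": i},
    }
    for i, chunk in enumerate(chunks)
]
```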
| Field | Type | Description |
|---|---|---|
| `article` | String | The enriched text content prepended with `search_document:`. |
| `metadata` | JSON | Contextual tags: `title`, `artist_name`, `genres`, `inception_year`, `wikipedia_url`. |
Download the dataset from Hugging Face: pacoreyes/electronic-music-wikipedia-rag
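For example, the published corpus can be loaded with the `datasets` library (the split name is an assumption):

```python
from datasets import load_dataset

ds = load_dataset("pacoreyes/electronic-music-wikipedia-rag", split="train")
print(ds[0]["article"][:200])   # text prefixed with "search_document:"
print(ds[0]["metadata"])        # title, artist_name, genres, inception_year, wikipedia_url
```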
We construct a deterministic Knowledge Graph to map the relationships between the musical entities. This allows for precise multi-hop queries (e.g., "Find all sub-genres of 'House' that originated in 'France'").
erDiagram
Artist ||--o{ Genre : PLAYS_GENRE
Artist ||--o{ Artist : SIMILAR_TO
Artist ||--o| Country : FROM_COUNTRY
Release ||--|| Artist : PERFORMED_BY
Genre ||--o{ Genre : SUBGENRE_OF
Artist {
string id
string name
string mbid
string_list aliases
}
Release {
string id
string title
int year
string_list tracks
}
Genre {
string id
string name
string_list aliases
}
Country {
string id
string name
string_list aliases
}
- Artist: The core entity (e.g., "Daft Punk").
- Release: Major releases (Albums/Singles) linked to artists. Contains embedded track list as a property.
- Genre: A hierarchical taxonomy of musical styles (e.g., "French House" -> "House" -> "Electronic").
- Country: Geographic entities representing artist origin (e.g., "France", "Germany"). Includes alternative names (aliases) for improved searchability.
- `(Artist)-[:PLAYS_GENRE]->(Genre)`
- `(Artist)-[:SIMILAR_TO]->(Artist)`: Derived from Last.fm community data.
- `(Artist)-[:FROM_COUNTRY]->(Country)`: Artist's country of origin.
- `(Release)-[:PERFORMED_BY]->(Artist)`
- `(Genre)-[:SUBGENRE_OF]->(Genre)`: Enables hierarchical graph traversal (see the query sketch below).
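A sketch of such a multi-hop query from Python. Connection details are placeholders, and since only artists carry a FROM_COUNTRY relationship, "originated in France" is approximated via the artists who play each sub-genre:

```python
from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "neo4j+s://<your-instance-id>.databases.neo4j.io", auth=("neo4j", "<your-password>")
)

query = """
MATCH (g:Genre)-[:SUBGENRE_OF]->(:Genre {name: 'House'})
MATCH (a:Artist)-[:PLAYS_GENRE]->(g)
MATCH (a)-[:FROM_COUNTRY]->(:Country {name: 'France'})
RETURN DISTINCT g.name AS subgenre, collect(DISTINCT a.name) AS artists
"""

with driver.session() as session:
    for record in session.run(query):
        print(record["subgenre"], record["artists"])

driver.close()
```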
The graph database uses two types of indexes for query optimization:
Property Indexes (single-property lookups):
| Index Name | Label | Property |
|---|---|---|
| `artist_id_idx` | Artist | `id` |
| `artist_name_idx` | Artist | `name` |
| `release_id_idx` | Release | `id` |
| `genre_id_idx` | Genre | `id` |
| `genre_name_idx` | Genre | `name` |
| `country_id_idx` | Country | `id` |
| `country_name_idx` | Country | `name` |
Fulltext Indexes (multi-property text search):
| Index Name | Label | Properties |
|---|---|---|
| `artist_fulltext_idx` | Artist | `name`, `aliases` |
| `genre_fulltext_idx` | Genre | `name`, `aliases` |
| `release_fulltext_idx` | Release | `title`, `tracks` |
| `country_fulltext_idx` | Country | `name`, `aliases` |
Fulltext indexes enable queries like:
CALL db.index.fulltext.queryNodes("artist_fulltext_idx", "Kraftwerk") YIELD node
RETURN node.name, node.aliases

Dataset Statistics:
- Articles: 4,681 processed
  - 30,005 chunks (documents)
- Nodes: 98,677
  - 112 countries
  - 754 genres
  - 4,681 artists
  - 98,715 releases
- Edges: 123,574
Content: 29,979 documents about 4,679 artists, embedded with the nomic-ai/nomic-embed-text-v1.5 model and enriched with metadata.
┌─────────────────────────────────────────────────────────────────────┐
│ article (str) │
│ └── Raw text enriched with context from flattened metadata │
│ │
│ metadata (dict) │
│ ├── title: str │
│ ├── artist_name: str │
│ ├── aliases: list[str] │
│ ├── tags: list[str] (optional) │
│ ├── similar_artists: list[str] (optional) │
│ ├── genres: list[str] (optional) │
│ ├── inception_year: int (optional) │
│ ├── country: str │
│ ├── wikipedia_url: str │
│ ├── wikidata_uri: str │
│ ├── chunk_index: int │
│ └── total_chunks: int │
└─────────────────────────────────────────────────────────────────────┘
- Collection Name: `music_rag_collection`
- Embedding Model: `nomic-embed-text-v1.5` (Nomic AI)
- Distance Metric: Cosine Similarity
- Device Support: Automatic detection of CUDA, MPS (Apple Silicon), or CPU
The ingest_vector_db asset reads from wikipedia_articles.jsonl, generates embeddings with automatic GPU/MPS acceleration, and upserts them into the vector store.
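Conceptually, the upsert looks like the following sketch. It is simplified: the real asset goes through Dagster resources and supplies the project's NomicEmbeddingFunction, and the storage path here is an assumption:

```python
import chromadb

client = chromadb.PersistentClient(path="data_volume/chroma")  # path is an assumption
collection = client.get_or_create_collection(
    name="music_rag_collection",
    metadata={"hnsw:space": "cosine"},
)

collection.upsert(
    ids=["Q123_chunk_1"],  # illustrative id, following the schema shown earlier
    documents=["search_document: Daft Punk is a French electronic music duo ..."],
    metadatas=[{"title": "Daft Punk", "artist_name": "Daft Punk", "chunk_index": 1}],
)
```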
The pipeline enforces lazy evaluation throughout to ensure O(1) memory usage:
# Assets return LazyFrames, not eager DataFrames
@asset
def artist_index(...) -> pl.LazyFrame:
    return clean_lf  # Never .collect() inside asset

# I/O Managers handle streaming writes
class PolarsParquetIOManager:
    def handle_output(self, context, obj):
        if isinstance(obj, pl.LazyFrame):
            obj.sink_parquet(path)  # Stream directly to disk

For the RAG dataset, we use sparse JSON to minimize storage:
# Only non-null fields are written
{"id": "Q123_chunk_1", "metadata": {"title": "Daft Punk", "genres": ["House"]}}
# NOT: {"id": "...", "metadata": {"title": "...", "genres": [...], "aliases": null, "tags": null}}Domain models generate Polars-compatible schemas automatically:
# In models.py
RELEASE_SCHEMA = _generate_polars_schema(Release)
TRACK_SCHEMA = _generate_polars_schema(Track)
COUNTRY_SCHEMA = _generate_polars_schema(Country)

This ensures type consistency between msgspec Structs and Polars DataFrames without manual schema duplication.
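A hypothetical sketch of what such a schema generator could look like; the project's actual `_generate_polars_schema()` in models.py may differ:

```python
from typing import get_type_hints

import msgspec
import polars as pl

_TYPE_MAP = {str: pl.Utf8, int: pl.Int64, float: pl.Float64, bool: pl.Boolean}


class Track(msgspec.Struct):
    title: str
    album_id: str
    position: int


def generate_polars_schema(struct_type: type[msgspec.Struct]) -> dict:
    """Map msgspec Struct annotations to Polars dtypes (simple field types only)."""
    return {
        name: _TYPE_MAP.get(annotation, pl.Utf8)
        for name, annotation in get_type_hints(struct_type).items()
    }


TRACK_SCHEMA = generate_polars_schema(Track)
# {'title': Utf8, 'album_id': Utf8, 'position': Int64}
```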
Resources expose explicit factory methods rather than implicit lifecycle hooks:
class Neo4jResource(ConfigurableResource):
    uri: str
    username: str
    password: str

    @contextmanager
    def get_driver(self, context) -> Generator[Driver, None, None]:
        driver = GraphDatabase.driver(self.uri, auth=(self.username, self.password))
        try:
            driver.verify_connectivity()
            yield driver
        finally:
            driver.close()

# Usage in asset
@asset
def ingest_graph_db(neo4j: Neo4jResource, ...):
    with neo4j.get_driver(context) as driver:
        ...  # Use driver

Assets delegate parsing and transformation logic to helper modules for reusability:
# In asset (orchestration only)
from data_pipeline.utils.lastfm_helpers import parse_lastfm_artist_response
lastfm_data = await async_fetch_lastfm_data_with_cache(...)
lastfm_info = parse_lastfm_artist_response(lastfm_data) # Delegated parsing
return Artist(
    tags=lastfm_info.tags,
    similar_artists=lastfm_info.similar_artists,
    ...
)

Neo4j Aura connections include retry logic with exponential backoff:
def _execute_with_retry(driver, query, max_retries=3, base_delay=2.0):
    for attempt in range(max_retries + 1):
        try:
            with driver.session() as session:
                return session.run(query).single()
        except (ServiceUnavailable, SessionExpired) as e:
            if attempt < max_retries:
                time.sleep(base_delay * (2 ** attempt))
            else:
                raise

Instead of creating separate Track nodes, the pipeline embeds track information directly into Release nodes as a list property. This design reduces graph complexity while preserving track order:
# Tracks are formatted with position prefix: "1. Track Title", "2. Another Track"
# Stored as Release.tracks property in Neo4j
tracks_property = ["1. Welcome to the Pleasuredome", "2. War", "3. Two Tribes"]

Benefits:
- Reduces node count and relationship overhead
- Maintains track ordering naturally
- Enables fulltext search across release titles and track names via Neo4j fulltext indexes
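A sketch of how a Release node with an embedded track list might be written. This is not the project's exact Cypher, and the connection details are placeholders:

```python
from neo4j import GraphDatabase

query = """
MERGE (r:Release {id: $id})
SET r.title = $title, r.year = $year, r.tracks = $tracks
"""

driver = GraphDatabase.driver(
    "neo4j+s://<your-instance-id>.databases.neo4j.io", auth=("neo4j", "<your-password>")
)
with driver.session() as session:
    session.run(
        query,
        id="<release-mbid>",  # placeholder MusicBrainz identifier
        title="Welcome to the Pleasuredome",
        year=1984,
        tracks=["1. Welcome to the Pleasuredome", "2. War", "3. Two Tribes"],
    )
driver.close()
```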
Multi-level caching reduces API calls and enables resumable runs:
| Cache Level | Location | Format | Purpose |
|---|---|---|---|
| Wikidata entities | `.cache/wikidata/{qid}.json` | JSON | Entity metadata |
| Wikipedia articles | `.cache/wikipedia/{qid}.txt` | Plain text | Article content |
| MusicBrainz releases | `.cache/musicbrainz/{mbid}_releases.json` | JSON | Release groups |
| MusicBrainz tracks | `.cache/musicbrainz/{mbid}_tracks.json` | JSON | Track listings |
| Last.fm data | `.cache/last_fm/{hash}.json` | JSON | Tags, similar artists |
graph_rag_data_pipeline_1/
├── src/
│ └── data_pipeline/
│ ├── definitions.py # Dagster entry point
│ ├── settings.py # Pydantic settings (paths, timeouts)
│ ├── models.py # Domain models (msgspec Structs)
│ ├── defs/
│ │ ├── assets/ # Business logic layer
│ │ │ ├── build_artist_index.py
│ │ │ ├── extract_artists.py
│ │ │ ├── extract_artists_articles.py
│ │ │ ├── extract_countries.py
│ │ │ ├── extract_genres.py
│ │ │ ├── extract_genres_articles.py
│ │ │ ├── extract_releases.py
│ │ │ ├── extract_tracks.py
│ │ │ ├── merge_wikipedia_articles.py
│ │ │ ├── ingest_graph_db.py
│ │ │ └── ingest_vector_db.py
│ │ ├── resources.py # Connection factories
│ │ ├── io_managers.py # Parquet/JSONL persistence
│ │ ├── partitions.py # Decade partitioning
│ │ └── checks.py # Data quality gates
│ └── utils/ # Reusable helpers
│ ├── network_helpers.py
│ ├── io_helpers.py
│ ├── data_transformation_helpers.py
│ ├── wikidata_helpers.py
│ ├── wikipedia_helpers.py
│ ├── musicbrainz_helpers.py
│ ├── lastfm_helpers.py
│ ├── neo4j_helpers.py
│ └── chroma_helpers.py
├── tests/ # Mirrors src/ structure
│ └── data_pipeline/
│ ├── test_settings.py
│ ├── defs/
│ │ ├── assets/
│ │ │ ├── test_build_artist_index.py
│ │ │ ├── test_extract_artist.py
│ │ │ ├── test_extract_artists_articles.py
│ │ │ ├── test_extract_countries.py
│ │ │ ├── test_extract_genres.py
│ │ │ ├── test_extract_genres_articles.py
│ │ │ ├── test_extract_releases.py
│ │ │ ├── test_extract_tracks.py
│ │ │ ├── test_merge_wikipedia_articles.py
│ │ │ ├── test_ingest_graph_db.py
│ │ │ └── test_ingest_vector_db.py
│ │ ├── test_checks.py
│ │ ├── test_io_managers.py
│ │ ├── test_partitions.py
│ │ └── test_resources.py
│ └── utils/
│ ├── test_chroma_helpers.py
│ ├── test_data_transformation_helpers.py
│ ├── test_io_helpers.py
│ ├── test_lastfm_helpers.py
│ ├── test_musicbrainz_helpers.py
│ ├── test_neo4j_helpers.py
│ ├── test_network_helpers.py
│ ├── test_wikidata_helpers.py
│ └── test_wikipedia_helpers.py
├── scripts/ # Standalone CLI utilities
├── data_volume/ # Local data & caches
│ ├── .cache/ # API response caches
│ └── datasets/ # Materialized assets
└── .env # Secrets (not committed)
- Python 3.13+
- uv (Astral's Python package manager)
- A Neo4j instance (Aura cloud recommended)
To ensure replicability, follow these steps to set up the environment and dependencies:
1. Clone the repository:

   git clone <repository-url>
   cd graph_rag_data_pipeline_1

2. Install dependencies using `uv`. This creates a virtual environment and installs all required packages (including dev dependencies) from `uv.lock`:

   uv sync

3. Configure environment variables. The application requires several API keys and database credentials. Create a `.env` file in the root directory and populate it with your keys:

   # Neo4j Graph Database
   NEO4J_URI=neo4j+s://<your-instance-id>.databases.neo4j.io
   NEO4J_USERNAME=neo4j
   NEO4J_PASSWORD=<your-password>

   # API Keys
   LASTFM_API_KEY=<your-lastfm-api-key>
   NOMIC_API_KEY=<your-nomic-api-key>

   # Optional: Dagster Environment (PROD or DEV)
   DAGSTER_ENV=DEV

   Note: You can obtain a free Nomic API key at atlas.nomic.ai and a Last.fm API key at last.fm/api.

4. Activate the virtual environment:

   source .venv/bin/activate

5. Launch the Dagster development server. This command starts the Dagster UI and allows you to trigger and monitor the pipeline:

   dg dev

6. Execute the pipeline:
   - Open http://localhost:3000 in your browser.
   - Navigate to Assets -> Global Asset Graph.
   - Click "Materialize all" (top right) to run the full end-to-end pipeline.
   - Note: The `build_artist_index_by_decade` asset is partitioned. Dagster will automatically launch 10 parallel runs (one per decade) and then merge the results in `artist_index`.
To verify the installation and code integrity, run the test suite:
uv run pytest

We use ruff for linting and bandit for security scanning:

uv run ruff check .
uv run bandit -r src/

The pipeline includes automated asset checks that validate data after materialization:
| Check | Asset | Validation Rule |
|---|---|---|
| `check_artist_index_integrity` | `artist_index` | No null IDs/names, no duplicates |
| `check_artists_completeness` | `artists` | >= 50% of artists have genres or tags |
| `check_releases_per_artist` | `releases` | Average releases per artist >= 1 |
| `check_tracks_schema` | `tracks` | No null titles or album_ids |
| `check_genres_quality` | `genres` | No null genre names |
