A scalable movie recommender system built with Apache Spark and Scala 3 for the EPFL course "Systems for data management and data science". The system loads movie and ratings datasets, computes aggregated statistics, and produces personalized recommendations using two different approaches: a baseline predictor and collaborative filtering (ALS), which both use Locality-Sensitive Hashing (LSH) for genre-based nearest-neighbor lookup. The original project description is contained in this file.
.
├── build.sbt # SBT build definition
├── project/
│ ├── build.properties # SBT version (1.9.0)
│ └── plugins.sbt # sbt-assembly plugin
└── src/
├── main/
│ ├── resources/
│ │ ├── movies_small.csv # Sample movies dataset
│ │ └── ratings_small.csv # Sample ratings dataset
│ └── scala/app/
│ ├── Main.scala # Application entry point
│ ├── aggregator/
│ │ └── Aggregator.scala # Incremental rating aggregation
│ ├── loaders/
│ │ ├── MoviesLoader.scala # Loads movie data into an RDD
│ │ └── RatingsLoader.scala# Loads ratings data into an RDD
│ └── recommender/
│ ├── Recommender.scala # Top-level recommendation orchestrator
│ ├── baseline/
│ │ └── BaselinePredictor.scala
│ ├── collaborativeFiltering/
│ │ └── CollaborativeFiltering.scala
│ └── LSH/
│ ├── LSHIndex.scala
│ ├── MinHash.scala
│ └── NNLookup.scala
└── test/
└── scala/
└── MainTest.scala # Test suite
MoviesLoader ──┐
├──► Aggregator ──► getResult() / getKeywordQueryResult()
RatingsLoader ──┤
│
└──► LSHIndex ──► NNLookup ──┐
├──► Recommender
└──► BaselinePredictor ────────┤
│
└──► CollaborativeFiltering ───┘
(ALS)
- Loaders read pipe-delimited CSV files into typed Spark RDDs.
- Aggregator maintains per-movie average ratings and supports incremental updates.
- LSHIndex builds a MinHash-based bucket index over movie genres for fast approximate nearest-neighbor search.
- BaselinePredictor predicts ratings using normalised per-user and per-movie average deviations.
- CollaborativeFiltering trains a matrix-factorisation model (ALS) and predicts ratings for unseen movies.
- Recommender combines the LSH lookup with either predictor to return the top-
Kpersonalized recommendations.
Both dataset files are pipe-separated (|) CSV files stored under src/main/resources/.
<movieId>|<title>|<genre1>|<genre2>|...
| Field | Type | Description |
|---|---|---|
movieId |
Int | Unique movie identifier |
title |
String | Movie title |
genre* |
String | One or more genre tags (0 or more) |
<userId>|<movieId>|<rating>|<timestamp>
| Field | Type | Description |
|---|---|---|
userId |
Int | Unique user identifier |
movieId |
Int | Movie being rated |
rating |
Double | Rating value |
timestamp |
Int | Unix timestamp of the rating event |
The RatingsLoader exposes ratings as RDD[(userId, movieId, Option[Double], rating, timestamp)], where the Option[Double] field holds a previous rating (used for incremental updates in the Aggregator).
| Requirement | Version |
|---|---|
| JDK | 21 |
| Scala | 3.3.1 |
| Apache Spark | 3.5.1 |
| SBT | 1.9.0 |
Note: The build enforces JDK 21 at startup — using a different JDK version will cause an assertion error.
The .sbtopts file already configures the required --add-opens JVM flags so that Spark runs correctly under JDK 21.
Compile the project:
sbt compileProduce a fat JAR (excluding Spark/Hadoop from the assembly):
sbt assemblyThe resulting JAR is written to target/scala-3.3.1/Project_2-assembly-0.1.1.jar.
sbt testAdd your Spark logic inside src/main/scala/app/Main.scala:
object Main {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("app").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
// Load data
val movies = new MoviesLoader(sc, "/movies_small.csv").load()
val ratings = new RatingsLoader(sc, "/ratings_small.csv").load()
// Aggregate average ratings
val aggregator = new Aggregator(sc)
aggregator.init(ratings, movies)
aggregator.getResult().take(10).foreach(println)
// Build LSH index and get recommendations
val seed = (0 until 12).toIndexedSeq
val index = new LSHIndex(movies, seed)
val recommender = new Recommender(sc, index, ratings)
val topK = recommender.recommendBaseline(userId = 1, genre = List("Action"), K = 5)
topK.foreach { case (movieId, score) => println(s"Movie $movieId -> $score") }
}
}Submit to a running Spark cluster:
spark-submit \
--class app.Main \
--master local[*] \
target/scala-3.3.1/Project_2-assembly-0.1.1.jarclass MoviesLoader(sc: SparkContext, path: String)
def load(): RDD[(Int, String, List[String])]
// Returns: (movieId, title, genres)Reads the movie CSV from the classpath resource path, parses each pipe-delimited line, and persists the RDD in memory.
class RatingsLoader(sc: SparkContext, path: String)
def load(): RDD[(Int, Int, Option[Double], Double, Int)]
// Returns: (userId, movieId, previousRating, rating, timestamp)Reads the ratings CSV from the classpath resource path and persists the RDD in memory. The previousRating field is always None on initial load; it is populated during incremental updates.
class Aggregator(sc: SparkContext)
def init(ratings: RDD[...], title: RDD[...]): Unit
def getResult(): RDD[(String, Double)]
def getKeywordQueryResult(keywords: List[String]): Double
def updateResult(delta: Array[...]): Unitinit– Joins ratings with movie titles and computes the average rating for each movie. Movies with no ratings receive an average of0.0. The state is persisted on disk and memory for fault tolerance.getResult– Returns(title, averageRating)pairs for all movies.getKeywordQueryResult– Returns the mean average rating over all movies whose genre list contains all of the provided keywords. Returns-1.0if no matching movies exist, or0.0if matching movies exist but none have been rated.updateResult– Incrementally updates the aggregated state given a batch of new or revised ratings, avoiding a full recomputation.
class Recommender(sc: SparkContext, index: LSHIndex, ratings: RDD[...])
def recommendBaseline(userId: Int, genre: List[String], K: Int): List[(Int, Double)]
def recommendCollaborative(userId: Int, genre: List[String], K: Int): List[(Int, Double)]Both methods:
- Use the LSH nearest-neighbor lookup to find candidate movies that share the same genre bucket as
genre. - Filter out movies already watched by
userId. - Predict a rating for each candidate using either the baseline or ALS predictor.
- Return the top-
K(movieId, predictedRating)pairs, sorted by descending predicted rating.
class BaselinePredictor()
def init(ratingsRDD: RDD[...]): Unit
def predict(userId: Int, movieIds: RDD[Int]): RDD[(Int, Double)]Implements the standard baseline prediction formula:
prediction(u, i) = avgUser(u) + avgDev(i) * scale(avgUser(u) + avgDev(i), avgUser(u))
where scale(x, avg) normalises deviations to [1, 5], and avgDev(i) is the mean normalised deviation of all users' ratings for movie i.
class CollaborativeFiltering(rank: Int, regularizationParameter: Double, seed: Long, n_parallel: Int)
def init(ratingsRDD: RDD[...]): Unit
def predict(userId: Int, movieIds: RDD[Int]): RDD[(Int, Double)]Wraps Spark MLlib's ALS (Alternating Least Squares) matrix factorisation:
| Parameter | Value |
|---|---|
rank |
10 |
regularizationParameter |
0.1 |
seed |
0 |
n_parallel (blocks) |
4 |
maxIterations |
20 |
class LSHIndex(data: RDD[(Int, String, List[String])], seed: IndexedSeq[Int])
def hash(input: RDD[List[String]]): RDD[(IndexedSeq[Int], List[String])]
def getBuckets(): RDD[(IndexedSeq[Int], List[(Int, String, List[String])])]
def lookup[T](queries: RDD[(IndexedSeq[Int], T)]): RDD[(IndexedSeq[Int], T, List[...])]class NNLookup(lshIndex: LSHIndex)
def lookup(queries: RDD[List[String]]): RDD[(List[String], List[(Int, String, List[String])])]class MinHash(seed: IndexedSeq[Int])
def hash(data: List[String]): IndexedSeq[Int]The LSH pipeline:
MinHash.hashcomputes a MinHash signature (one minimum hash per seed) for a list of genre keywords.LSHIndexgroups movies into buckets by their MinHash signature.NNLookuphashes a query genre list, then retrieves all movies that land in the same bucket — these are approximate nearest neighbors in genre space.