Skip to content

Commit c1907f2

Browse files
authored
Merge pull request #643 from ibi-group/feature/DT-577-feed-source-summaries-mem-issue
Updated mongo queries to more efficient
2 parents 724a996 + 89d283b commit c1907f2

5 files changed

Lines changed: 292 additions & 196 deletions

File tree

.github/workflows/maven.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
with:
3939
node-version: 22.x
4040
- name: Start MongoDB
41-
uses: supercharge/mongodb-github-action@1.3.0
41+
uses: supercharge/mongodb-github-action@1.12.1
4242
with:
4343
mongodb-version: 4.2
4444
- name: Setup Maven Cache

src/main/java/com/conveyal/datatools/manager/models/FeedSourceSummary.java

Lines changed: 186 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@
77
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
88
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
99
import com.google.common.collect.Lists;
10-
import com.mongodb.client.model.Accumulators;
11-
import com.mongodb.client.model.Projections;
1210
import com.mongodb.client.model.Sorts;
1311
import com.mongodb.client.model.UnwindOptions;
12+
import com.mongodb.client.model.Variable;
1413
import org.bson.Document;
1514
import org.bson.conversions.Bson;
1615

1716
import java.time.LocalDate;
1817
import java.time.ZoneId;
1918
import java.util.ArrayList;
19+
import java.util.Arrays;
2020
import java.util.Date;
2121
import java.util.HashMap;
2222
import java.util.List;
@@ -26,15 +26,19 @@
2626
import static com.conveyal.datatools.manager.DataManager.hasConfigProperty;
2727
import static com.conveyal.datatools.manager.DataManager.isExtensionEnabled;
2828
import static com.conveyal.datatools.manager.DataManager.isModuleEnabled;
29-
import static com.mongodb.client.model.Aggregates.group;
3029
import static com.mongodb.client.model.Aggregates.limit;
3130
import static com.mongodb.client.model.Aggregates.lookup;
3231
import static com.mongodb.client.model.Aggregates.match;
3332
import static com.mongodb.client.model.Aggregates.project;
3433
import static com.mongodb.client.model.Aggregates.replaceRoot;
3534
import static com.mongodb.client.model.Aggregates.sort;
3635
import static com.mongodb.client.model.Aggregates.unwind;
36+
import static com.mongodb.client.model.Filters.expr;
3737
import static com.mongodb.client.model.Filters.in;
38+
import static com.mongodb.client.model.Projections.computed;
39+
import static com.mongodb.client.model.Projections.fields;
40+
import static com.mongodb.client.model.Projections.include;
41+
import static com.mongodb.client.model.Sorts.descending;
3842
import static java.util.Objects.requireNonNullElse;
3943

4044
/**
@@ -164,8 +168,10 @@ public static List<FeedSourceSummary> getFeedSourceSummaries(String projectId, S
164168
match(
165169
in("projectId", projectId)
166170
),
171+
172+
// Project only necessary fields early to reduce document size.
167173
project(
168-
Projections.fields(Projections.include(
174+
include(
169175
"_id",
170176
"name",
171177
"deployable",
@@ -174,11 +180,12 @@ public static List<FeedSourceSummary> getFeedSourceSummaries(String projectId, S
174180
"labelIds",
175181
"url",
176182
"filename",
177-
"noteIds")
183+
"noteIds"
178184
)
179185
),
180186
sort(Sorts.ascending("name"))
181187
);
188+
182189
return extractFeedSourceSummaries(projectId, organizationId, stages);
183190
}
184191

@@ -188,31 +195,103 @@ public static List<FeedSourceSummary> getFeedSourceSummaries(String projectId, S
188195
* If this is updated, be sure to also update the matching Mongo query.
189196
*/
190197
public static Map<String, FeedVersionSummary> getLatestFeedVersionForFeedSources(String projectId) {
191-
List<Bson> stages = Lists.newArrayList(
198+
List<Bson> feedVersionPipeline = Arrays.asList(
199+
// Match FeedVersion documents where feedSourceId equals the feedSourceId passed from the outer document.
192200
match(
193-
in("projectId", projectId)
201+
expr(
202+
new Document("$eq", Arrays.asList("$feedSourceId", "$$feedSourceId"))
203+
)
194204
),
195-
lookup("FeedVersion", "_id", "feedSourceId", "feedVersions"),
196-
lookup("FeedVersion", "publishedVersionId", "namespace", "publishedFeedVersion"),
197-
unwind("$feedVersions"),
198-
unwind("$publishedFeedVersion", new UnwindOptions().preserveNullAndEmptyArrays(true)),
199-
sort(Sorts.descending("feedVersions.version")),
200-
group(
201-
"$_id",
202-
Accumulators.first("publishedFeedVersionErrorCount", "$publishedFeedVersion.validationResult.errorCount"),
203-
Accumulators.first("publishedFeedVersionStartDate", "$publishedFeedVersion.validationResult.firstCalendarDate"),
204-
Accumulators.first("publishedFeedVersionEndDate", "$publishedFeedVersion.validationResult.lastCalendarDate"),
205-
Accumulators.first("publishedVersionId", "$publishedVersionId"),
206-
Accumulators.first("feedVersionId", "$feedVersions._id"),
207-
Accumulators.first("firstCalendarDate", "$feedVersions.validationResult.firstCalendarDate"),
208-
Accumulators.first("lastCalendarDate", "$feedVersions.validationResult.lastCalendarDate"),
209-
Accumulators.first("errorCount", "$feedVersions.validationResult.errorCount"),
210-
Accumulators.first("processedByExternalPublisher", "$feedVersions.processedByExternalPublisher"),
211-
Accumulators.first("sentToExternalPublisher", "$feedVersions.sentToExternalPublisher"),
212-
Accumulators.first("gtfsPlusValidation", "$feedVersions.gtfsPlusValidation"),
213-
Accumulators.first("namespace", "$feedVersions.namespace")
205+
sort(descending("version")),
206+
limit(1),
207+
// Project only the fields needed from the FeedVersion to reduce payload size.
208+
project(
209+
include(
210+
"version",
211+
"_id",
212+
"validationResult",
213+
"processedByExternalPublisher",
214+
"sentToExternalPublisher",
215+
"gtfsPlusValidation",
216+
"namespace"
217+
)
214218
)
215219
);
220+
221+
// Define the variable passed into the lookup pipeline.
222+
List<Variable<String>> feedSourceId = List.of(new Variable<>("feedSourceId", "$_id"));
223+
224+
// $lookup that uses the above pipeline to produce "latestFeedVersion" (an array with at most one element).
225+
Bson lookupLatestFeedVersion = lookup(
226+
"FeedVersion",
227+
feedSourceId,
228+
feedVersionPipeline,
229+
"latestFeedVersion"
230+
);
231+
232+
// Pipeline to find the published FeedVersion by namespace (or identifier stored in publishedVersionId)
233+
List<Bson> publishedFeedVersionPipeline = Arrays.asList(
234+
// Match FeedVersion documents where namespace equals the outer document's publishedVersionId.
235+
match(
236+
expr(
237+
new Document("$eq", Arrays.asList("$namespace", "$$publishedVersionId"))
238+
)
239+
),
240+
limit(1),
241+
// Project only the validationResult because that's all that is needed later.
242+
project(include("validationResult"))
243+
);
244+
245+
// Pass publishedVersionId from the local document into the lookup pipeline.
246+
List<Variable<String>> publishedVersionId = List.of(new Variable<>("publishedVersionId", "$publishedVersionId"));
247+
248+
// $lookup that uses the above pipeline to produce "publishedFeedVersion" (an array with at most one element).
249+
Bson lookupPublishedFeedVersion = lookup(
250+
"FeedVersion",
251+
publishedVersionId,
252+
publishedFeedVersionPipeline,
253+
"publishedFeedVersion"
254+
);
255+
256+
// Top-level aggregation stages that combine the lookups and map required fields into a slimmed down result.
257+
List<Bson> stages = Arrays.asList(
258+
// Start by filtering documents by projectId (reduces the number of input documents early).
259+
match(in("projectId", projectId)),
260+
261+
// Attach the latest FeedVersion (as an array "latestFeedVersion").
262+
lookupLatestFeedVersion,
263+
264+
// Attach the published FeedVersion (as an array "publishedFeedVersion").
265+
lookupPublishedFeedVersion,
266+
267+
// Unwind the latestFeedVersion array into a single document.
268+
unwind("$latestFeedVersion", new UnwindOptions().preserveNullAndEmptyArrays(true)),
269+
270+
// Unwind the publishedFeedVersion array into a single document.
271+
unwind("$publishedFeedVersion", new UnwindOptions().preserveNullAndEmptyArrays(true)),
272+
273+
// Final projection: select and compute only the fields needed for the output to minimize size.
274+
project(fields(
275+
// keep the raw publishedVersionId field for reference.
276+
include("publishedVersionId"),
277+
278+
// Published feed version fields (mapped from the nested publishedFeedVersion.validationResult).
279+
computed("publishedFeedVersionErrorCount", "$publishedFeedVersion.validationResult.errorCount"),
280+
computed("publishedFeedVersionStartDate", "$publishedFeedVersion.validationResult.firstCalendarDate"),
281+
computed("publishedFeedVersionEndDate", "$publishedFeedVersion.validationResult.lastCalendarDate"),
282+
283+
// Latest feed version fields (mapped from the nested latestFeedVersion).
284+
computed("feedVersionId", "$latestFeedVersion._id"),
285+
computed("firstCalendarDate", "$latestFeedVersion.validationResult.firstCalendarDate"),
286+
computed("lastCalendarDate", "$latestFeedVersion.validationResult.lastCalendarDate"),
287+
computed("errorCount", "$latestFeedVersion.validationResult.errorCount"),
288+
computed("processedByExternalPublisher", "$latestFeedVersion.processedByExternalPublisher"),
289+
computed("sentToExternalPublisher", "$latestFeedVersion.sentToExternalPublisher"),
290+
computed("gtfsPlusValidation", "$latestFeedVersion.gtfsPlusValidation"),
291+
computed("namespace", "$latestFeedVersion.namespace")
292+
))
293+
);
294+
216295
return extractFeedVersionSummaries(
217296
"FeedSource",
218297
"feedVersionId",
@@ -228,27 +307,63 @@ public static Map<String, FeedVersionSummary> getLatestFeedVersionForFeedSources
228307
* If this is updated, be sure to also update the matching Mongo query.
229308
*/
230309
public static Map<String, FeedVersionSummary> getFeedVersionsFromLatestDeployment(String projectId) {
231-
List<Bson> stages = Lists.newArrayList(
232-
match(
233-
in("_id", projectId)
234-
),
235-
lookup("Deployment", "_id", "projectId", "deployments"),
236-
unwind("$deployments"),
237-
replaceRoot("$deployments"),
238-
sort(Sorts.descending("lastUpdated")),
239-
limit(1),
240-
lookup("FeedVersion", "feedVersionIds", "_id", "feedVersions"),
241-
unwind("$feedVersions"),
242-
replaceRoot("$feedVersions"),
310+
List<Bson> stages = new ArrayList<>();
311+
stages.add(match(in("_id", projectId)));
312+
313+
// Lookup Deployments for the project.
314+
stages.add(lookup(
315+
"Deployment",
316+
"_id",
317+
"projectId",
318+
"deployments"
319+
));
320+
321+
// Unwind deployments array to get individual deployment documents.
322+
stages.add(unwind("$deployments"));
323+
324+
// Project only fields needed from deployment to reduce doc size before sorting.
325+
stages.add(project(fields(
326+
computed("deployment", "$deployments._id"),
327+
computed("lastUpdated", "$deployments.lastUpdated"),
328+
computed("feedVersionIds", "$deployments.feedVersionIds")
329+
)));
330+
331+
// Sort deployments by lastUpdated descending.
332+
stages.add(sort(descending("lastUpdated")));
333+
stages.add(limit(1));
334+
335+
List<Bson> feedVersionPipeline = Arrays.asList(
336+
match(expr(new Document("$in", Arrays.asList("$_id", "$$feedVersionIds")))),
243337
project(
244-
Projections.fields(Projections.include(
338+
include(
245339
"feedSourceId",
246340
"validationResult.firstCalendarDate",
247341
"validationResult.lastCalendarDate",
248-
"validationResult.errorCount")
342+
"validationResult.errorCount"
249343
)
250344
)
251345
);
346+
347+
// Use pipeline form of lookup to fetch FeedVersions matching deployment’s feedVersionIds
348+
List<Variable<String>> feedVersionIds = List.of(new Variable<>("feedVersionIds", "$feedVersionIds"));
349+
350+
stages.add(lookup(
351+
"FeedVersion",
352+
feedVersionIds,
353+
feedVersionPipeline,
354+
"feedVersions"
355+
));
356+
stages.add(unwind("$feedVersions", new UnwindOptions().preserveNullAndEmptyArrays(false)));
357+
stages.add(replaceRoot("$feedVersions"));
358+
// Final projection: select and compute only the fields needed for the output to minimize size.
359+
stages.add(project(
360+
include(
361+
"_id",
362+
"feedSourceId",
363+
"validationResult"
364+
)
365+
));
366+
252367
return extractFeedVersionSummaries(
253368
"Project",
254369
"_id",
@@ -263,27 +378,46 @@ public static Map<String, FeedVersionSummary> getFeedVersionsFromLatestDeploymen
263378
* <a href="src/main/resources/mongo/getFeedVersionsFromPinnedDeployment.js">getFeedVersionsFromPinnedDeployment.js</a>.
264379
*/
265380
public static Map<String, FeedVersionSummary> getFeedVersionsFromPinnedDeployment(String projectId) {
266-
List<Bson> stages = Lists.newArrayList(
381+
List<Bson> stages = new ArrayList<>();
382+
383+
// Match projects by projectId.
384+
stages.add(match(in("_id", projectId)));
385+
386+
// Project only pinnedDeploymentId to keep doc small.
387+
stages.add(project(include("pinnedDeploymentId")));
388+
389+
// Lookup Deployment documents by pinnedDeploymentId.
390+
stages.add(lookup("Deployment", "pinnedDeploymentId", "_id", "deployment"));
391+
392+
// Unwind deployment array (assuming single deployment per project).
393+
stages.add(unwind("$deployment"));
394+
395+
// Define pipeline in $lookup to filter and project FeedVersion docs.
396+
List<Bson> feedVersionPipeline = Arrays.asList(
267397
match(
268-
in("_id", projectId)
269-
),
270-
project(
271-
Projections.fields(Projections.include("pinnedDeploymentId"))
398+
expr(
399+
new Document("$in", Arrays.asList("$_id", "$$feedVersionIds"))
400+
)
272401
),
273-
lookup("Deployment", "pinnedDeploymentId", "_id", "deployment"),
274-
unwind("$deployment"),
275-
lookup("FeedVersion", "deployment.feedVersionIds", "_id", "feedVersions"),
276-
unwind("$feedVersions"),
277-
replaceRoot("$feedVersions"),
278402
project(
279-
Projections.fields(Projections.include(
403+
include(
404+
"_id",
280405
"feedSourceId",
281406
"validationResult.firstCalendarDate",
282407
"validationResult.lastCalendarDate",
283-
"validationResult.errorCount")
408+
"validationResult.errorCount"
284409
)
285410
)
286411
);
412+
413+
// Define variable for correlated lookup on FeedVersion collection.
414+
List<Variable<String>> feedVersionIds = List.of(
415+
new Variable<>("feedVersionIds", "$deployment.feedVersionIds")
416+
);
417+
418+
// Lookup FeedVersion docs with pipeline and store as feedVersions array.
419+
stages.add(lookup("FeedVersion", feedVersionIds, feedVersionPipeline, "feedVersions"));
420+
287421
return extractFeedVersionSummaries(
288422
"Project",
289423
"_id",

0 commit comments

Comments
 (0)