Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.hisp.dhis.program.Program;
import org.hisp.dhis.program.ProgramStage;
import org.hisp.dhis.relationship.RelationshipType;
import org.hisp.dhis.test.webapi.H2ControllerIntegrationTestBase;
import org.hisp.dhis.test.webapi.PostgresControllerIntegrationTestBase;
import org.hisp.dhis.trackedentity.TrackedEntityType;
import org.hisp.dhis.tracker.imports.report.ImportReport;
import org.hisp.dhis.webapi.controller.tracker.JsonImportReport;
Expand All @@ -57,7 +57,7 @@
* requests
*/
@Transactional
class TrackerImportReportTest extends H2ControllerIntegrationTestBase {
class TrackerImportReportTest extends PostgresControllerIntegrationTestBase {

private static final String ORG_UNIT_UID = "PSeMWi7rBgb";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
import static org.hisp.dhis.changelog.ChangeLogType.DELETE;
import static org.hisp.dhis.changelog.ChangeLogType.UPDATE;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -107,6 +110,8 @@ public abstract class AbstractTrackerPersister<T extends TrackerDto, V extends I

protected final FileResourceStore fileResourceStore;

protected final ObjectMapper objectMapper;

/**
* Template method that can be used by classes extending this class to execute the persistence
* flow of Tracker entities
Expand All @@ -123,7 +128,7 @@ public PersistResult persist(TrackerBundle bundle) {

List<EntityNotifications> notifications = new ArrayList<>();
ChangeLogAccumulator changeLogs = new ChangeLogAccumulator();
EntityWriteBatch batch = new EntityWriteBatch();
EntityWriteBatch batch = new EntityWriteBatch(objectMapper);

//
// Extract the entities to persist from the Bundle
Expand All @@ -132,6 +137,11 @@ public PersistResult persist(TrackerBundle bundle) {

Connection conn = DataSourceUtils.getConnection(dataSource);
try {
// Pre-allocate primary keys in a single round-trip for entity types that opt in.
// The cursor advances only on isNew branches inside the loop below.
long[] preAllocatedIds = preAllocateIds(conn, bundle, dtos);
int preAllocatedIdsCursor = 0;

for (T trackerDto : dtos) {

Entity objectReport = new Entity(getType(), trackerDto.getUID());
Expand All @@ -150,6 +160,15 @@ public PersistResult persist(TrackerBundle bundle) {
//
V convertedDto = convert(bundle, trackerDto);

//
// Assign the pre-allocated id (if any) before staging so the flush path can emit it
// unchanged, and so any TEAV/changelog code that reads convertedDto.getId() sees the
// final value.
//
if (preAllocatedIds != null && isNew(bundle, trackerDto)) {
assignId(convertedDto, preAllocatedIds[preAllocatedIdsCursor++]);
}

//
// Handle ownership records, if required
//
Expand Down Expand Up @@ -214,7 +233,7 @@ public PersistResult persist(TrackerBundle bundle) {
if (FlushMode.OBJECT == bundle.getFlushMode()) {
// Flush entity INSERTs/UPDATEs before changelog INSERTs so FK references
// (trackedentityid, eventid) exist before changelog rows reference them.
batch.flush(entityManager);
batch.flush(entityManager, conn);
entityManager.flush();
changeLogs.flushAll(conn);
}
Expand Down Expand Up @@ -252,7 +271,7 @@ public PersistResult persist(TrackerBundle bundle) {
if (FlushMode.AUTO == bundle.getFlushMode()) {
// Flush entity INSERTs/UPDATEs before changelog INSERTs so FK references
// (trackedentityid, eventid) exist before changelog rows reference them.
batch.flush(entityManager);
batch.flush(entityManager, conn);
entityManager.flush();
changeLogs.flushAll(conn);
}
Expand All @@ -264,6 +283,35 @@ public PersistResult persist(TrackerBundle bundle) {
return new PersistResult(typeReport, notifications);
}

private long[] preAllocateIds(Connection conn, TrackerBundle bundle, List<T> dtos)
throws SQLException {
String sequenceName = sequenceName();
if (sequenceName == null) {
return null;
}
int createCount = 0;
for (T dto : dtos) {
if (isNew(bundle, dto)) {
createCount++;
}
}
if (createCount == 0) {
return null;
}
return allocateIds(conn, sequenceName, createCount);
}

private void assignId(V convertedDto, long id) {
if (convertedDto instanceof TrackedEntity te) {
te.setId(id);
} else {
throw new IllegalStateException(
"Pre-allocated id assignment not implemented for "
+ convertedDto.getClass().getName()
+ " -- sequenceName() returned non-null but assignId does not handle this type.");
}
}

private void stageInsert(V convertedDto, EntityWriteBatch batch) {
if (convertedDto instanceof TrackedEntity te) {
batch.stageInsert(te);
Expand Down Expand Up @@ -396,6 +444,40 @@ protected static Set<Notification> mergeNotifications(
@SuppressWarnings("unchecked")
protected abstract List<T> getByType(TrackerBundle bundle);

/**
* Returns the PostgreSQL sequence used to allocate primary keys for this entity type, or {@code
* null} if id allocation should be left to Hibernate. When non-null, {@link #persist} pre-fetches
* one id per CREATE entity in a single round-trip and assigns the id before staging so the flush
* path can emit a multi-row INSERT.
*/
protected abstract String sequenceName();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we not make this one return null by default and make TrackedEntityPersister override it? this way we would get rid of the boilerplate code in the other persisters

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just changed to be this way because this is the skeleton for next steps.
Every persister will need to implement this, so there will be no null returned in the end


/**
* Fetches {@code count} ids from {@code sequenceName} in a single round-trip. The sequence name
* is interpolated into the SQL (not a bind parameter) because PostgreSQL's {@code nextval} takes
* a {@code regclass}, and the value comes from {@link #sequenceName()} — controlled by us, not
* user input.
*/
private static long[] allocateIds(Connection conn, String sequenceName, int count)
throws SQLException {
long[] ids = new long[count];
String sql = "select nextval('" + sequenceName + "') from generate_series(1, ?)";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setInt(1, count);
try (ResultSet rs = ps.executeQuery()) {
int i = 0;
while (rs.next()) {
ids[i++] = rs.getLong(1);
}
if (i != count) {
throw new SQLException(
"Allocated " + i + " ids from " + sequenceName + ", expected " + count);
}
}
}
return ids;
}

// // // // // // // //
// // // // // // // //
// SHARED METHODS //
Expand Down Expand Up @@ -446,12 +528,12 @@ protected void handleTrackedEntityAttributeValues(
// already attached to the session, preventing a duplicate em.persist that would throw
// EntityExistsException. The batch overlay catches the within-persister case (e.g. two
// enrollments under the same TE both carrying the same attribute) where the second occurrence
// would otherwise produce a fresh instance with the same composite key. A transient TE
// (id == 0) has no DB rows yet -- the insert is still staged in the batch -- so binding it as
// a query parameter would throw TransientObjectException; skip the JPQL in that case and rely
// on the batch overlay alone.
// would otherwise produce a fresh instance with the same composite key. A TE staged for
// insert in this batch has no DB row yet -- whether or not its id is set (Phase 4a pre-
// allocates the id from the sequence) -- so the JPQL would always return empty and the lookup
// can be skipped.
Map<MetadataIdentifier, TrackedEntityAttributeValue> attributeValueById =
trackedEntity.getId() == 0
batch.isStagedAsInsert(trackedEntity)
? new HashMap<>()
: entityManager
.createQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package org.hisp.dhis.tracker.imports.bundle.persister;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -62,11 +63,17 @@ public EnrollmentPersister(
ReservedValueService reservedValueService,
DataSource dataSource,
FileResourceStore fileResourceStore,
ObjectMapper objectMapper,
TrackedEntityProgramOwnerService trackedEntityProgramOwnerService) {
super(reservedValueService, dataSource, fileResourceStore);
super(reservedValueService, dataSource, fileResourceStore, objectMapper);
this.trackedEntityProgramOwnerService = trackedEntityProgramOwnerService;
}

@Override
protected String sequenceName() {
return null;
}

@Override
protected void updateAttributes(
TrackerPreheat preheat,
Expand Down
Loading
Loading