Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 194 additions & 146 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
<h2db.version>2.2.224</h2db.version>
<db2.version>12.1.0.0</db2.version>
<cron4j.version>2.2.5</cron4j.version>
<testcontainers>1.19.3</testcontainers>
<testcontainers>1.21.4</testcontainers>
<akka.version>2.8.5</akka.version>
<akka-http.version>10.5.3</akka-http.version>
<reflections.version>0.10.2</reflections.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,36 @@ class FhirMappingJobManager(
}
}

/**
* Substitutes batch parameters in the preprocessSql of all source bindings in a mapping task.
* Parameters in preprocessSql are specified as $parameterName and will be replaced with the corresponding value.
*
* @param task The original mapping task
* @param parameters Map of parameter names to their values (e.g., Map("year" -> "2014", "month" -> "1"))
* @return A new mapping task with substituted preprocessSql in all source bindings
*/
/**
 * Substitutes batch parameters in the preprocessSql of all source bindings in a mapping task.
 * Parameters in preprocessSql are specified as $parameterName and will be replaced with the corresponding value.
 *
 * Substitution is applied longest-name-first (ties broken alphabetically) so that a parameter whose
 * name is a prefix of another (e.g. "id" vs "idRange") cannot corrupt the longer placeholder. Without
 * this ordering the result would depend on the unspecified iteration order of the parameter map.
 *
 * @param task       The original mapping task
 * @param parameters Map of parameter names to their values (e.g., Map("year" -> "2014", "month" -> "1"))
 * @return A new mapping task with substituted preprocessSql in all source bindings
 */
private def substituteBatchParameters(task: FhirMappingTask, parameters: Map[String, String]): FhirMappingTask = {
  // Deterministic order: longer parameter names first so "$idRange" is replaced before "$id"
  val orderedParams = parameters.toSeq.sortBy { case (name, _) => (-name.length, name) }
  val updatedSourceBinding = task.sourceBinding.map { case (alias, binding) =>
    val updatedBinding = binding.preprocessSql match {
      case Some(sql) =>
        val substitutedSql = orderedParams.foldLeft(sql) { case (currentSql, (paramName, paramValue)) =>
          // Literal replacement of the "$name" placeholder (no regex semantics)
          currentSql.replace(s"$$$paramName", paramValue)
        }
        // Create a new binding with the substituted SQL; copy() must be called on the concrete
        // type since MappingSourceBinding itself does not expose a copy method
        binding match {
          case fs: FileSystemSource => fs.copy(preprocessSql = Some(substitutedSql))
          case ss: SqlSource => ss.copy(preprocessSql = Some(substitutedSql))
          case ks: KafkaSource => ks.copy(preprocessSql = Some(substitutedSql))
          case fss: FhirServerSource => fss.copy(preprocessSql = Some(substitutedSql))
          case other => other // Unknown binding types are returned unchanged
        }
      case None => binding // No preprocessSql defined; nothing to substitute
    }
    alias -> updatedBinding
  }
  task.copy(sourceBinding = updatedSourceBinding)
}

/**
* Read the source data, divide it into chunks and execute the mapping (first mapping task in the Fhir Mapping Job
* Execution) and write each chunk sequentially
Expand All @@ -308,24 +338,83 @@ class FhirMappingJobManager(
identityServiceSettings: Option[IdentityServiceSettings] = None,
timeRange: Option[(LocalDateTime, LocalDateTime)] = None): Future[Unit] = {
val mappingTask = mappingJobExecution.mappingTasks.head
logger.debug(s"Reading source data for mapping ${mappingTask.name} within mapping job ${mappingJobExecution.jobId} ...")

// Check if this task has a batching strategy defined
mappingTask.batchingStrategy match {
case Some(strategy) if strategy.batchParameterSets.nonEmpty =>
// Execute the mapping task for each batch parameter set sequentially
val totalBatches = strategy.batchParameterSets.size
logger.debug(s"Batching strategy defined for mapping ${mappingTask.name} with $totalBatches batches")

strategy.batchParameterSets.zipWithIndex.foldLeft(Future.successful(())) { case (previousFuture, (batchParams, batchIndex)) =>
previousFuture.flatMap { _ =>
val batchNumber = batchIndex + 1
val isLastBatch = batchNumber == totalBatches
logger.debug(s"Processing batch $batchNumber/$totalBatches for mapping ${mappingTask.name} with parameters: $batchParams")

// Substitute the batch parameters in the task's preprocessSql
val batchTask = substituteBatchParameters(mappingTask, batchParams)

// Execute the batch, only log final result on last batch
executeSingleBatch(mappingJobExecution.copy(mappingTasks = Seq(batchTask)), sourceSettings, fhirWriter,
terminologyServiceSettings, identityServiceSettings, timeRange, Some(batchNumber), Some(totalBatches), Some(batchParams), isLastBatch)
}
}

case _ =>
// No batching strategy, execute normally (always the last/only batch)
executeSingleBatch(mappingJobExecution, sourceSettings, fhirWriter, terminologyServiceSettings,
identityServiceSettings, timeRange, None, None, None, isLastBatch = true)
}
}

/**
* Execute a single batch of the mapping task. This is called either once (when no batching strategy)
* or multiple times (once for each batch parameter set).
*
* @param mappingJobExecution Fhir Mapping Job execution
* @param sourceSettings The source settings of the mapping job
* @param fhirWriter FHIR writer
* @param terminologyServiceSettings Terminology service settings
* @param identityServiceSettings Identity service settings
* @param timeRange Time range for the source data to load
* @param batchNumber Current batch number (1-based), None if no batching
* @param totalBatches Total number of batches, None if no batching
* @param batchParams Parameters for this batch, None if no batching
* @param isLastBatch Whether this is the last batch (used to determine when to log final execution result)
* @return
*/
private def executeSingleBatch(mappingJobExecution: FhirMappingJobExecution,
sourceSettings: Map[String, MappingJobSourceSettings],
fhirWriter: BaseFhirWriter,
terminologyServiceSettings: Option[TerminologyServiceSettings],
identityServiceSettings: Option[IdentityServiceSettings],
timeRange: Option[(LocalDateTime, LocalDateTime)],
batchNumber: Option[Int],
totalBatches: Option[Int],
batchParams: Option[Map[String, String]],
isLastBatch: Boolean): Future[Unit] = {
val mappingTask = mappingJobExecution.mappingTasks.head
val batchInfo = batchNumber.map(n => s" [batch $n/${totalBatches.getOrElse("?")}]").getOrElse("")
logger.debug(s"Reading source data for mapping ${mappingTask.name}$batchInfo within mapping job ${mappingJobExecution.jobId} ...")
val (fhirMapping, mds, df) = readJoinSourceData(mappingTask, sourceSettings, timeRange, jobId = Some(mappingJobExecution.jobId))
// Cache the DataFrame to avoid re-reading the source data multiple times during processing.
// This is particularly useful when using chunking (e.g., via ToFhirConfig.engineConfig.maxChunkSizeForMappingJobs),
// as each chunk triggers a new read of the source data. Caching ensures that the data is read only once
// and reused across all chunks, improving performance.
df.cache()
val sizeOfDf: Long = df.count()
logger.debug(s"$sizeOfDf records read for mapping ${mappingTask.name} within mapping job ${mappingJobExecution.jobId} ...")
val batchParamsInfo = batchParams.map(p => s" with params: $p").getOrElse("")
logger.debug(s"$sizeOfDf records read for mapping ${mappingTask.name}$batchInfo$batchParamsInfo within mapping job ${mappingJobExecution.jobId} ...")

val result = ToFhirConfig.engineConfig.maxChunkSizeForMappingJobs match {
// If not specified, run it as a single chunk
case None =>
logger.debug(s"Executing the mapping ${mappingTask.name} within job ${mappingJobExecution.jobId} ...")
logger.debug(s"Executing the mapping ${mappingTask.name}$batchInfo within job ${mappingJobExecution.jobId} ...")
executeTask(mappingJobExecution.jobId, mappingTask.name, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), Some(mappingJobExecution.projectId))
.map(dataset => SinkHandler.writeMappingResult(spark, mappingJobExecution, mappingTask.name, dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
case Some(chunkSize) if sizeOfDf < chunkSize =>
logger.debug(s"Executing the mapping ${mappingTask.name} within job ${mappingJobExecution.jobId} ...")
logger.debug(s"Executing the mapping ${mappingTask.name}$batchInfo within job ${mappingJobExecution.jobId} ...")
executeTask(mappingJobExecution.jobId, mappingTask.name, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), Some(mappingJobExecution.projectId))
.map(dataset => SinkHandler.writeMappingResult(spark, mappingJobExecution, mappingTask.name, dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
//Otherwise divide the data into chunks
Expand All @@ -339,15 +428,19 @@ class FhirMappingJobManager(
case (fj, (df, i)) => fj.flatMap(_ =>
executeTask(mappingJobExecution.jobId, mappingTask.name, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
.map(dataset => SinkHandler.writeMappingResult(spark, mappingJobExecution, mappingTask.name, dataset, fhirWriter))
.map(_ => logger.debug(s"Chunk ${i + 1} / $numOfChunks is completed for mapping ${mappingTask.name} within MappingJob: ${mappingJobExecution.jobId}..."))
.map(_ => logger.debug(s"Chunk ${i + 1} / $numOfChunks$batchInfo is completed for mapping ${mappingTask.name} within MappingJob: ${mappingJobExecution.jobId}..."))
)
}
}
result.map(r => {
// Remove the DataFrame from cache after processing to free up memory resources.
df.unpersist()
// log the result of mapping task execution
ExecutionLogger.logExecutionResultForBatchMappingTask(mappingJobExecution.id)
// Only log the final result of mapping task execution when this is the last batch
// For batching strategy with multiple batches, this ensures the execution cache entry
// is only removed after all batches complete
if (isLastBatch) {
ExecutionLogger.logExecutionResultForBatchMappingTask(mappingJobExecution.id)
}
r
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.tofhir.engine.model

/**
 * Describes how a mapping task's source data should be split into multiple sequential batches.
 * Each batch is driven by a set of named parameters, enabling custom partitioning schemes such as
 * per-year processing, ID-range slicing, or any other domain-specific grouping.
 *
 * @param batchParameterSets One entry per batch, in execution order. Every entry maps parameter
 *                           names to the values for that batch; each parameter is substituted into
 *                           the task's preprocessSql wherever $parameterName appears.
 *
 *                           Single-parameter example (batch per year):
 *                           batchParameterSets = Seq(
 *                             Map("year" -> "2014"),
 *                             Map("year" -> "2015"),
 *                             Map("year" -> "2016")
 *                           )
 *
 *                           Multi-parameter example (batch per year and month):
 *                           batchParameterSets = Seq(
 *                             Map("year" -> "2020", "month" -> "1"),
 *                             Map("year" -> "2020", "month" -> "2"),
 *                             Map("year" -> "2020", "month" -> "3"),
 *                             Map("year" -> "2021", "month" -> "1"),
 *                             ...
 *                           )
 */
case class BatchingStrategy(batchParameterSets: Seq[Map[String, String]])
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ package io.tofhir.engine.model
* @param sourceBinding A map that provides details on how to load each source data for the mapping.
* It links the source settings of a mapping job to the sources of a mapping.
* @param mapping FhirMapping definition to execute
* @param batchingStrategy Optional batching strategy to process data in multiple batches based on custom parameters
* (e.g., by year, ID range, or any custom grouping). If provided, the mapping will be executed
* once for each parameter value, with the parameter available in preprocessSql as $parameterName
*/
case class FhirMappingTask(name: String, mappingRef: String, sourceBinding: Map[String, MappingSourceBinding], mapping: Option[FhirMapping] = None)
case class FhirMappingTask(name: String, mappingRef: String, sourceBinding: Map[String, MappingSourceBinding], mapping: Option[FhirMapping] = None, batchingStrategy: Option[BatchingStrategy] = None)


/**
Expand Down
23 changes: 23 additions & 0 deletions tofhir-server/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2667,6 +2667,29 @@ components:
- $ref: "#/components/schemas/SqlSource"
- $ref: "#/components/schemas/KafkaSource"
- $ref: "#/components/schemas/FhirServerSource"
batchingStrategy:
$ref: "#/components/schemas/BatchingStrategy"

BatchingStrategy:
description: "Batching strategy for processing source data in multiple batches based on custom parameters"
type: object
properties:
batchParameterSets:
type: array
description: "Sequence of parameter sets, where each set represents one batch. Each parameter set is a map of (parameterName -> parameterValue). All parameters in a set will be substituted in preprocessSql as $parameterName."
items:
type: object
additionalProperties:
type: string
example:
- year: "2020"
month: "1"
- year: "2020"
month: "2"
- year: "2021"
month: "1"
required:
- batchParameterSets

SchedulingSettings:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
logger.debug(s"Testing the mapping ${testResourceCreationRequest.fhirMappingTask.mappingRef} inside the job $jobId by selecting ${testResourceCreationRequest.resourceFilter.numberOfRows} ${testResourceCreationRequest.resourceFilter.order} records.")

// If an unmanaged mapping is provided within the mapping task, normalize the context urls
val mappingTask: FhirMappingTask =
val mappingTaskWithNormalizedUrls: FhirMappingTask =
testResourceCreationRequest.fhirMappingTask.mapping match {
case None => testResourceCreationRequest.fhirMappingTask
case _ =>
Expand All @@ -213,6 +213,16 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
testResourceCreationRequest.fhirMappingTask.copy(mapping = Some(mappingWithNormalizedContextUrls))
}

// If the mapping task has a batching strategy, substitute the first batch parameter set
// This allows testing of parameterized preprocessSql queries with sample parameters
val mappingTask: FhirMappingTask = mappingTaskWithNormalizedUrls.batchingStrategy match {
case Some(strategy) if strategy.batchParameterSets.nonEmpty =>
val firstBatchParams = strategy.batchParameterSets.head
logger.debug(s"Testing with first batch parameters: $firstBatchParams")
substituteBatchParameters(mappingTaskWithNormalizedUrls, firstBatchParams)
case _ => mappingTaskWithNormalizedUrls
}

val fhirMappingJobManager = new FhirMappingJobManager(
toFhirEngine.mappingRepo,
toFhirEngine.contextLoader,
Expand Down Expand Up @@ -394,6 +404,36 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
toFhirEngine.runningJobRegistry.isJobRunning(jobId)
}
}

/**
 * Substitutes batch parameters in the preprocessSql of all source bindings in a mapping task.
 * Parameters in preprocessSql are specified as $parameterName and will be replaced with the corresponding value.
 *
 * Substitution is applied longest-name-first (ties broken alphabetically) so that a parameter whose
 * name is a prefix of another (e.g. "id" vs "idRange") cannot corrupt the longer placeholder. Without
 * this ordering the result would depend on the unspecified iteration order of the parameter map.
 *
 * @param task       The original mapping task
 * @param parameters Map of parameter names to their values (e.g., Map("year" -> "2014", "month" -> "1"))
 * @return A new mapping task with substituted preprocessSql in all source bindings
 */
private def substituteBatchParameters(task: FhirMappingTask, parameters: Map[String, String]): FhirMappingTask = {
  // Deterministic order: longer parameter names first so "$idRange" is replaced before "$id"
  val orderedParams = parameters.toSeq.sortBy { case (name, _) => (-name.length, name) }
  val updatedSourceBinding = task.sourceBinding.map { case (alias, binding) =>
    val updatedBinding = binding.preprocessSql match {
      case Some(sql) =>
        val substitutedSql = orderedParams.foldLeft(sql) { case (currentSql, (paramName, paramValue)) =>
          // Literal replacement of the "$name" placeholder (no regex semantics)
          currentSql.replace(s"$$$paramName", paramValue)
        }
        // Create a new binding with the substituted SQL; copy() must be called on the concrete
        // type since MappingSourceBinding itself does not expose a copy method
        binding match {
          case fs: FileSystemSource => fs.copy(preprocessSql = Some(substitutedSql))
          case ss: SqlSource => ss.copy(preprocessSql = Some(substitutedSql))
          case ks: KafkaSource => ks.copy(preprocessSql = Some(substitutedSql))
          case fss: FhirServerSource => fss.copy(preprocessSql = Some(substitutedSql))
          case other => other // Unknown binding types are returned unchanged
        }
      case None => binding // No preprocessSql defined; nothing to substitute
    }
    alias -> updatedBinding
  }
  task.copy(sourceBinding = updatedSourceBinding)
}
}