Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ A remote Hub resource can be saved to the local database by making a POST reques
### Full reindexing
Resources can be fully reindexed in mod-search by making a POST request to the `/linked-data/reindex/full` endpoint. This will start a full reindex of all resources in the database. All resources will be reindexed regardless of their current index state.

The endpoint returns a job instance ID that can be used to track the reindex job's progress. The reindexing process executes asynchronously in the background.
The endpoint returns a job execution ID that can be used to track the reindex job's progress. The reindexing process executes asynchronously in the background.

Optionally, you can specify a `resourceType` query parameter (HUB or WORK) to reindex only resources of a specific type:

Expand All @@ -299,12 +299,12 @@ Example response:
12345
```

The returned job instance ID can be used to monitor the job's status.
The returned job execution ID can be used to monitor the job's status.

### Incremental reindexing
Resources that have not been indexed yet can be incrementally reindexed by making a POST request to the `/linked-data/reindex/incremental` endpoint. This will only reindex resources that have not been indexed yet (indexDate is null).

Similar to full reindex, the endpoint returns a job instance ID for tracking the asynchronous reindexing process. You can specify a `resourceType` query parameter:
Similar to full reindex, the endpoint returns a job execution ID for tracking the asynchronous reindexing process. You can specify a `resourceType` query parameter:

```bash
curl --location --request POST '{{ base-uri }}/linked-data/reindex/incremental?resourceType=WORK' \
Expand All @@ -317,7 +317,29 @@ Example response:
12346
```

The returned job instance ID can be used to monitor the job's status.
The returned job execution ID can be used to monitor the job's status.

### Reindex job status
The status of a reindex job can be retrieved by making a GET request to the `/linked-data/reindex/status` endpoint with the `jobExecutionId` query parameter:

```bash
curl --location --request GET '{{ base-uri }}/linked-data/reindex/status?jobExecutionId=12345' \
--header 'x-okapi-tenant: {tenant}' \
--header 'x-okapi-token: {token}'
```

Example response:
```json
{
"startDate": "2026-02-27T10:00:00",
"endDate": "2026-02-27T10:05:00",
"startedBy": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"reindexType": "FULL",
"linesRead": 50000,
"linesSent": 49998,
"status": "COMPLETED"
}
```

# Integration with FOLIO
When running in FOLIO mode, this module integrates with multiple FOLIO modules via Kafka.
Expand Down
41 changes: 29 additions & 12 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,6 @@
"permissionsRequired": [ "linked-data.resources.import.post" ],
"modulePermissions": [ "source-storage.records.formatted.item.get" ]
},
{
"methods": [ "POST" ],
"pathPattern": "/linked-data/reindex/full",
"permissionsRequired": [ "linked-data.resources.reindex.full.post" ],
"modulePermissions": [ "search.index.inventory.reindex.post" ]
},
{
"methods": [ "POST" ],
"pathPattern": "/linked-data/reindex/incremental",
"permissionsRequired": [ "linked-data.resources.reindex.incremental.post" ],
"modulePermissions": []
},
{
"methods": [ "GET" ],
"pathPattern": "/linked-data/resource/{id}/graph",
Expand Down Expand Up @@ -170,6 +158,30 @@
}
]
},
{
"id": "linked-data-reindex",
"version": "1.0",
"handlers": [
{
"methods": [ "POST" ],
"pathPattern": "/linked-data/reindex/full",
"permissionsRequired": [ "linked-data.resources.reindex.full.post" ],
"modulePermissions": [ "search.index.inventory.reindex.post" ]
},
{
"methods": [ "POST" ],
"pathPattern": "/linked-data/reindex/incremental",
"permissionsRequired": [ "linked-data.resources.reindex.incremental.post" ],
"modulePermissions": []
},
{
"methods": [ "GET" ],
"pathPattern": "/linked-data/reindex/status",
"permissionsRequired": [ "linked-data.resources.reindex.status.get" ],
"modulePermissions": []
}
]
},
{
"id": "linked-data-admin",
"version": "1.0",
Expand Down Expand Up @@ -256,6 +268,11 @@
"displayName": "Linked Data: Start incremental reindexing of resources in mod-search index",
"description": "Start the process of incremental reindexing of resources in the database. Only resources that have not been indexed yet (indexDate is null) will be reindexed."
},
{
"permissionName": "linked-data.resources.reindex.status.get",
"displayName": "Linked Data: Get reindex job status",
"description": "Get the status of a reindex job by its execution ID."
},
{
"permissionName": "linked-data.admin.caches.delete",
"displayName": "Linked Data: Clear all application caches",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class BatchConfig {
public static final String JOB_PARAM_RESOURCE_TYPE = "resourceType";
public static final String JOB_PARAM_STARTED_BY = "startedBy";
public static final String JOB_PARAM_RUN_TIMESTAMP = "run.timestamp";
public static final String REINDEX_STEP_NAME = "reindexStep";

@Bean
public BeanFactoryPostProcessor jobAndStepScopeConfigurer() {
Expand Down Expand Up @@ -117,8 +118,8 @@ public AsyncTaskExecutor reindexTaskExecutor(
@Value("${mod-linked-data.reindex.pool-size}") int poolSize
) {
var exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(poolSize);
exec.setMaxPoolSize(poolSize);
Comment thread
PBobylev marked this conversation as resolved.
exec.setQueueCapacity(poolSize);
exec.setThreadNamePrefix("reindex-");
exec.initialize();
return exec;
Expand All @@ -131,7 +132,7 @@ public Step reindexStep(JobRepository jobRepository,
ReindexWriter reindexWriter,
@Value("${mod-linked-data.reindex.chunk-size}") int chunkSize,
AsyncTaskExecutor reindexTaskExecutor) {
return new StepBuilder("reindexStep", jobRepository)
return new StepBuilder(REINDEX_STEP_NAME, jobRepository)
.<Resource, ResourceIndexEvent>chunk(chunkSize)
.reader(resourceReader)
.processor(reindexProcessor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.folio.linked.data.util.Constants.STANDALONE_PROFILE;

import lombok.RequiredArgsConstructor;
import org.folio.linked.data.domain.dto.ReindexJobStatusDto;
import org.folio.linked.data.rest.resource.ReindexApi;
import org.folio.linked.data.service.batch.ReindexJobService;
import org.springframework.context.annotation.Profile;
Expand All @@ -30,4 +31,9 @@ public ResponseEntity<String> incrementalReindex(String resourceType) {
var jobExecutionId = valueOf(reindexJobService.start(false, resourceType));
return ResponseEntity.ok(jobExecutionId);
}

/**
 * GET /linked-data/reindex/status — returns the status of a reindex job.
 *
 * @param jobExecutionId the job execution id returned by the full/incremental reindex endpoints
 * @return 200 OK with the job status DTO; the service throws a not-found exception
 *         when no execution with the given id exists
 */
@Override
public ResponseEntity<ReindexJobStatusDto> getReindexJobStatus(Long jobExecutionId) {
  return ResponseEntity.ok(reindexJobService.getStatus(jobExecutionId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.folio.linked.data.mapper.dto;

import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
import static org.folio.linked.data.configuration.batch.BatchConfig.REINDEX_STEP_NAME;
import static org.folio.linked.data.domain.dto.ReindexJobStatusDto.ReindexTypeEnum.FULL;
import static org.folio.linked.data.domain.dto.ReindexJobStatusDto.ReindexTypeEnum.INCREMENTAL;
import static org.mapstruct.MappingConstants.ComponentModel.SPRING;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Set;
import org.folio.linked.data.domain.dto.ReindexJobStatusDto;
import org.folio.linked.data.domain.dto.ReindexJobStatusDto.ReindexTypeEnum;
import org.folio.linked.data.model.entity.batch.BatchJobExecution;
import org.folio.linked.data.model.entity.batch.BatchStepExecution;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

/**
 * Maps a Spring Batch job execution entity to the reindex job status DTO.
 * Read/write counters are taken from the single "reindexStep" step execution;
 * missing steps or counters are reported as 0.
 */
@Mapper(componentModel = SPRING)
public interface ReindexJobStatusMapper {

  @Mapping(target = "startDate", source = "startTime")
  @Mapping(target = "endDate", source = "endTime")
  @Mapping(target = "reindexType", source = "isFullReindex")
  @Mapping(target = "linesRead", expression = "java(linesRead(execution.getStepExecutions()))")
  @Mapping(target = "linesSent", expression = "java(linesSent(execution.getStepExecutions()))")
  ReindexJobStatusDto toDto(BatchJobExecution execution);

  /** Converts a DB local timestamp to an offset timestamp, interpreting it as UTC. */
  default OffsetDateTime toOffsetDateTime(LocalDateTime localDateTime) {
    if (isNull(localDateTime)) {
      return null;
    }
    return localDateTime.atOffset(ZoneOffset.UTC);
  }

  /** Translates the textual "isFullReindex" job parameter ("true"/"false") into the DTO enum. */
  default ReindexTypeEnum toReindexType(String isFullReindex) {
    if (isNull(isFullReindex)) {
      return null;
    }
    return Boolean.parseBoolean(isFullReindex) ? FULL : INCREMENTAL;
  }

  /** Items read by the reindex step. */
  default Long linesRead(Set<BatchStepExecution> steps) {
    return countFromReindexStep(steps, true);
  }

  /** Items written (sent to the index) by the reindex step. */
  default Long linesSent(Set<BatchStepExecution> steps) {
    return countFromReindexStep(steps, false);
  }

  /**
   * Finds the first step execution named {@code REINDEX_STEP_NAME} and returns its
   * read or write count, defaulting to 0 when the step or counter is absent.
   */
  private Long countFromReindexStep(Set<BatchStepExecution> steps, boolean read) {
    if (isNull(steps)) {
      return 0L;
    }
    for (var step : steps) {
      if (REINDEX_STEP_NAME.equals(step.getStepName())) {
        var count = read ? step.getReadCount() : step.getWriteCount();
        return isNull(count) ? 0L : count;
      }
    }
    return 0L;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.folio.linked.data.model.entity.batch;

import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.Id;
import jakarta.persistence.OneToMany;
import java.time.LocalDateTime;
import java.util.Set;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.hibernate.annotations.Formula;

/**
 * JPA mapping over the Spring Batch job execution metadata table, used to report
 * reindex job status. Rows are created by Spring Batch itself; no code here writes
 * them — presumably a read-only view (no save usage visible; confirm).
 */
@Getter
@Setter
@EqualsAndHashCode(of = "jobExecutionId")
@Entity
public class BatchJobExecution {

  // Spring Batch job execution primary key — the id returned to API clients.
  @Id
  private Long jobExecutionId;

  private LocalDateTime startTime;

  private LocalDateTime endTime;

  // Batch status string written by Spring Batch (e.g. "COMPLETED" per README example).
  private String status;

  // 'startedBy' job parameter, pulled from batch_job_execution_params via subselect.
  // NOTE(review): relies on Hibernate qualifying the unqualified job_execution_id
  // on the right-hand side with the owning entity's alias when rendering the
  // formula — confirm the generated SQL correlates to the outer row, not to p.
  @Formula("(SELECT p.parameter_value FROM batch_job_execution_params p"
      + " WHERE p.job_execution_id = job_execution_id AND p.parameter_name = 'startedBy')")
  private String startedBy;

  // 'isFullReindex' job parameter as text ("true"/"false"); converted to the
  // FULL/INCREMENTAL enum by ReindexJobStatusMapper. Same alias-injection caveat as above.
  @Formula("(SELECT p.parameter_value FROM batch_job_execution_params p"
      + " WHERE p.job_execution_id = job_execution_id AND p.parameter_name = 'isFullReindex')")
  private String isFullReindex;

  // Step executions carrying the read/write counters; eager-fetched so they are
  // available together with the job row.
  @OneToMany(mappedBy = "jobExecution", fetch = FetchType.EAGER)
  private Set<BatchStepExecution> stepExecutions;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.folio.linked.data.model.entity.batch;

import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

/**
 * JPA mapping over the Spring Batch step execution metadata table. Supplies
 * per-step read/write counters used for reindex job status reporting.
 */
@Getter
@Setter
@EqualsAndHashCode(of = "stepExecutionId")
@Entity
public class BatchStepExecution {

  // Spring Batch step execution primary key.
  @Id
  private Long stepExecutionId;

  // Owning job execution; lazy — loaded only if navigated from this side.
  @ManyToOne(fetch = FetchType.LAZY)
  @JoinColumn(name = "job_execution_id")
  private BatchJobExecution jobExecution;

  // Step name; the status mapper matches this against BatchConfig.REINDEX_STEP_NAME.
  private String stepName;

  // Items read by the step (reported as linesRead).
  private Long readCount;

  // Items written by the step (reported as linesSent).
  private Long writeCount;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.folio.linked.data.repo;

import org.folio.linked.data.model.entity.batch.BatchJobExecution;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Spring Data repository over Spring Batch job execution metadata, used to
 * look up a reindex job by its execution id for status reporting.
 */
public interface BatchJobExecutionRepository extends JpaRepository<BatchJobExecution, Long> {
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.folio.linked.data.service.batch;

import org.folio.linked.data.domain.dto.ReindexJobStatusDto;

/**
 * Starts and monitors asynchronous reindexing jobs for mod-search.
 */
public interface ReindexJobService {

  /**
   * Launches a reindex job.
   *
   * @param isFullReindex {@code true} for a full reindex of all resources,
   *                      {@code false} for incremental (not-yet-indexed resources only)
   * @param resourceType  optional resource type filter (e.g. HUB or WORK); may be null
   * @return the job execution id used to track the job's progress
   */
  Long start(boolean isFullReindex, String resourceType);

  /**
   * Retrieves the status of a previously started reindex job.
   *
   * @param jobExecutionId the id returned by {@link #start}
   * @return the current status of the job execution
   */
  ReindexJobStatusDto getStatus(Long jobExecutionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.folio.ld.dictionary.ResourceTypeDictionary;
import org.folio.linked.data.domain.dto.ReindexJobStatusDto;
import org.folio.linked.data.exception.RequestProcessingExceptionBuilder;
import org.folio.linked.data.mapper.dto.ReindexJobStatusMapper;
import org.folio.linked.data.repo.BatchJobExecutionRepository;
import org.folio.spring.FolioExecutionContext;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.JobExecutionException;
Expand All @@ -32,6 +35,8 @@ public class ReindexJobServiceImpl implements ReindexJobService {
private final TaskExecutorJobOperator jobOperator;
private final FolioExecutionContext folioExecutionContext;
private final RequestProcessingExceptionBuilder exceptionBuilder;
private final BatchJobExecutionRepository batchJobExecutionRepository;
private final ReindexJobStatusMapper reindexJobStatusMapper;

@Override
public Long start(boolean isFullReindex, String resourceType) {
Expand All @@ -53,12 +58,19 @@ public Long start(boolean isFullReindex, String resourceType) {
params.add(new JobParameter<>(JOB_PARAM_RUN_TIMESTAMP, System.currentTimeMillis(), Long.class));

try {
return jobOperator.start(reindexJob, new JobParameters(params)).getJobInstanceId();
return jobOperator.start(reindexJob, new JobParameters(params)).getId();
} catch (JobExecutionException e) {
throw new IllegalArgumentException("Job launch exception", e);
}
}

/**
 * Looks up a reindex job execution by id and maps it to a status DTO.
 *
 * @param jobExecutionId Spring Batch job execution id (as returned by {@code start})
 * @return status DTO with timing, reindex type, counters and status
 * @throws RuntimeException not-found exception (via exceptionBuilder) when no
 *         execution with the given id exists — exact type declared elsewhere
 */
@Override
public ReindexJobStatusDto getStatus(Long jobExecutionId) {
  return batchJobExecutionRepository.findById(jobExecutionId)
    .map(reindexJobStatusMapper::toDto)
    // NOTE(review): notFoundLdResourceByIdException is named for LD resources but is
    // reused here for a job execution — confirm the resulting message reads correctly.
    .orElseThrow(() -> exceptionBuilder.notFoundLdResourceByIdException("JobExecution", valueOf(jobExecutionId)));
}

private ResourceTypeDictionary getResourceTypeDictionary(String resourceType) {
try {
return ResourceTypeDictionary.valueOf(resourceType.toUpperCase());
Expand Down
Loading