Skip to content
This repository was archived by the owner on May 15, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
* separate upload.
* <li>{@value #INDEXING_SERVICE_REQUEST_MODE} - Specifies the default request mode for index and
* delete item requests
* <li>{@value #INDEXING_SERVICE_DEFAULT_QUEUE} - Specifies the default queue for index and
* push item requests
* </ul>
*/
public class IndexingServiceImpl extends BaseApiService<CloudSearch> implements IndexingService {
Expand All @@ -130,6 +132,7 @@ public class IndexingServiceImpl extends BaseApiService<CloudSearch> implements
public static final String CONNECTOR_ID = "api.connectorId";
public static final String UPLOAD_THRESHOLD_BYTES = "api.contentUploadThresholdBytes";
public static final String INDEXING_SERVICE_REQUEST_MODE = "api.defaultRequestMode";
public static final String INDEXING_SERVICE_DEFAULT_QUEUE = "api.defaultQueue";
public static final String REQUEST_CONNECT_TIMEOUT = "indexingService.connectTimeoutSeconds";
public static final String REQUEST_READ_TIMEOUT = "indexingService.readTimeoutSeconds";
public static final String ENABLE_API_DEBUGGING = "indexingService.enableDebugging";
Expand Down Expand Up @@ -165,6 +168,7 @@ public class IndexingServiceImpl extends BaseApiService<CloudSearch> implements
private final VersionProvider versionProvider;
private final QuotaServer<Operations> quotaServer;
private final RequestMode requestMode;
private final String defaultQueue;
private final boolean enableApiDebugging;
private final boolean allowUnknownGsuitePrincipals;

Expand Down Expand Up @@ -245,6 +249,7 @@ private IndexingServiceImpl(Builder builder) {
this.versionProvider = builder.versionProvider;
this.quotaServer = builder.quotaServer;
this.requestMode = builder.requestMode;
this.defaultQueue = builder.defaultQueue;
this.enableApiDebugging = builder.enableApiDebugging;
this.allowUnknownGsuitePrincipals = builder.allowUnknownGsuitePrincipals;
}
Expand All @@ -262,6 +267,7 @@ public static class Builder extends BaseApiService.AbstractBuilder<Builder, Clou
private QuotaServer<Operations> quotaServer =
new QuotaServer.Builder<>(Operations.class).build();
private RequestMode requestMode = DEFAULT_REQUEST_MODE;
private String defaultQueue;
private int contentUploadConnectTimeoutSeconds = DEFAULT_CONNECT_TIMEOUT_SECONDS;
private int contentUploadReadTimeoutSeconds = DEFAULT_READ_TIMEOUT_SECONDS;
private boolean enableApiDebugging;
Expand Down Expand Up @@ -292,6 +298,11 @@ public Builder setRequestMode(RequestMode requestMode) {
return this;
}

public Builder setDefaultQueue(String defaultQueue) {
this.defaultQueue = defaultQueue;
return this;
}

public Builder setConnectorId(String connectorId) {
this.connectorId = connectorId;
return this;
Expand Down Expand Up @@ -408,6 +419,7 @@ public static IndexingServiceImpl.Builder fromConfiguration(
} catch (IllegalArgumentException e) {
throw new InvalidConfigurationException("Unable to parse configured request mode", e);
}
String defaultQueue = Configuration.getString(INDEXING_SERVICE_DEFAULT_QUEUE, "").get();
int connectTimeoutSeconds =
Configuration.getInteger(REQUEST_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT_SECONDS).get();
Configuration.checkConfiguration(
Expand Down Expand Up @@ -440,6 +452,7 @@ public static IndexingServiceImpl.Builder fromConfiguration(
.setBatchPolicy(BatchPolicy.fromConfiguration())
.setRetryPolicy(RetryPolicy.fromConfiguration())
.setRequestMode(requestModeConfigured)
.setDefaultQueue(defaultQueue)
.setConnectorId(Configuration.getString("api.connectorId", defaultConnectorName).get())
.setRequestTimeout(connectTimeoutSeconds, readTimeoutSeconds)
.setContentUploadRequestTimeout(connectTimeoutSeconds, readTimeoutSeconds)
Expand Down Expand Up @@ -712,6 +725,7 @@ public ListenableFuture<Operation> indexItem(Item item, RequestMode requestMode)
if (item.decodeVersion() == null) {
item.encodeVersion(versionProvider.getVersion());
}
item.setQueue(getQueue(item.getQueue()));
Index updateRequest =
service
.indexing()
Expand Down Expand Up @@ -916,6 +930,7 @@ public ListenableFuture<Item> push(String id, PushItem pushItem) throws IOExcept
checkArgument(!Strings.isNullOrEmpty(id), "id can not be null or empty");
checkArgument(pushItem != null, "Push item cannot be null.");
String resourceName = getItemResourceName(id);
pushItem.setQueue(getQueue(pushItem.getQueue()));
Push request =
this.service
.indexing()
Expand Down Expand Up @@ -981,6 +996,10 @@ private String getRequestMode(RequestMode requestMode) {
return requestMode == RequestMode.UNSPECIFIED ? this.requestMode.name() : requestMode.name();
}

private String getQueue(String queue) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method can be static

do we want to check if queue.isEmpty() ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added check for queue.isEmpty().

I do not see how this could be static, as it depends on this.defaultQueue

return (queue == null || queue.isEmpty()) ? this.defaultQueue : queue;
}

/**
* Performs an {@link IndexingServiceImpl#executeRequest} but tracks "not found" exceptions to
* convert them to a {@code null} return value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ public class IndexingServiceTest {

@Before
public void createService() throws IOException, GeneralSecurityException {
createService(false, false);
createService(false, false, null);
}

private void createService(boolean enableDebugging, boolean allowUnknownGsuitePrincipals)
private void createService(boolean enableDebugging, boolean allowUnknownGsuitePrincipals, String defaultQueue)
throws IOException, GeneralSecurityException {
this.transport = new TestingHttpTransport("datasources/source/connectors/unitTest");
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
Expand Down Expand Up @@ -191,6 +191,7 @@ private void createService(boolean enableDebugging, boolean allowUnknownGsuitePr
.setConnectorId("unitTest")
.setEnableDebugging(enableDebugging)
.setAllowUnknownGsuitePrincipals(allowUnknownGsuitePrincipals)
.setDefaultQueue(defaultQueue)
.build();
this.indexingService.startAsync().awaitRunning();
}
Expand Down Expand Up @@ -564,7 +565,7 @@ public void testUpdateItem() throws IOException, InterruptedException {

@Test
public void testUpdateItemDebugOptionsEnabled() throws Exception {
createService(/*debugging*/ true, /*allowUnknownGsuitePrincipals*/ false);
createService(/*debugging*/ true, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ null);
doAnswer(
invocation -> {
Items.Index updateRequest = invocation.getArgument(0);
Expand All @@ -582,7 +583,7 @@ public void testUpdateItemDebugOptionsEnabled() throws Exception {

@Test
public void testUpdateItemAllowUnknownGsuitePrincipals() throws Exception {
createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ true);
createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ true, /*defaultQueue*/ null);
doAnswer(
invocation -> {
Items.Index updateRequest = invocation.getArgument(0);
Expand Down Expand Up @@ -731,6 +732,19 @@ public void testUpdateItemWithContentHash() throws IOException {
verify(quotaServer, times(1)).acquire(Operations.DEFAULT);
}

@Test
public void testUpdateItemWithDefaultQueue() throws GeneralSecurityException, IOException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, just throw Exception for tests (instead of all named individual exception).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just copying existing test cases ; )

createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ "specialqueue");
Item item = new Item().setName(GOOD_ID);
ByteArrayContent content = ByteArrayContent.fromString("text/plain", "");
this.indexingService.indexItemAndContent(
item, content, null, ContentFormat.TEXT, RequestMode.ASYNCHRONOUS);
assertEquals(
"specialqueue",
item.getQueue());
verify(quotaServer, times(1)).acquire(Operations.DEFAULT);
}

@Test
public void testUpdateItemError() throws IOException, InterruptedException {
doAnswer(
Expand Down Expand Up @@ -1044,6 +1058,17 @@ public void testPushItemNull() throws IOException {
this.indexingService.push(GOOD_ID, null);
}

@Test
public void testPushItemWithDefaultQueue() throws GeneralSecurityException, IOException {
createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ "specialqueue");
this.transport.addPushItemReqResp(GOOD_ID, SOURCE_ID, new Item());
PushItem pushItem = new PushItem();
this.indexingService.push(GOOD_ID, pushItem);
assertEquals(
"specialqueue",
pushItem.getQueue());
}

/* unreserve */
@Test
public void testUnreserveItem() throws IOException {
Expand Down