diff --git a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java
index 7350644..a59b8fe 100644
--- a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java
+++ b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java
@@ -119,6 +119,8 @@
* separate upload.
*
{@value #INDEXING_SERVICE_REQUEST_MODE} - Specifies the default request mode for index and
* delete item requests
+ * {@value #INDEXING_SERVICE_DEFAULT_QUEUE} - Specifies the default queue for index and
+ * push item requests
*
*/
public class IndexingServiceImpl extends BaseApiService implements IndexingService {
@@ -130,6 +132,7 @@ public class IndexingServiceImpl extends BaseApiService implements
public static final String CONNECTOR_ID = "api.connectorId";
public static final String UPLOAD_THRESHOLD_BYTES = "api.contentUploadThresholdBytes";
public static final String INDEXING_SERVICE_REQUEST_MODE = "api.defaultRequestMode";
+ public static final String INDEXING_SERVICE_DEFAULT_QUEUE = "api.defaultQueue";
public static final String REQUEST_CONNECT_TIMEOUT = "indexingService.connectTimeoutSeconds";
public static final String REQUEST_READ_TIMEOUT = "indexingService.readTimeoutSeconds";
public static final String ENABLE_API_DEBUGGING = "indexingService.enableDebugging";
@@ -165,6 +168,7 @@ public class IndexingServiceImpl extends BaseApiService implements
private final VersionProvider versionProvider;
private final QuotaServer quotaServer;
private final RequestMode requestMode;
+ private final String defaultQueue;
private final boolean enableApiDebugging;
private final boolean allowUnknownGsuitePrincipals;
@@ -245,6 +249,7 @@ private IndexingServiceImpl(Builder builder) {
this.versionProvider = builder.versionProvider;
this.quotaServer = builder.quotaServer;
this.requestMode = builder.requestMode;
+ this.defaultQueue = builder.defaultQueue;
this.enableApiDebugging = builder.enableApiDebugging;
this.allowUnknownGsuitePrincipals = builder.allowUnknownGsuitePrincipals;
}
@@ -262,6 +267,7 @@ public static class Builder extends BaseApiService.AbstractBuilder quotaServer =
new QuotaServer.Builder<>(Operations.class).build();
private RequestMode requestMode = DEFAULT_REQUEST_MODE;
+ private String defaultQueue;
private int contentUploadConnectTimeoutSeconds = DEFAULT_CONNECT_TIMEOUT_SECONDS;
private int contentUploadReadTimeoutSeconds = DEFAULT_READ_TIMEOUT_SECONDS;
private boolean enableApiDebugging;
@@ -292,6 +298,11 @@ public Builder setRequestMode(RequestMode requestMode) {
return this;
}
+ public Builder setDefaultQueue(String defaultQueue) {
+ this.defaultQueue = defaultQueue;
+ return this;
+ }
+
public Builder setConnectorId(String connectorId) {
this.connectorId = connectorId;
return this;
@@ -408,6 +419,7 @@ public static IndexingServiceImpl.Builder fromConfiguration(
} catch (IllegalArgumentException e) {
throw new InvalidConfigurationException("Unable to parse configured request mode", e);
}
+ String defaultQueue = Configuration.getString(INDEXING_SERVICE_DEFAULT_QUEUE, "").get();
int connectTimeoutSeconds =
Configuration.getInteger(REQUEST_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT_SECONDS).get();
Configuration.checkConfiguration(
@@ -440,6 +452,7 @@ public static IndexingServiceImpl.Builder fromConfiguration(
.setBatchPolicy(BatchPolicy.fromConfiguration())
.setRetryPolicy(RetryPolicy.fromConfiguration())
.setRequestMode(requestModeConfigured)
+ .setDefaultQueue(defaultQueue)
.setConnectorId(Configuration.getString("api.connectorId", defaultConnectorName).get())
.setRequestTimeout(connectTimeoutSeconds, readTimeoutSeconds)
.setContentUploadRequestTimeout(connectTimeoutSeconds, readTimeoutSeconds)
@@ -712,6 +725,7 @@ public ListenableFuture indexItem(Item item, RequestMode requestMode)
if (item.decodeVersion() == null) {
item.encodeVersion(versionProvider.getVersion());
}
+ item.setQueue(getQueue(item.getQueue()));
Index updateRequest =
service
.indexing()
@@ -916,6 +930,7 @@ public ListenableFuture- push(String id, PushItem pushItem) throws IOExcept
checkArgument(!Strings.isNullOrEmpty(id), "id can not be null or empty");
checkArgument(pushItem != null, "Push item cannot be null.");
String resourceName = getItemResourceName(id);
+ pushItem.setQueue(getQueue(pushItem.getQueue()));
Push request =
this.service
.indexing()
@@ -981,6 +996,10 @@ private String getRequestMode(RequestMode requestMode) {
return requestMode == RequestMode.UNSPECIFIED ? this.requestMode.name() : requestMode.name();
}
+ private String getQueue(String queue) {
+ return (queue == null || queue.isEmpty()) ? this.defaultQueue : queue;
+ }
+
/**
* Performs an {@link IndexingServiceImpl#executeRequest} but tracks "not found" exceptions to
* convert them to a {@code null} return value.
diff --git a/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java b/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java
index d25153e..98ce363 100644
--- a/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java
+++ b/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java
@@ -149,10 +149,10 @@ public class IndexingServiceTest {
@Before
public void createService() throws IOException, GeneralSecurityException {
- createService(false, false);
+ createService(false, false, null);
}
- private void createService(boolean enableDebugging, boolean allowUnknownGsuitePrincipals)
+ private void createService(boolean enableDebugging, boolean allowUnknownGsuitePrincipals, String defaultQueue)
throws IOException, GeneralSecurityException {
this.transport = new TestingHttpTransport("datasources/source/connectors/unitTest");
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
@@ -191,6 +191,7 @@ private void createService(boolean enableDebugging, boolean allowUnknownGsuitePr
.setConnectorId("unitTest")
.setEnableDebugging(enableDebugging)
.setAllowUnknownGsuitePrincipals(allowUnknownGsuitePrincipals)
+ .setDefaultQueue(defaultQueue)
.build();
this.indexingService.startAsync().awaitRunning();
}
@@ -564,7 +565,7 @@ public void testUpdateItem() throws IOException, InterruptedException {
@Test
public void testUpdateItemDebugOptionsEnabled() throws Exception {
- createService(/*debugging*/ true, /*allowUnknownGsuitePrincipals*/ false);
+ createService(/*debugging*/ true, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ null);
doAnswer(
invocation -> {
Items.Index updateRequest = invocation.getArgument(0);
@@ -582,7 +583,7 @@ public void testUpdateItemDebugOptionsEnabled() throws Exception {
@Test
public void testUpdateItemAllowUnknownGsuitePrincipals() throws Exception {
- createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ true);
+ createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ true, /*defaultQueue*/ null);
doAnswer(
invocation -> {
Items.Index updateRequest = invocation.getArgument(0);
@@ -731,6 +732,19 @@ public void testUpdateItemWithContentHash() throws IOException {
verify(quotaServer, times(1)).acquire(Operations.DEFAULT);
}
+ @Test
+ public void testUpdateItemWithDefaultQueue() throws GeneralSecurityException, IOException {
+ createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ "specialqueue");
+ Item item = new Item().setName(GOOD_ID);
+ ByteArrayContent content = ByteArrayContent.fromString("text/plain", "");
+ this.indexingService.indexItemAndContent(
+ item, content, null, ContentFormat.TEXT, RequestMode.ASYNCHRONOUS);
+ assertEquals(
+ "specialqueue",
+ item.getQueue());
+ verify(quotaServer, times(1)).acquire(Operations.DEFAULT);
+ }
+
@Test
public void testUpdateItemError() throws IOException, InterruptedException {
doAnswer(
@@ -1044,6 +1058,17 @@ public void testPushItemNull() throws IOException {
this.indexingService.push(GOOD_ID, null);
}
+ @Test
+ public void testPushItemWithDefaultQueue() throws GeneralSecurityException, IOException {
+ createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ "specialqueue");
+ this.transport.addPushItemReqResp(GOOD_ID, SOURCE_ID, new Item());
+ PushItem pushItem = new PushItem();
+ this.indexingService.push(GOOD_ID, pushItem);
+ assertEquals(
+ "specialqueue",
+ pushItem.getQueue());
+ }
+
/* unreserve */
@Test
public void testUnreserveItem() throws IOException {