From a7b72e5469153be18066beea2caaa569f56975f4 Mon Sep 17 00:00:00 2001 From: Chad Johnson Date: Tue, 14 May 2019 18:19:34 -0600 Subject: [PATCH 1/2] api.defaultQueue will set a default queue value on push and index requests if they don't already have a value for queue. --- .../sdk/indexing/IndexingServiceImpl.java | 19 +++++++++++ .../sdk/indexing/IndexingServiceTest.java | 33 ++++++++++++++++--- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java index 7350644..575437e 100644 --- a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java +++ b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java @@ -119,6 +119,8 @@ * separate upload. *
  • {@value #INDEXING_SERVICE_REQUEST_MODE} - Specifies the default request mode for index and * delete item requests + *
  • {@value #INDEXING_SERVICE_DEFAULT_QUEUE} - Specifies the default queue for index and + * push item requests * */ public class IndexingServiceImpl extends BaseApiService implements IndexingService { @@ -130,6 +132,7 @@ public class IndexingServiceImpl extends BaseApiService implements public static final String CONNECTOR_ID = "api.connectorId"; public static final String UPLOAD_THRESHOLD_BYTES = "api.contentUploadThresholdBytes"; public static final String INDEXING_SERVICE_REQUEST_MODE = "api.defaultRequestMode"; + public static final String INDEXING_SERVICE_DEFAULT_QUEUE = "api.defaultQueue"; public static final String REQUEST_CONNECT_TIMEOUT = "indexingService.connectTimeoutSeconds"; public static final String REQUEST_READ_TIMEOUT = "indexingService.readTimeoutSeconds"; public static final String ENABLE_API_DEBUGGING = "indexingService.enableDebugging"; @@ -165,6 +168,7 @@ public class IndexingServiceImpl extends BaseApiService implements private final VersionProvider versionProvider; private final QuotaServer quotaServer; private final RequestMode requestMode; + private final String defaultQueue; private final boolean enableApiDebugging; private final boolean allowUnknownGsuitePrincipals; @@ -245,6 +249,7 @@ private IndexingServiceImpl(Builder builder) { this.versionProvider = builder.versionProvider; this.quotaServer = builder.quotaServer; this.requestMode = builder.requestMode; + this.defaultQueue = builder.defaultQueue; this.enableApiDebugging = builder.enableApiDebugging; this.allowUnknownGsuitePrincipals = builder.allowUnknownGsuitePrincipals; } @@ -262,6 +267,7 @@ public static class Builder extends BaseApiService.AbstractBuilder quotaServer = new QuotaServer.Builder<>(Operations.class).build(); private RequestMode requestMode = DEFAULT_REQUEST_MODE; + private String defaultQueue; private int contentUploadConnectTimeoutSeconds = DEFAULT_CONNECT_TIMEOUT_SECONDS; private int contentUploadReadTimeoutSeconds = DEFAULT_READ_TIMEOUT_SECONDS; private boolean enableApiDebugging; @@ -292,6 +298,11 @@ public Builder setRequestMode(RequestMode requestMode) { return this; } + public Builder setDefaultQueue(String defaultQueue) { + this.defaultQueue = defaultQueue; + return this; + } + public Builder setConnectorId(String connectorId) { this.connectorId = connectorId; return this; @@ -408,6 +419,7 @@ public static IndexingServiceImpl.Builder fromConfiguration( } catch (IllegalArgumentException e) { throw new InvalidConfigurationException("Unable to parse configured request mode", e); } + String defaultQueue = Configuration.getString(INDEXING_SERVICE_DEFAULT_QUEUE, "").get(); int connectTimeoutSeconds = Configuration.getInteger(REQUEST_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT_SECONDS).get(); Configuration.checkConfiguration( @@ -440,6 +452,7 @@ public static IndexingServiceImpl.Builder fromConfiguration( .setBatchPolicy(BatchPolicy.fromConfiguration()) .setRetryPolicy(RetryPolicy.fromConfiguration()) .setRequestMode(requestModeConfigured) + .setDefaultQueue(defaultQueue) .setConnectorId(Configuration.getString("api.connectorId", defaultConnectorName).get()) .setRequestTimeout(connectTimeoutSeconds, readTimeoutSeconds) .setContentUploadRequestTimeout(connectTimeoutSeconds, readTimeoutSeconds) @@ -712,6 +725,7 @@ public ListenableFuture indexItem(Item item, RequestMode requestMode) if (item.decodeVersion() == null) { item.encodeVersion(versionProvider.getVersion()); } + item.setQueue(getQueue(item.getQueue())); Index updateRequest = service .indexing() @@ -916,6 +930,7 @@ public ListenableFuture push(String id, PushItem pushItem) throws IOExcept checkArgument(!Strings.isNullOrEmpty(id), "id can not be null or empty"); checkArgument(pushItem != null, "Push item cannot be null."); String resourceName = getItemResourceName(id); + pushItem.setQueue(getQueue(pushItem.getQueue())); Push request = this.service .indexing() @@ -981,6 +996,10 @@ private String getRequestMode(RequestMode requestMode) { return requestMode == RequestMode.UNSPECIFIED ? this.requestMode.name() : requestMode.name(); } + private String getQueue(String queue) { + return queue == null ? this.defaultQueue : queue; + } + /** * Performs an {@link IndexingServiceImpl#executeRequest} but tracks "not found" exceptions to * convert them to a {@code null} return value. diff --git a/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java b/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java index d25153e..98ce363 100644 --- a/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java +++ b/indexing/src/test/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceTest.java @@ -149,10 +149,10 @@ public class IndexingServiceTest { @Before public void createService() throws IOException, GeneralSecurityException { - createService(false, false); + createService(false, false, null); } - private void createService(boolean enableDebugging, boolean allowUnknownGsuitePrincipals) + private void createService(boolean enableDebugging, boolean allowUnknownGsuitePrincipals, String defaultQueue) throws IOException, GeneralSecurityException { this.transport = new TestingHttpTransport("datasources/source/connectors/unitTest"); JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); @@ -191,6 +191,7 @@ private void createService(boolean enableDebugging, boolean allowUnknownGsuitePr .setConnectorId("unitTest") .setEnableDebugging(enableDebugging) .setAllowUnknownGsuitePrincipals(allowUnknownGsuitePrincipals) + .setDefaultQueue(defaultQueue) .build(); this.indexingService.startAsync().awaitRunning(); } @@ -564,7 +565,7 @@ public void testUpdateItem() throws IOException, InterruptedException { @Test public void testUpdateItemDebugOptionsEnabled() throws Exception { - createService(/*debugging*/ true, /*allowUnknownGsuitePrincipals*/ false); + createService(/*debugging*/ true, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ null); doAnswer( invocation -> { Items.Index updateRequest = invocation.getArgument(0); @@ -582,7 +583,7 @@ public void testUpdateItemDebugOptionsEnabled() throws Exception { @Test public void testUpdateItemAllowUnknownGsuitePrincipals() throws Exception { - createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ true); + createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ true, /*defaultQueue*/ null); doAnswer( invocation -> { Items.Index updateRequest = invocation.getArgument(0); @@ -731,6 +732,19 @@ public void testUpdateItemWithContentHash() throws IOException { verify(quotaServer, times(1)).acquire(Operations.DEFAULT); } + @Test + public void testUpdateItemWithDefaultQueue() throws GeneralSecurityException, IOException { + createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ "specialqueue"); + Item item = new Item().setName(GOOD_ID); + ByteArrayContent content = ByteArrayContent.fromString("text/plain", ""); + this.indexingService.indexItemAndContent( + item, content, null, ContentFormat.TEXT, RequestMode.ASYNCHRONOUS); + assertEquals( + "specialqueue", + item.getQueue()); + verify(quotaServer, times(1)).acquire(Operations.DEFAULT); + } + @Test public void testUpdateItemError() throws IOException, InterruptedException { doAnswer( @@ -1044,6 +1058,17 @@ public void testPushItemNull() throws IOException { this.indexingService.push(GOOD_ID, null); } + @Test + public void testPushItemWithDefaultQueue() throws GeneralSecurityException, IOException { + createService(/*debugging*/ false, /*allowUnknownGsuitePrincipals*/ false, /*defaultQueue*/ "specialqueue"); + this.transport.addPushItemReqResp(GOOD_ID, SOURCE_ID, new Item()); + PushItem pushItem = new PushItem(); + this.indexingService.push(GOOD_ID, pushItem); + assertEquals( + "specialqueue", + pushItem.getQueue()); + } + /* unreserve */ @Test public void testUnreserveItem() throws IOException { From 5cf0cbe2bbc5703e157324bab2684f4ccd498699 Mon Sep 17 00:00:00 2001 From: Chad Johnson Date: Mon, 17 Jun 2019 08:16:39 -0600 Subject: [PATCH 2/2] Check for empty queue --- .../cloudsearch/sdk/indexing/IndexingServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java index 575437e..a59b8fe 100644 --- a/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java +++ b/indexing/src/main/java/com/google/enterprise/cloudsearch/sdk/indexing/IndexingServiceImpl.java @@ -997,7 +997,7 @@ private String getRequestMode(RequestMode requestMode) { } private String getQueue(String queue) { - return queue == null ? this.defaultQueue : queue; + return (queue == null || queue.isEmpty()) ? this.defaultQueue : queue; } /**