Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.checkMessageAttributes;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.embedS3PointerInReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.extractMessageFromSnsJson;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getMessagePointerFromModifiedReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getOrigReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getReservedAttributeNameIfPresent;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isLarge;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isS3ReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.updateMessageAttributePayloadSize;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.updateMessageInSnsJson;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -211,15 +213,26 @@ public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRe
for (Message message : messages) {
Message.Builder messageBuilder = message.toBuilder();

final String originalBody = message.body();
String effectiveBody = originalBody;
if (clientConfiguration.isPayloadSupportFromSnsEnabled()) {
effectiveBody = extractMessageFromSnsJson(originalBody);
}

final Message messageToProcess = messageBuilder.body(effectiveBody).build();

// For each received message check if they are stored in S3.
Optional<String> largePayloadAttributeName = getReservedAttributeNameIfPresent(
message.messageAttributes());
messageToProcess.messageAttributes());
if (!largePayloadAttributeName.isPresent()) {
// Not S3
// If it was SNS, the builder already has effectiveBody, but we want to return originalBody
// if it's not a large payload, to preserve the envelope.
messageBuilder.body(originalBody);
modifiedMessageFutures.add(CompletableFuture.completedFuture(messageBuilder.build()));
} else {
// In S3
final String largeMessagePointer = message.body()
final String largeMessagePointer = messageToProcess.body()
.replace("com.amazon.sqs.javamessaging.MessageS3Pointer",
"software.amazon.payloadoffloading.PayloadS3Pointer");

Expand All @@ -234,7 +247,7 @@ public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRe
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest
.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.receiptHandle(messageToProcess.receiptHandle())
.build();

deleteMessage(deleteMessageRequest).join();
Expand All @@ -248,18 +261,22 @@ public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRe
}

// Set original payload
messageBuilder.body(originalPayload);
if (clientConfiguration.isPayloadSupportFromSnsEnabled()) {
messageBuilder.body(updateMessageInSnsJson(originalBody, originalPayload));
} else {
messageBuilder.body(originalPayload);
}

// Remove the additional attribute before returning the message
// to user.
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(
message.messageAttributes());
messageToProcess.messageAttributes());
messageAttributes.keySet().removeAll(AmazonSQSExtendedClientUtil.RESERVED_ATTRIBUTE_NAMES);
messageBuilder.messageAttributes(messageAttributes);

// Embed s3 object pointer in the receipt handle.
String modifiedReceiptHandle = embedS3PointerInReceiptHandle(
message.receiptHandle(),
messageToProcess.receiptHandle(),
largeMessagePointer);
messageBuilder.receiptHandle(modifiedReceiptHandle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.checkMessageAttributes;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.embedS3PointerInReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.extractMessageFromSnsJson;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getMessagePointerFromModifiedReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getOrigReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getReservedAttributeNameIfPresent;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isLarge;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isS3ReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.updateMessageAttributePayloadSize;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.updateMessageInSnsJson;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -335,14 +337,25 @@ public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessag
for (Message message : messages) {
Message.Builder messageBuilder = message.toBuilder();

String originalBody = message.body();
String effectiveBody = originalBody;
if (clientConfiguration.isPayloadSupportFromSnsEnabled()) {
effectiveBody = extractMessageFromSnsJson(originalBody);
}

// for each received message check if they are stored in S3.
Optional<String> largePayloadAttributeName = getReservedAttributeNameIfPresent(message.messageAttributes());
if (largePayloadAttributeName.isPresent()) {
String largeMessagePointer = message.body();
String largeMessagePointer = effectiveBody;
largeMessagePointer = largeMessagePointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer");

try {
messageBuilder.body(payloadStore.getOriginalPayload(largeMessagePointer));
String resolvedPayload = payloadStore.getOriginalPayload(largeMessagePointer);
if (clientConfiguration.isPayloadSupportFromSnsEnabled()) {
messageBuilder.body(updateMessageInSnsJson(originalBody, resolvedPayload));
} else {
messageBuilder.body(resolvedPayload);
}
} catch (SdkException e) {
if (e.getCause() instanceof NoSuchKeyException && clientConfiguration.ignoresPayloadNotFound()) {
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.amazon.sqs.javamessaging;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand All @@ -21,6 +24,7 @@

public class AmazonSQSExtendedClientUtil {
private static final Log LOG = LogFactory.getLog(AmazonSQSExtendedClientUtil.class);
private static final ObjectMapper MAPPER = new ObjectMapper();

public static final String LEGACY_RESERVED_ATTRIBUTE_NAME = "SQSLargePayloadSize";
public static final List<String> RESERVED_ATTRIBUTE_NAMES = Arrays.asList(LEGACY_RESERVED_ATTRIBUTE_NAME,
Expand Down Expand Up @@ -138,6 +142,32 @@ public static <T extends AwsRequest.Builder> T appendUserAgent(
.build());
}

public static String extractMessageFromSnsJson(String snsJson) {
try {
JsonNode rootNode = MAPPER.readTree(snsJson);
if (rootNode.has("Message")) {
return rootNode.get("Message").asText();
}
} catch (Exception e) {
LOG.warn("Failed to parse SNS JSON message body", e);
}
return snsJson;
}

public static String updateMessageInSnsJson(String snsJson, String newMessage) {
try {
JsonNode rootNode = MAPPER.readTree(snsJson);
if (rootNode.isObject() && rootNode.has("Message")) {
ObjectNode objectNode = (ObjectNode) rootNode;
objectNode.put("Message", newMessage);
return MAPPER.writeValueAsString(objectNode);
}
} catch (Exception e) {
LOG.warn("Failed to update SNS JSON message body", e);
}
return newMessage;
}

private static String getFromReceiptHandleByMarker(String receiptHandle, String marker) {
int firstOccurence = receiptHandle.indexOf(marker);
int secondOccurence = receiptHandle.indexOf(marker, firstOccurence + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ExtendedAsyncClientConfiguration extends PayloadStorageAsyncConfigu
private boolean cleanupS3Payload = true;
private boolean useLegacyReservedAttributeName = true;
private boolean ignorePayloadNotFound = false;
private boolean payloadSupportFromSnsEnabled = false;
private String s3KeyPrefix = "";

public ExtendedAsyncClientConfiguration() {
Expand All @@ -28,6 +29,7 @@ public ExtendedAsyncClientConfiguration(ExtendedAsyncClientConfiguration other)
this.cleanupS3Payload = other.doesCleanupS3Payload();
this.useLegacyReservedAttributeName = other.usesLegacyReservedAttributeName();
this.ignorePayloadNotFound = other.ignoresPayloadNotFound();
this.payloadSupportFromSnsEnabled = other.isPayloadSupportFromSnsEnabled();
this.s3KeyPrefix = other.s3KeyPrefix;
}

Expand Down Expand Up @@ -183,6 +185,43 @@ public boolean ignoresPayloadNotFound() {
return ignorePayloadNotFound;
}

/**
* Sets whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message.
*
* @param payloadSupportFromSnsEnabled
* Whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message. Default: false
*/
public void setPayloadSupportFromSnsEnabled(boolean payloadSupportFromSnsEnabled) {
this.payloadSupportFromSnsEnabled = payloadSupportFromSnsEnabled;
}

/**
* Sets whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message.
*
* @param payloadSupportFromSnsEnabled
* Whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message. Default: false
* @return the updated ExtendedAsyncClientConfiguration object.
*/
public ExtendedAsyncClientConfiguration withPayloadSupportFromSnsEnabled(boolean payloadSupportFromSnsEnabled) {
setPayloadSupportFromSnsEnabled(payloadSupportFromSnsEnabled);
return this;
}

/**
* Checks whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message.
*
* @return True if the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message. Default: false
*/
public boolean isPayloadSupportFromSnsEnabled() {
return payloadSupportFromSnsEnabled;
}

@Override
public ExtendedAsyncClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) {
setAlwaysThroughS3(alwaysThroughS3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ExtendedClientConfiguration extends PayloadStorageConfiguration {
private boolean cleanupS3Payload = true;
private boolean useLegacyReservedAttributeName = true;
private boolean ignorePayloadNotFound = false;
private boolean payloadSupportFromSnsEnabled = false;
private String s3KeyPrefix = "";

public ExtendedClientConfiguration() {
Expand All @@ -47,6 +48,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
this.cleanupS3Payload = other.doesCleanupS3Payload();
this.useLegacyReservedAttributeName = other.usesLegacyReservedAttributeName();
this.ignorePayloadNotFound = other.ignoresPayloadNotFound();
this.payloadSupportFromSnsEnabled = other.isPayloadSupportFromSnsEnabled();
this.s3KeyPrefix = other.s3KeyPrefix;
}

Expand Down Expand Up @@ -196,6 +198,43 @@ public boolean ignoresPayloadNotFound() {
return ignorePayloadNotFound;
}

/**
* Sets whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message.
*
* @param payloadSupportFromSnsEnabled
* Whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message. Default: false
*/
public void setPayloadSupportFromSnsEnabled(boolean payloadSupportFromSnsEnabled) {
this.payloadSupportFromSnsEnabled = payloadSupportFromSnsEnabled;
}

/**
* Sets whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message.
*
* @param payloadSupportFromSnsEnabled
* Whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message. Default: false
* @return the updated ExtendedClientConfiguration object.
*/
public ExtendedClientConfiguration withPayloadSupportFromSnsEnabled(boolean payloadSupportFromSnsEnabled) {
setPayloadSupportFromSnsEnabled(payloadSupportFromSnsEnabled);
return this;
}

/**
* Checks whether or not the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message.
*
* @return True if the client should attempt to find the message body
* in the "Message" field of a JSON-formatted SNS message. Default: false
*/
public boolean isPayloadSupportFromSnsEnabled() {
return payloadSupportFromSnsEnabled;
}

@Override
public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) {
setAlwaysThroughS3(alwaysThroughS3);
Expand Down
Loading