From 40058c804124672afcdc6b0fb3beac91cc01572c Mon Sep 17 00:00:00 2001 From: pkelly-nifi Date: Tue, 24 Feb 2026 15:09:58 -0500 Subject: [PATCH 1/3] NIFI-7307 Adding support for setting and retrieving Azure Blob Storage metadata and tags --- .../GetAzureBlobStorageMetadata_v12.java | 131 ++++++++++++++++++ .../storage/GetAzureBlobStorageTags_v12.java | 130 +++++++++++++++++ .../storage/PutAzureBlobStorage_v12.java | 95 ++++++++++++- .../azure/storage/utils/BlobAttributes.java | 4 + .../org.apache.nifi.processor.Processor | 2 + 5 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java create mode 100644 nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java new file mode 100644 index 000000000000..75fc259cbeb6 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobProperties; +import com.azure.storage.blob.models.BlobStorageException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; + +@Tags({"azure", 
"microsoft", "cloud", "storage", "blob"}) +@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, + CopyAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, GetAzureBlobStorageTags_v12.class }) +@CapabilityDescription("Retrieves user metadata and/or tags from the specified blob from Azure Blob Storage. The processor uses Azure Blob Storage client library v12.") +@InputRequirement(Requirement.INPUT_REQUIRED) +public class GetAzureBlobStorageMetadata_v12 extends AbstractAzureBlobProcessor_v12 { + + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AzureStorageUtils.CONTAINER) + .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER)) + .build(); + + public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME) + .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME)) + .build(); + + private static final List PROPERTIES = List.of( + BLOB_STORAGE_CREDENTIALS_SERVICE, + CONTAINER, + BLOB_NAME, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE + ); + + static Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("A blob with the supplied name was found in the container") + .build(); + + static Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No blob was found with the supplied name in the container") + .build(); + + private static final Set relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE); + + private static final String ATTRIBUTE_FORMAT_USER_METADATA = "azure.user.metadata.%s"; + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = 
session.get(); + if (flowFile == null) { + return; + } + + String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final Map newAttributes = new HashMap<>(); + + try { + BlobServiceClient storageClient = getStorageClient(context, flowFile); + BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); + BlobClient blobClient = containerClient.getBlobClient(blobName); + + BlobProperties blobProperties = blobClient.getProperties(); + blobProperties.getMetadata().forEach((key, value) -> { + newAttributes.put(ATTRIBUTE_FORMAT_USER_METADATA.formatted(key), value); + }); + + flowFile = session.putAllAttributes(flowFile, newAttributes); + session.transfer(flowFile, REL_FOUND); + } catch (BlobStorageException e) { + if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + getLogger().warn("Specified blob ({}) does not exist, routing to not found.", blobName); + session.transfer(flowFile, REL_NOT_FOUND); + } else { + getLogger().error("Failed to retrieve metadata for the specified blob ({}) from Azure Blob Storage. 
Routing to failure", blobName, e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + } +} + diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java new file mode 100644 index 000000000000..be0119f3fe87 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobStorageException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; + +@Tags({"azure", "microsoft", "cloud", "storage", "blob"}) +@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, + CopyAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, GetAzureBlobStorageMetadata_v12.class }) +@CapabilityDescription("Retrieves user metadata and/or tags from the specified blob from Azure Blob Storage. 
The processor uses Azure Blob Storage client library v12.") +@InputRequirement(Requirement.INPUT_REQUIRED) +public class GetAzureBlobStorageTags_v12 extends AbstractAzureBlobProcessor_v12 { + + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AzureStorageUtils.CONTAINER) + .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER)) + .build(); + + public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME) + .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME)) + .build(); + + private static final List PROPERTIES = List.of( + BLOB_STORAGE_CREDENTIALS_SERVICE, + CONTAINER, + BLOB_NAME, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE + ); + + static Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("A blob with the supplied name was found in the container") + .build(); + + static Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No blob was found with the supplied name in the container") + .build(); + + private static final Set relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE); + + private static final String ATTRIBUTE_FORMAT_TAG = "azure.tag.%s"; + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final Map newAttributes = new HashMap<>(); + + try { + BlobServiceClient storageClient = 
getStorageClient(context, flowFile); + BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); + BlobClient blobClient = containerClient.getBlobClient(blobName); + + Map tags = blobClient.getTags(); + tags.forEach((key, value) -> { + newAttributes.put(ATTRIBUTE_FORMAT_TAG.formatted(key), value); + }); + + flowFile = session.putAllAttributes(flowFile, newAttributes); + session.transfer(flowFile, REL_FOUND); + } catch (BlobStorageException e) { + if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + getLogger().warn("Specified blob ({}) does not exist, routing to not found.", blobName); + session.transfer(flowFile, REL_NOT_FOUND); + } else { + getLogger().error("Failed to retrieve metadata for the specified blob ({}) from Azure Blob Storage. Routing to failure", blobName, e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + } +} + diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java index 50c0a4d9d9f5..262ab57d71ec 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java @@ -29,6 +29,7 @@ import com.azure.storage.blob.models.BlockBlobItem; import com.azure.storage.blob.models.ParallelTransferOptions; import com.azure.storage.blob.options.BlobParallelUploadOptions; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import 
org.apache.nifi.annotation.behavior.WritesAttribute; @@ -36,15 +37,18 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.fileresource.service.api.FileResource; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.ClientSideEncryptionSupport; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; @@ -76,6 +80,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_MIME_TYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_PRIMARY_URI; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_TIMESTAMP; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_USER_METADATA; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; @@ -87,14 +92,19 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_MIME_TYPE; import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_USER_METADATA; import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource; @Tags({"azure", "microsoft", "cloud", "storage", "blob"}) @SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, - CopyAzureBlobStorage_v12.class}) + CopyAzureBlobStorage_v12.class, GetAzureBlobStorageMetadata_v12.class, GetAzureBlobStorageTags_v12.class}) @CapabilityDescription("Puts content into a blob on Azure Blob Storage. The processor uses Azure Blob Storage client library v12.") +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the blob", + value = "The value of a User-Defined Metadata field to add to the blob", + description = "Allows user-defined metadata to be added to the blob as key/value pairs", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER), @WritesAttribute(attribute = ATTR_NAME_BLOBNAME, description = ATTR_DESCRIPTION_BLOBNAME), @@ -106,9 +116,29 @@ @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP), @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH), @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE), - @WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)}) + 
@WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED), + @WritesAttribute(attribute = ATTR_NAME_USER_METADATA, description = ATTR_DESCRIPTION_USER_METADATA)}) public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport { + public static final PropertyDescriptor BLOB_TAG_PREFIX = new PropertyDescriptor.Builder() + .name("Blob Tag Prefix") + .description("Specifies the prefix which would be scanned against the incoming FlowFile's attributes and the matching attribute's " + + "name and value would be considered as the outgoing Azure blob's Tag name and Tag value respectively. For Ex: If the " + + "incoming FlowFile carries the attributes tagAzurecountry, tagAzurePII, the tag prefix to be specified would be 'tagAzure'") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor REMOVE_TAG_PREFIX = new PropertyDescriptor.Builder() + .name("Remove Tag Prefix") + .description("If set to 'True', the value provided for '" + BLOB_TAG_PREFIX.getDisplayName() + "' will be removed from " + + "the attribute(s) and then considered as the Tag name. 
For ex: If the incoming FlowFile carries the attributes tagAzurecountry, " + + "tagAzurePII and the prefix is set to 'tagAzure' then the corresponding tag values would be 'country' and 'PII'") + .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False")) + .defaultValue("false") + .build(); + private static final List PROPERTY_DESCRIPTORS = List.of( BLOB_STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.CONTAINER, @@ -116,6 +146,8 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl AzureStorageUtils.CONFLICT_RESOLUTION, BLOB_NAME, CONTENT_MD5, + BLOB_TAG_PREFIX, + REMOVE_TAG_PREFIX, RESOURCE_TRANSFER_SOURCE, FILE_RESOURCE_SERVICE, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, @@ -124,6 +156,16 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl CSE_LOCAL_KEY ); + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + @Override protected Collection customValidate(ValidationContext validationContext) { final List results = new ArrayList<>(super.customValidate(validationContext)); @@ -187,6 +229,14 @@ public void onTrigger(final ProcessContext context, final ProcessSession session blobParallelUploadOptions.setParallelTransferOptions(parallelTransferOptions); blobParallelUploadOptions.setRequestConditions(blobRequestConditions); + if (context.getProperty(BLOB_TAG_PREFIX).isSet()) { + blobParallelUploadOptions.setTags(getObjectTags(context, flowFile)); + } + final Map userMetadata = getUserMetadata(context, flowFile); + if (!userMetadata.isEmpty()) { + blobParallelUploadOptions.setMetadata(userMetadata); + } + final String contentMd5 = 
context.getProperty(CONTENT_MD5).evaluateAttributeExpressions(sourceFlowFile).getValue(); if (contentMd5 != null) { final byte[] md5Bytes = convertMd5ToBytes(contentMd5); @@ -201,6 +251,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (ignore) { attributes.put(ATTR_NAME_IGNORED, "false"); } + if (!userMetadata.isEmpty()) { + StringBuilder userMetaBldr = new StringBuilder(); + for (String userKey : userMetadata.keySet()) { + userMetaBldr.append(userMetaBldr.length() == 0 ? "" : ", ").append(userKey).append("=").append(userMetadata.get(userKey)); /* ", " keeps adjacent key=value pairs distinguishable */ + } + attributes.put(ATTR_NAME_USER_METADATA, userMetaBldr.toString()); + } } } catch (BlobStorageException e) { final BlobErrorCode errorCode = e.getErrorCode(); @@ -245,4 +302,40 @@ private static void applyUploadResultAttributes(final Map attrib attributes.put(ATTR_NAME_LANG, null); attributes.put(ATTR_NAME_MIME_TYPE, APPLICATION_OCTET_STREAM); } + + /** Builds the blob tags from the FlowFile attributes whose names start with the evaluated Blob Tag Prefix; the prefix is stripped from the tag name when Remove Tag Prefix is true. */ + private Map getObjectTags(ProcessContext context, FlowFile flowFile) { + final String prefix = context.getProperty(BLOB_TAG_PREFIX).evaluateAttributeExpressions(flowFile).getValue(); + final Map objectTags = new HashMap<>(); + final Map attributesMap = flowFile.getAttributes(); + + attributesMap.entrySet().stream() + .filter(attribute -> attribute.getKey().startsWith(prefix)) + .forEach(attribute -> { + String tagKey = attribute.getKey(); + String tagValue = attribute.getValue(); + + if (context.getProperty(REMOVE_TAG_PREFIX).asBoolean()) { + tagKey = tagKey.substring(prefix.length()); /* drop only the leading prefix; replace() would also remove later occurrences */ + } + objectTags.put(tagKey, tagValue); + }); + + return objectTags; + } + + /** Evaluates every dynamic property against the FlowFile and returns the resulting name/value pairs used as the blob's user metadata. */ + private Map getUserMetadata(ProcessContext context, FlowFile flowFile) { + final Map userMetadata = new HashMap<>(); + + for (final Map.Entry entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + final String value = context.getProperty( + entry.getKey()).evaluateAttributeExpressions(flowFile).getValue(); + userMetadata.put(entry.getKey().getName(), value); + } + } + + return userMetadata; + } } diff
--git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java index 7ce30c1da7ce..7d64406f5872 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java @@ -51,4 +51,8 @@ public final class BlobAttributes { public static final String ATTR_NAME_IGNORED = "azure.ignored"; public static final String ATTR_DESCRIPTION_IGNORED = "When Conflict Resolution Strategy is 'ignore', " + "this property will be true/false depending on whether the blob was ignored."; + + public static final String ATTR_NAME_USER_METADATA = "azure.usermetadata"; + public static final String ATTR_DESCRIPTION_USER_METADATA = "A human-readable form of the User Metadata of " + + "the blob, if any was set"; } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index f7ef2f370705..6e10aea3f6ee 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -27,6 +27,8 @@ org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12 org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12 
org.apache.nifi.processors.azure.storage.CopyAzureBlobStorage_v12 +org.apache.nifi.processors.azure.storage.GetAzureBlobStorageMetadata_v12 +org.apache.nifi.processors.azure.storage.GetAzureBlobStorageTags_v12 org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage_v12 org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage_v12 From 1228a2e2f14fe8e3eb244d1bbbfb7ccbb2f99ef0 Mon Sep 17 00:00:00 2001 From: pkelly-nifi Date: Tue, 3 Mar 2026 06:31:02 -0500 Subject: [PATCH 2/3] Updates for review feedback and abstract logic for retrieving properties --- ...ureBlobStoragePropertiesProcessor_v12.java | 122 ++++++++++++++++++ .../GetAzureBlobStorageMetadata_v12.java | 106 ++------------- .../storage/GetAzureBlobStorageTags_v12.java | 105 ++------------- .../storage/PutAzureBlobStorage_v12.java | 15 ++- 4 files changed, 154 insertions(+), 194 deletions(-) create mode 100644 nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/AbstractGetAzureBlobStoragePropertiesProcessor_v12.java diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/AbstractGetAzureBlobStoragePropertiesProcessor_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/AbstractGetAzureBlobStoragePropertiesProcessor_v12.java new file mode 100644 index 000000000000..c14c999b4abf --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/AbstractGetAzureBlobStoragePropertiesProcessor_v12.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobStorageException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; + +public abstract class AbstractGetAzureBlobStoragePropertiesProcessor_v12 extends AbstractAzureBlobProcessor_v12 { + + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() + 
.fromPropertyDescriptor(AzureStorageUtils.CONTAINER) + .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER)) + .build(); + + public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME) + .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME)) + .build(); + + private static final List PROPERTIES = List.of( + BLOB_STORAGE_CREDENTIALS_SERVICE, + CONTAINER, + BLOB_NAME, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE + ); + + static final Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("A blob with the supplied name was found in the container") + .build(); + + static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No blob was found with the supplied name in the container") + .build(); + + private static final Set RELATIONSHIPS = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE); + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + protected abstract String getAttributePrefix(); + + protected abstract Map fetchProperties(BlobClient blobClient); + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); + + try { + final BlobServiceClient storageClient = getStorageClient(context, flowFile); + final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); + final BlobClient blobClient = containerClient.getBlobClient(blobName); + + final Map newAttributes = new HashMap<>(); + 
fetchProperties(blobClient).forEach((key, value) -> + newAttributes.put(getAttributePrefix().formatted(key), value) + ); + + flowFile = session.putAllAttributes(flowFile, newAttributes); + final String eventDetails = String.format("Attributes added [%s]", String.join(", ", newAttributes.keySet())); + session.getProvenanceReporter().modifyAttributes(flowFile, eventDetails); + session.transfer(flowFile, REL_FOUND); + } catch (final BlobStorageException e) { + if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + getLogger().warn("Specified blob ({}) does not exist, routing to not found.", blobName); + session.transfer(flowFile, REL_NOT_FOUND); + } else { + getLogger().error("Failed to retrieve properties for the specified blob ({}) from Azure Blob Storage. Routing to failure", blobName, e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java index 75fc259cbeb6..fff5e569daf3 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageMetadata_v12.java @@ -17,115 +17,33 @@ package org.apache.nifi.processors.azure.storage; import com.azure.storage.blob.BlobClient; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.models.BlobErrorCode; -import com.azure.storage.blob.models.BlobProperties; -import com.azure.storage.blob.models.BlobStorageException; import 
org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; - -import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; -import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; -import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; @Tags({"azure", "microsoft", "cloud", "storage", "blob"}) -@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, - CopyAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, GetAzureBlobStorageTags_v12.class }) -@CapabilityDescription("Retrieves user metadata and/or tags from the specified blob from Azure Blob Storage. 
The processor uses Azure Blob Storage client library v12.") +@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, + CopyAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, GetAzureBlobStorageTags_v12.class}) +@CapabilityDescription("Retrieves user metadata from the specified blob from Azure Blob Storage. The processor uses Azure Blob Storage client library v12.") @InputRequirement(Requirement.INPUT_REQUIRED) -public class GetAzureBlobStorageMetadata_v12 extends AbstractAzureBlobProcessor_v12 { - - public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AzureStorageUtils.CONTAINER) - .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER)) - .build(); - - public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME) - .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME)) - .build(); - - private static final List PROPERTIES = List.of( - BLOB_STORAGE_CREDENTIALS_SERVICE, - CONTAINER, - BLOB_NAME, - AzureStorageUtils.PROXY_CONFIGURATION_SERVICE - ); +@WritesAttributes({@WritesAttribute(attribute = "azure.user.metadata.", description = "The value of the retrieved metadata")}) +public class GetAzureBlobStorageMetadata_v12 extends AbstractGetAzureBlobStoragePropertiesProcessor_v12 { - static Relationship REL_FOUND = new Relationship.Builder() - .name("found") - .description("A blob with the supplied name was found in the container") - .build(); - - static Relationship REL_NOT_FOUND = new Relationship.Builder() - .name("not found") - .description("No blob was found with the supplied name in the container") - .build(); - - private static final Set relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE); - - private static final String ATTRIBUTE_FORMAT_USER_METADATA = "azure.user.metadata.%s"; + private static final String ATTRIBUTE_PREFIX = 
"azure.user.metadata.%s"; @Override - public Set getRelationships() { - return relationships; + protected String getAttributePrefix() { + return ATTRIBUTE_PREFIX; } @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTIES; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); - String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final Map newAttributes = new HashMap<>(); - - try { - BlobServiceClient storageClient = getStorageClient(context, flowFile); - BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); - BlobClient blobClient = containerClient.getBlobClient(blobName); - - BlobProperties blobProperties = blobClient.getProperties(); - blobProperties.getMetadata().forEach((key, value) -> { - newAttributes.put(ATTRIBUTE_FORMAT_USER_METADATA.formatted(key), value); - }); - - flowFile = session.putAllAttributes(flowFile, newAttributes); - session.transfer(flowFile, REL_FOUND); - } catch (BlobStorageException e) { - if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { - getLogger().warn("Specified blob ({}) does not exist, routing to not found.", blobName); - session.transfer(flowFile, REL_NOT_FOUND); - } else { - getLogger().error("Failed to retrieve metadata for the specified blob ({}) from Azure Blob Storage. 
Routing to failure", blobName, e); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } + protected Map fetchProperties(final BlobClient blobClient) { + return blobClient.getProperties().getMetadata(); } } - diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java index be0119f3fe87..afe6c2462399 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/GetAzureBlobStorageTags_v12.java @@ -17,114 +17,33 @@ package org.apache.nifi.processors.azure.storage; import com.azure.storage.blob.BlobClient; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.models.BlobErrorCode; -import com.azure.storage.blob.models.BlobStorageException; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; 
-import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; - -import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; -import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; -import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; @Tags({"azure", "microsoft", "cloud", "storage", "blob"}) -@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, - CopyAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, GetAzureBlobStorageMetadata_v12.class }) -@CapabilityDescription("Retrieves user metadata and/or tags from the specified blob from Azure Blob Storage. The processor uses Azure Blob Storage client library v12.") +@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, + CopyAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, GetAzureBlobStorageMetadata_v12.class}) +@CapabilityDescription("Retrieves tags from the specified blob from Azure Blob Storage. 
The processor uses Azure Blob Storage client library v12.") @InputRequirement(Requirement.INPUT_REQUIRED) -public class GetAzureBlobStorageTags_v12 extends AbstractAzureBlobProcessor_v12 { - - public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AzureStorageUtils.CONTAINER) - .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER)) - .build(); - - public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME) - .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME)) - .build(); - - private static final List PROPERTIES = List.of( - BLOB_STORAGE_CREDENTIALS_SERVICE, - CONTAINER, - BLOB_NAME, - AzureStorageUtils.PROXY_CONFIGURATION_SERVICE - ); +@WritesAttributes({@WritesAttribute(attribute = "azure.tag.", description = "The value of the retrieved tag")}) +public class GetAzureBlobStorageTags_v12 extends AbstractGetAzureBlobStoragePropertiesProcessor_v12 { - static Relationship REL_FOUND = new Relationship.Builder() - .name("found") - .description("A blob with the supplied name was found in the container") - .build(); - - static Relationship REL_NOT_FOUND = new Relationship.Builder() - .name("not found") - .description("No blob was found with the supplied name in the container") - .build(); - - private static final Set relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE); - - private static final String ATTRIBUTE_FORMAT_TAG = "azure.tag.%s"; + private static final String ATTRIBUTE_PREFIX = "azure.tag.%s"; @Override - public Set getRelationships() { - return relationships; + protected String getAttributePrefix() { + return ATTRIBUTE_PREFIX; } @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTIES; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - 
} - - String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); - String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final Map newAttributes = new HashMap<>(); - - try { - BlobServiceClient storageClient = getStorageClient(context, flowFile); - BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); - BlobClient blobClient = containerClient.getBlobClient(blobName); - - Map tags = blobClient.getTags(); - tags.forEach((key, value) -> { - newAttributes.put(ATTRIBUTE_FORMAT_TAG.formatted(key), value); - }); - - flowFile = session.putAllAttributes(flowFile, newAttributes); - session.transfer(flowFile, REL_FOUND); - } catch (BlobStorageException e) { - if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { - getLogger().warn("Specified blob ({}) does not exist, routing to not found.", blobName); - session.transfer(flowFile, REL_NOT_FOUND); - } else { - getLogger().error("Failed to retrieve metadata for the specified blob ({}) from Azure Blob Storage. 
Routing to failure", blobName, e); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } + protected Map fetchProperties(final BlobClient blobClient) { + return blobClient.getTags(); } } - diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java index 262ab57d71ec..bc3997113dee 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java @@ -29,6 +29,7 @@ import com.azure.storage.blob.models.BlockBlobItem; import com.azure.storage.blob.models.ParallelTransferOptions; import com.azure.storage.blob.options.BlobParallelUploadOptions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -63,6 +64,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM; import static com.azure.core.util.FluxUtil.toFluxByteBuffer; @@ -251,12 +253,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (ignore) { attributes.put(ATTR_NAME_IGNORED, "false"); } - if (!userMetadata.isEmpty()) { - StringBuilder userMetaBldr = new StringBuilder(); - for (String userKey : userMetadata.keySet()) { - userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey)); - } - 
attributes.put(ATTR_NAME_USER_METADATA, userMetaBldr.toString()); + final String userMetadataAttributeValue = userMetadata.entrySet().stream() + .map(entry -> String.format("%s=%s", entry.getKey(), entry.getValue())) + .collect(Collectors.joining("\n")); + if (StringUtils.isNotBlank(userMetadataAttributeValue)) { + attributes.put(ATTR_NAME_USER_METADATA, userMetadataAttributeValue); } } } catch (BlobStorageException e) { @@ -315,7 +316,7 @@ private Map getObjectTags(ProcessContext context, FlowFile flowF String tagValue = attribute.getValue(); if (context.getProperty(REMOVE_TAG_PREFIX).asBoolean()) { - tagKey = tagKey.replace(prefix, ""); + tagKey = tagKey.substring(prefix.length()); } objectTags.put(tagKey, tagValue); }); From 74369772077a92a88e91664075430fdbff10196f Mon Sep 17 00:00:00 2001 From: pkelly-nifi Date: Tue, 3 Mar 2026 14:46:48 -0500 Subject: [PATCH 3/3] Adding tests for Azure tags and metadata --- .../TestGetAzureBlobStorageMetadata_v12.java | 271 ++++++++++++++ .../TestGetAzureBlobStorageTags_v12.java | 237 ++++++++++++ .../storage/TestPutAzureBlobStorage_v12.java | 340 ++++++++++++++++++ 3 files changed, 848 insertions(+) create mode 100644 nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageMetadata_v12.java create mode 100644 nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageTags_v12.java diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageMetadata_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageMetadata_v12.java new file mode 100644 index 000000000000..4c61d02cd74c --- /dev/null +++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageMetadata_v12.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobProperties; +import com.azure.storage.blob.models.BlobStorageException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestGetAzureBlobStorageMetadata_v12 { + + private static final String CONTAINER_NAME = "test-container"; + private static final String BLOB_NAME = "test-blob"; + + private TestRunner runner; + private BlobServiceClient storageClient; + private BlobClient blobClient; + private BlobProperties blobProperties; + + @BeforeEach + void setUp() { + storageClient = mock(BlobServiceClient.class); + final BlobContainerClient containerClient = mock(BlobContainerClient.class); + blobClient = mock(BlobClient.class); + blobProperties = mock(BlobProperties.class); + + when(storageClient.getBlobContainerClient(CONTAINER_NAME)).thenReturn(containerClient); + when(containerClient.getBlobClient(BLOB_NAME)).thenReturn(blobClient); + when(blobClient.getProperties()).thenReturn(blobProperties); + + final GetAzureBlobStorageMetadata_v12 processor = new GetAzureBlobStorageMetadata_v12() { + @Override + protected BlobServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { + return storageClient; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return super.getSupportedPropertyDescriptors().stream() + .filter(pd -> !pd.equals(BLOB_STORAGE_CREDENTIALS_SERVICE)) + .toList(); + } + }; + + runner = TestRunners.newTestRunner(processor); + runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.CONTAINER, CONTAINER_NAME); + runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.BLOB_NAME, BLOB_NAME); + } + + @Test + void 
testSuccessfulMetadataRetrieval() { + final Map metadata = Map.of( + "author", "jane-doe", + "source-system", "erp", + "processing-date", "2024-01-15" + ); + when(blobProperties.getMetadata()).thenReturn(metadata); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst(); + + assertEquals("jane-doe", flowFile.getAttribute("azure.user.metadata.author")); + assertEquals("erp", flowFile.getAttribute("azure.user.metadata.source-system")); + assertEquals("2024-01-15", flowFile.getAttribute("azure.user.metadata.processing-date")); + } + + @Test + void testEmptyMetadataRetrieval() { + when(blobProperties.getMetadata()).thenReturn(Map.of()); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst(); + + flowFile.getAttributes().forEach((key, value) -> + assertFalse(key.startsWith("azure.user.metadata."), + "No metadata attributes should exist when blob has none, found: " + key) + ); + } + + @Test + void testBlobNotFound() { + BlobStorageException exception = mockBlobStorageException(BlobErrorCode.BLOB_NOT_FOUND); + when(blobClient.getProperties()).thenThrow(exception); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_NOT_FOUND, 1); + } + + @Test + void testOtherBlobStorageExceptionRoutesToFailure() { + BlobStorageException exception = mockBlobStorageException(BlobErrorCode.AUTHORIZATION_FAILURE); + when(blobClient.getProperties()).thenThrow(exception); + + runner.enqueue(""); + runner.run(); + + 
runner.assertAllFlowFilesTransferred( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FAILURE, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FAILURE).getFirst(); + assertTrue(flowFile.isPenalized(), "FlowFile should be penalized on failure"); + } + + @Test + void testContainerAndBlobNameFromFlowFileAttributes() { + runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.CONTAINER, + "${azure.container}"); + runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.BLOB_NAME, + "${azure.blobname}"); + + final String dynamicContainer = "other-container"; + final String dynamicBlob = "other-blob"; + + final BlobContainerClient otherContainerClient = mock(BlobContainerClient.class); + final BlobClient otherBlobClient = mock(BlobClient.class); + final BlobProperties otherBlobProperties = mock(BlobProperties.class); + + when(storageClient.getBlobContainerClient(dynamicContainer)) + .thenReturn(otherContainerClient); + when(otherContainerClient.getBlobClient(dynamicBlob)) + .thenReturn(otherBlobClient); + when(otherBlobClient.getProperties()).thenReturn(otherBlobProperties); + when(otherBlobProperties.getMetadata()).thenReturn(Map.of("origin", "external")); + + runner.enqueue("", Map.of( + "azure.container", dynamicContainer, + "azure.blobname", dynamicBlob + )); + runner.run(); + + runner.assertAllFlowFilesTransferred( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst(); + assertEquals("external", flowFile.getAttribute("azure.user.metadata.origin")); + } + + @Test + void testProvenanceEventOnFound() { + when(blobProperties.getMetadata()).thenReturn(Map.of("key", "value")); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred( + 
AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1); + + final ProvenanceEventRecord modifyEvent = runner.getProvenanceEvents().stream() + .filter(e -> e.getEventType() == ProvenanceEventType.ATTRIBUTES_MODIFIED) + .findFirst() + .orElse(null); + assertNotNull(modifyEvent, "Should have an ATTRIBUTES_MODIFIED provenance event"); + } + + @Test + void testMetadataAttributePrefix() { + when(blobProperties.getMetadata()).thenReturn(Map.of("customKey", "customValue")); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst(); + assertEquals("customValue", flowFile.getAttribute("azure.user.metadata.customKey")); + assertNull(flowFile.getAttribute("customKey"), + "Raw key should not appear without prefix"); + } + + @Test + void testMultipleFlowFiles() { + when(blobProperties.getMetadata()) + .thenReturn(Map.of("seq", "1")) + .thenReturn(Map.of("seq", "2")); + + runner.enqueue(""); + runner.enqueue(""); + runner.run(2); + + assertEquals(2, runner.getFlowFilesForRelationship( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).size()); + } + + @Test + void testMetadataWithManyEntries() { + final Map largeMetadata = new HashMap<>(); + for (int i = 0; i < 20; i++) { + largeMetadata.put("key" + i, "value" + i); + } + when(blobProperties.getMetadata()).thenReturn(largeMetadata); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship( + AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst(); + + for (int i = 0; i < 20; i++) { + assertEquals("value" + i, + flowFile.getAttribute("azure.user.metadata.key" + i)); + } + } + + private static 
BlobStorageException mockBlobStorageException(BlobErrorCode errorCode) { + final BlobStorageException exception = mock(BlobStorageException.class); + when(exception.getErrorCode()).thenReturn(errorCode); + when(exception.getMessage()).thenReturn("Mocked: " + errorCode); + return exception; + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageTags_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageTags_v12.java new file mode 100644 index 000000000000..e4cfe1e69a2f --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestGetAzureBlobStorageTags_v12.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobStorageException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestGetAzureBlobStorageTags_v12 { + + private static final String CONTAINER_NAME = "test-container"; + private static final String BLOB_NAME = "test-blob"; + + private TestRunner runner; + private BlobServiceClient storageClient; + private BlobClient blobClient; + + @BeforeEach + void setUp() { + storageClient = mock(BlobServiceClient.class); + final BlobContainerClient containerClient = mock(BlobContainerClient.class); + blobClient = mock(BlobClient.class); + + when(storageClient.getBlobContainerClient(CONTAINER_NAME)).thenReturn(containerClient); + when(containerClient.getBlobClient(BLOB_NAME)).thenReturn(blobClient); + + final GetAzureBlobStorageTags_v12 processor = new 
GetAzureBlobStorageTags_v12() {
+            @Override // always hand the mocked service client to the processor
+            protected BlobServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
+                return storageClient;
+            }
+
+            @Override // expose every inherited descriptor except the credentials-service property
+            protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+                return super.getSupportedPropertyDescriptors().stream()
+                        .filter(pd -> !pd.equals(BLOB_STORAGE_CREDENTIALS_SERVICE))
+                        .toList();
+            }
+        };
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.CONTAINER, CONTAINER_NAME);
+        runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.BLOB_NAME, BLOB_NAME);
+    }
+
+    @Test // each tag returned by getTags() becomes an "azure.tag.<name>" attribute on the FlowFile sent to REL_FOUND
+    void testSuccessfulTagRetrieval() {
+        final Map<String, String> tags = Map.of(
+                "environment", "production",
+                "department", "engineering",
+                "cost-center", "12345"
+        );
+        when(blobClient.getTags()).thenReturn(tags);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
+
+        assertEquals("production", flowFile.getAttribute("azure.tag.environment"));
+        assertEquals("engineering", flowFile.getAttribute("azure.tag.department"));
+        assertEquals("12345", flowFile.getAttribute("azure.tag.cost-center"));
+    }
+
+    @Test // an empty tag map still routes to REL_FOUND and adds no "azure.tag.*" attribute
+    void testEmptyTagsRetrieval() {
+        when(blobClient.getTags()).thenReturn(Map.of());
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
+
+        flowFile.getAttributes().forEach((key, value) ->
+                assertFalse(key.startsWith("azure.tag."),
+                        "No tag attributes should be set when blob has no tags, found: " + key)
+        );
+    }
+
+    @Test // BLOB_NOT_FOUND thrown by getTags() routes the FlowFile to REL_NOT_FOUND
+    void testBlobNotFound() {
+        BlobStorageException exception = mockBlobStorageException(BlobErrorCode.BLOB_NOT_FOUND);
+        when(blobClient.getTags()).thenThrow(exception);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_NOT_FOUND, 1);
+    }
+
+    @Test // AUTHORIZATION_FAILURE thrown by getTags() routes to REL_FAILURE and the FlowFile is penalized
+    void testOtherBlobStorageExceptionRoutesToFailure() {
+        BlobStorageException exception = mockBlobStorageException(BlobErrorCode.AUTHORIZATION_FAILURE);
+        when(blobClient.getTags()).thenThrow(exception);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FAILURE, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FAILURE).getFirst();
+        assertTrue(flowFile.isPenalized(), "FlowFile should be penalized on failure");
+    }
+
+    @Test // container and blob name are resolved per FlowFile from Expression Language references to its attributes
+    void testContainerAndBlobNameFromFlowFileAttributes() {
+        runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.CONTAINER,
+                "${azure.container}");
+        runner.setProperty(AbstractGetAzureBlobStoragePropertiesProcessor_v12.BLOB_NAME,
+                "${azure.blobname}");
+
+        final String dynamicContainer = "dynamic-container";
+        final String dynamicBlob = "dynamic-blob";
+
+        final BlobContainerClient dynamicContainerClient = mock(BlobContainerClient.class);
+        final BlobClient dynamicBlobClient = mock(BlobClient.class);
+        when(storageClient.getBlobContainerClient(dynamicContainer))
+                .thenReturn(dynamicContainerClient);
+        when(dynamicContainerClient.getBlobClient(dynamicBlob))
+                .thenReturn(dynamicBlobClient);
+        when(dynamicBlobClient.getTags()).thenReturn(Map.of("region", "us-east"));
+
+        runner.enqueue("", Map.of(
+                "azure.container", dynamicContainer,
+                "azure.blobname", dynamicBlob
+        ));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
+        assertEquals("us-east", flowFile.getAttribute("azure.tag.region"));
+    }
+
+    @Test // a successful lookup records an ATTRIBUTES_MODIFIED provenance event
+    void testProvenanceEventOnFound() {
+        when(blobClient.getTags()).thenReturn(Map.of("key", "value"));
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
+
+        final ProvenanceEventRecord modifyEvent = runner.getProvenanceEvents().stream()
+                .filter(e -> e.getEventType() == ProvenanceEventType.ATTRIBUTES_MODIFIED)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(modifyEvent, "Should have an ATTRIBUTES_MODIFIED provenance event");
+    }
+
+    @Test // the tag key is appended to "azure.tag." with its original case preserved
+    void testTagAttributePrefix() {
+        when(blobClient.getTags()).thenReturn(Map.of("myKey", "myValue"));
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND).getFirst();
+        assertEquals("myValue", flowFile.getAttribute("azure.tag.myKey"));
+    }
+
+    @Test // two FlowFiles over two iterations: both reach REL_FOUND, each carrying the tags of its own getTags() call
+    void testMultipleFlowFiles() {
+        when(blobClient.getTags())
+                .thenReturn(Map.of("batch", "1"))
+                .thenReturn(Map.of("batch", "2"));
+
+        runner.enqueue("");
+        runner.enqueue("");
+        runner.run(2);
+
+        assertEquals(List.of("1", "2"), runner.getFlowFilesForRelationship(AbstractGetAzureBlobStoragePropertiesProcessor_v12.REL_FOUND)
+                .stream().map(ff -> ff.getAttribute("azure.tag.batch")).sorted().toList()); // sorted: independent of transfer order
+    }
+
+    private static BlobStorageException mockBlobStorageException(BlobErrorCode errorCode) { // stubs getErrorCode() and getMessage() only
+        final BlobStorageException exception = mock(BlobStorageException.class);
+        when(exception.getErrorCode()).thenReturn(errorCode);
+        when(exception.getMessage()).thenReturn("Mocked: " + errorCode);
+        return exception;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage_v12.java
index 74f78e666457..a48d2082e6b1 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage_v12.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage_v12.java
@@ -16,19 +16,61 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.http.rest.Response;
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobType;
+import com.azure.storage.blob.models.BlockBlobItem;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.migration.ProxyServiceMigration;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.PropertyMigrationResult;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 
+import java.time.OffsetDateTime;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_USER_METADATA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestPutAzureBlobStorage_v12 {
+
+    private static final String CONTAINER_NAME = "test-container";
+    private static final String BLOB_NAME = "test-blob";
+    private static final String TEST_CONTENT = "test content";
+    private static final String ETAG = "test-etag";
+    private static final String BLOB_URL = "https://account.blob.core.windows.net/test-container/test-blob";
+
+    private TestRunner runner; // rebuilt by setUp() around a processor wired to mocked Azure clients
+    private BlobClient blobClient; // mock recreated by setUp(); tests verify uploadWithResponse(...) on it
+    private ArgumentCaptor<BlobParallelUploadOptions> uploadOptionsCaptor; // captures the options passed to uploadWithResponse(...)
+
     @Test
     void testMigration() {
         TestRunner runner = TestRunners.newTestRunner(PutAzureBlobStorage_v12.class);
@@ -43,4 +85,302 @@ void testMigration() {
 
         assertEquals(expectedRenamed, propertyMigrationResult.getPropertiesRenamed());
+    }
+
+    @BeforeEach // builds the mocked client chain (service -> container -> blob) and a runner around a processor that uses it
+    void setUp() {
+        final BlobServiceClient storageClient = mock(BlobServiceClient.class);
+        final BlobContainerClient containerClient = mock(BlobContainerClient.class);
+        blobClient = mock(BlobClient.class);
+        uploadOptionsCaptor = ArgumentCaptor.forClass(BlobParallelUploadOptions.class);
+
+        when(storageClient.getBlobContainerClient(CONTAINER_NAME)).thenReturn(containerClient);
+        when(containerClient.getBlobClient(BLOB_NAME)).thenReturn(blobClient);
+        when(containerClient.exists()).thenReturn(true);
+        when(blobClient.getBlobUrl()).thenReturn(BLOB_URL);
+        when(blobClient.getContainerName()).thenReturn(CONTAINER_NAME);
+        when(blobClient.getBlobName()).thenReturn(BLOB_NAME);
+
+        final BlockBlobItem blockBlobItem = mock(BlockBlobItem.class);
+        when(blockBlobItem.getETag()).thenReturn(ETAG);
+        when(blockBlobItem.getLastModified()).thenReturn(OffsetDateTime.now());
+
+        final Response<BlockBlobItem> response = mock(Response.class);
+        when(response.getValue()).thenReturn(blockBlobItem);
+        when(blobClient.uploadWithResponse(
+                any(BlobParallelUploadOptions.class), isNull(), any(Context.class)))
+                .thenReturn(response);
+
+        final BlobProperties blobProperties = mock(BlobProperties.class);
+        when(blobProperties.getBlobType()).thenReturn(BlobType.BLOCK_BLOB);
+        when(blobProperties.getETag()).thenReturn(ETAG);
+        when(blobProperties.getContentType()).thenReturn("application/octet-stream");
+        when(blobProperties.getLastModified()).thenReturn(OffsetDateTime.now());
+        when(blobProperties.getBlobSize()).thenReturn((long) TEST_CONTENT.length());
+        when(blobProperties.getContentLanguage()).thenReturn(null);
+        when(blobClient.getProperties()).thenReturn(blobProperties);
+
+        final PutAzureBlobStorage_v12 processor = new PutAzureBlobStorage_v12() {
+            @Override // always hand the mocked service client to the processor
+            protected BlobServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
+                return storageClient;
+            }
+
+            @Override // expose every inherited descriptor except the credentials-service property
+            protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+                return super.getSupportedPropertyDescriptors().stream()
+                        .filter(pd -> !pd.equals(BLOB_STORAGE_CREDENTIALS_SERVICE))
+                        .toList();
+            }
+        };
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractAzureBlobProcessor_v12.BLOB_NAME, BLOB_NAME);
+        runner.setProperty("Container Name", CONTAINER_NAME);
+    }
+
+    @Test // attributes starting with the configured prefix become tags under their full attribute name; others are ignored
+    void testTagsFromFlowFileAttributesWithPrefix() {
+        runner.setProperty(PutAzureBlobStorage_v12.BLOB_TAG_PREFIX, "azure.tag.");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("azure.tag.environment", "production");
+        attributes.put("azure.tag.team", "engineering");
+        attributes.put("other.attribute", "should-be-ignored");
+
+        runner.enqueue(TEST_CONTENT, attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final Map<String, String> tags = uploadOptionsCaptor.getValue().getTags();
+
+        assertNotNull(tags, "Tags should not be null when tag prefix is set");
+        assertEquals(2, tags.size());
+        assertEquals("production", tags.get("azure.tag.environment"));
+        assertEquals("engineering", tags.get("azure.tag.team"));
+    }
+
+    @Test // with REMOVE_TAG_PREFIX=true the prefix is stripped from the attribute name to form the tag key
+    void testTagsFromFlowFileAttributesWithPrefixRemoval() {
+        runner.setProperty(PutAzureBlobStorage_v12.BLOB_TAG_PREFIX, "azure.tag.");
+        runner.setProperty(PutAzureBlobStorage_v12.REMOVE_TAG_PREFIX, "true");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("azure.tag.environment", "staging");
+        attributes.put("azure.tag.department", "finance");
+        attributes.put("unrelated.key", "unrelated-value");
+
+        runner.enqueue(TEST_CONTENT, attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final Map<String, String> tags = uploadOptionsCaptor.getValue().getTags();
+
+        assertNotNull(tags);
+        assertEquals(2, tags.size());
+        assertEquals("staging", tags.get("environment"));
+        assertEquals("finance", tags.get("department"));
+    }
+
+    @Test // without a tag prefix the upload options carry no tags at all, even if tag-like attributes exist
+    void testNoTagsWhenPrefixNotSet() {
+        runner.enqueue(TEST_CONTENT, Map.of("azure.tag.something", "value"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        assertNull(uploadOptionsCaptor.getValue().getTags(),
+                "Tags should be null when tag prefix property is not set");
+    }
+
+    @Test // a configured prefix that matches no attribute yields an empty (non-null) tag map
+    void testNoMatchingTagAttributes() {
+        runner.setProperty(PutAzureBlobStorage_v12.BLOB_TAG_PREFIX, "azure.tag.");
+
+        runner.enqueue(TEST_CONTENT, Map.of("other.key", "other-value"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final Map<String, String> tags = uploadOptionsCaptor.getValue().getTags();
+
+        assertNotNull(tags);
+        assertTrue(tags.isEmpty(),
+                "Tags should be empty when no attributes match the prefix");
+    }
+
+    @Test // the tag prefix itself may be an Expression Language reference evaluated against the FlowFile
+    void testTagPrefixWithExpressionLanguage() {
+        runner.setProperty(PutAzureBlobStorage_v12.BLOB_TAG_PREFIX, "${tag.prefix}");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("tag.prefix", "custom.");
+        attributes.put("custom.region", "us-east-1");
+        attributes.put("custom.tier", "standard");
+
+        runner.enqueue(TEST_CONTENT, attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final Map<String, String> tags = uploadOptionsCaptor.getValue().getTags();
+
+        assertNotNull(tags);
+        assertEquals(2, tags.size());
+        assertEquals("us-east-1", tags.get("custom.region"));
+        assertEquals("standard", tags.get("custom.tier"));
+    }
+
+    @Test // REMOVE_TAG_PREFIX left unset behaves as "false": the tag key keeps the prefix
+    void testRemoveTagPrefixDefaultsToFalse() {
+        runner.setProperty(PutAzureBlobStorage_v12.BLOB_TAG_PREFIX, "pfx.");
+
+        runner.enqueue(TEST_CONTENT, Map.of("pfx.key", "val"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final Map<String, String> tags = uploadOptionsCaptor.getValue().getTags();
+
+        assertNotNull(tags);
+        assertTrue(tags.containsKey("pfx.key"),
+                "Full attribute name should be the tag key when prefix removal is off");
+        assertFalse(tags.containsKey("key"),
+                "Stripped key should not appear when prefix removal is off");
+    }
+
+    @Test // every dynamic property is sent as a blob metadata entry (name -> value)
+    void testUserMetadataFromDynamicProperties() {
+        runner.setProperty("x-custom-header", "header-value");
+        runner.setProperty("department", "engineering");
+
+        runner.enqueue(TEST_CONTENT);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final Map<String, String> metadata =
+                uploadOptionsCaptor.getValue().getMetadata();
+
+        assertNotNull(metadata);
+        assertEquals(2, metadata.size());
+        assertEquals("header-value", metadata.get("x-custom-header"));
+        assertEquals("engineering", metadata.get("department"));
+    }
+
+    @Test // dynamic property values are evaluated with Expression Language against the FlowFile
+    void testUserMetadataWithExpressionLanguage() {
+        runner.setProperty("source-system", "${system.name}");
+
+        runner.enqueue(TEST_CONTENT, Map.of("system.name", "crm-export"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final Map<String, String> metadata =
+                uploadOptionsCaptor.getValue().getMetadata();
+
+        assertNotNull(metadata);
+        assertEquals("crm-export", metadata.get("source-system"));
+    }
+
+    @Test // with no dynamic properties the upload options carry no metadata
+    void testNoUserMetadataWhenNoDynamicProperties() {
+        runner.enqueue(TEST_CONTENT);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        assertNull(uploadOptionsCaptor.getValue().getMetadata(),
+                "Metadata should not be set when no dynamic properties exist");
+    }
+
+    @Test // the success FlowFile reports the uploaded metadata as "name=value" pairs in ATTR_NAME_USER_METADATA
+    void testUserMetadataAttributeOnSuccessFlowFile() {
+        runner.setProperty("project", "alpha");
+        runner.setProperty("owner", "team-a");
+
+        runner.enqueue(TEST_CONTENT);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractAzureBlobProcessor_v12.REL_SUCCESS).getFirst();
+        final String attr = flowFile.getAttribute(ATTR_NAME_USER_METADATA);
+
+        assertNotNull(attr, "User metadata attribute should be present");
+        assertTrue(attr.contains("project=alpha"));
+        assertTrue(attr.contains("owner=team-a"));
+    }
+
+    @Test // with no dynamic properties the success FlowFile has no ATTR_NAME_USER_METADATA attribute
+    void testNoUserMetadataAttributeWhenNoDynamicProperties() {
+        runner.enqueue(TEST_CONTENT);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractAzureBlobProcessor_v12.REL_SUCCESS).getFirst();
+        assertNull(flowFile.getAttribute(ATTR_NAME_USER_METADATA),
+                "User metadata attribute should be absent when no dynamic properties exist");
+    }
+
+    @Test // tags and metadata can be configured together and both reach the same upload call
+    void testBothTagsAndMetadataSet() {
+        runner.setProperty(PutAzureBlobStorage_v12.BLOB_TAG_PREFIX, "tag.");
+        runner.setProperty(PutAzureBlobStorage_v12.REMOVE_TAG_PREFIX, "true");
+        runner.setProperty("meta-key", "meta-value");
+
+        runner.enqueue(TEST_CONTENT, Map.of("tag.env", "prod"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        verify(blobClient).uploadWithResponse(
+                uploadOptionsCaptor.capture(), isNull(), any(Context.class));
+        final BlobParallelUploadOptions options = uploadOptionsCaptor.getValue();
+
+        assertEquals("prod", options.getTags().get("env"));
+        assertEquals("meta-value", options.getMetadata().get("meta-key"));
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(
+                AbstractAzureBlobProcessor_v12.REL_SUCCESS).getFirst();
+        assertNotNull(flowFile.getAttribute(ATTR_NAME_USER_METADATA));
+    }
+
+    @Test // a successful upload records a SEND provenance event whose transit URI contains the blob name
+    void testProvenanceEventOnSuccess() {
+        runner.enqueue(TEST_CONTENT);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor_v12.REL_SUCCESS, 1);
+
+        final ProvenanceEventRecord sendEvent = runner.getProvenanceEvents().stream()
+                .filter(e -> e.getEventType() == ProvenanceEventType.SEND)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(sendEvent, "Should have a SEND provenance event");
+        assertTrue(sendEvent.getTransitUri().contains(BLOB_NAME));
     }
 }