Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public class MergeContent extends BinFiles {
public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();

// Attribute keys that Defragment mode strips from merged output: the fragment
// bookkeeping attributes would be stale once the fragments are reassembled.
// Used by FlowFileStreamMerger to clean each inner FlowFile's attribute map.
// NOTE(review): processBin builds an identical inline Set.of(...) for
// removeAllAttributes on the outer bundle — it could reuse this constant.
private static final Set<String> FRAGMENT_ATTRIBUTE_KEYS = Set.of(
FRAGMENT_ID_ATTRIBUTE, FRAGMENT_INDEX_ATTRIBUTE, FRAGMENT_COUNT_ATTRIBUTE, SEGMENT_ORIGINAL_FILENAME);

public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
Expand Down Expand Up @@ -469,15 +472,16 @@ protected void setUpBinManager(final BinManager binManager, final ProcessContext
protected BinProcessingResult processBin(final Bin bin, final ProcessContext context) throws ProcessException {
final BinProcessingResult binProcessingResult = new BinProcessingResult(true);

final boolean isDefragment = context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class) == MergeStrategy.DEFRAGMENT;
MergeBin merger = switch (context.getProperty(MERGE_FORMAT).asAllowableValue(MergeFormat.class)) {
case TAR -> new TarMerge();
case ZIP -> new ZipMerge(context.getProperty(COMPRESSION_LEVEL).asInteger());
case FLOWFILE_STREAM_V3 ->
new FlowFileStreamMerger(new FlowFilePackagerV3(), StandardFlowFileMediaType.VERSION_3.getMediaType());
new FlowFileStreamMerger(new FlowFilePackagerV3(), StandardFlowFileMediaType.VERSION_3.getMediaType(), isDefragment);
case FLOWFILE_STREAM_V2 ->
new FlowFileStreamMerger(new FlowFilePackagerV2(), StandardFlowFileMediaType.VERSION_2.getMediaType());
new FlowFileStreamMerger(new FlowFilePackagerV2(), StandardFlowFileMediaType.VERSION_2.getMediaType(), isDefragment);
case FLOWFILE_TAR_V1 ->
new FlowFileStreamMerger(new FlowFilePackagerV1(), StandardFlowFileMediaType.VERSION_1.getMediaType());
new FlowFileStreamMerger(new FlowFilePackagerV1(), StandardFlowFileMediaType.VERSION_1.getMediaType(), isDefragment);
case CONCAT -> new BinaryConcatenationMerge();
case AVRO -> new AvroMerge();
};
Expand All @@ -487,7 +491,7 @@ protected BinProcessingResult processBin(final Bin bin, final ProcessContext con
final List<FlowFile> contents = bin.getContents();
final ProcessSession binSession = bin.getSession();

if (context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class) == MergeStrategy.DEFRAGMENT) {
if (isDefragment) {
final String error = getDefragmentValidationError(bin.getContents());

// Fail the FlowFiles and commit them
Expand Down Expand Up @@ -520,6 +524,11 @@ protected BinProcessingResult processBin(final Bin bin, final ProcessContext con

bundle = binSession.putAllAttributes(bundle, bundleAttributes);

if (isDefragment) {
bundle = binSession.removeAllAttributes(bundle, Set.of(
FRAGMENT_ID_ATTRIBUTE, FRAGMENT_INDEX_ATTRIBUTE, FRAGMENT_COUNT_ATTRIBUTE, SEGMENT_ORIGINAL_FILENAME));
}

final String inputDescription = contents.size() < 10 ? contents.toString() : contents.size() + " FlowFiles";

getLogger().info("Merged {} into {}. Reason for merging: {}", inputDescription, bundle, bin.getEvictionReason());
Expand Down Expand Up @@ -1006,10 +1015,12 @@ private class FlowFileStreamMerger implements MergeBin {

private final FlowFilePackager packager;
private final String mimeType;
private final boolean removeFragmentAttributes;

public FlowFileStreamMerger(final FlowFilePackager packager, final String mimeType) {
/**
 * Creates a merger that packages each bin member into a FlowFile stream.
 *
 * @param packager the packager implementing the target FlowFile stream version
 * @param mimeType the MIME type assigned to the merged bundle
 * @param removeFragmentAttributes when {@code true}, the fragment.* and
 *        segment.original.filename attributes are removed from each packaged
 *        FlowFile's attribute map before packaging, so that unpacking a
 *        defragmented bundle does not resurrect stale fragment attributes
 */
public FlowFileStreamMerger(final FlowFilePackager packager, final String mimeType, final boolean removeFragmentAttributes) {
this.packager = packager;
this.mimeType = mimeType;
this.removeFragmentAttributes = removeFragmentAttributes;
}

@Override
Expand All @@ -1030,6 +1041,9 @@ public FlowFile merge(final Bin bin, final ProcessContext context) {
bin.getSession().read(flowFile, rawIn -> {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
if (removeFragmentAttributes) {
FRAGMENT_ATTRIBUTE_KEYS.forEach(attributes::remove);
}
packager.packageFlowFile(in, out, attributes, flowFile.getSize());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
Expand Down Expand Up @@ -96,13 +97,20 @@
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type "
+ "attribute is set to application/octet-stream."),
@WritesAttribute(attribute = "fragment.identifier", description = "All unpacked FlowFiles produced from the same parent FlowFile will have the same randomly generated "
+ "UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.identifier", description = "All unpacked FlowFiles produced from the same parent FlowFile will have the same value for this "
+ "attribute, determined by the Fragment Identifier Value property. For TAR and ZIP formats this attribute "
+ "is always written. For FlowFile stream formats it is written when Add Fragment Attributes to FlowFile "
+ "Streams is enabled."),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the unpacked FlowFiles that were created from a single "
+ "parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"),
+ "parent FlowFile. For TAR and ZIP formats this attribute is always written. For FlowFile stream formats "
+ "it is written when Add Fragment Attributes to FlowFile Streams is enabled."),
@WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile. For TAR and ZIP formats this "
+ "attribute is always written. For FlowFile stream formats it is written when Add Fragment Attributes to "
+ "FlowFile Streams is enabled."),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because "
+ "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile"),
+ "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile. "
+ "For TAR and ZIP formats this attribute is always written. For FlowFile stream formats it is written when "
+ "Add Fragment Attributes to FlowFile Streams is enabled."),
@WritesAttribute(attribute = UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, description = "The date and time that the unpacked file was last modified (tar and zip only)."),
@WritesAttribute(attribute = UnpackContent.FILE_CREATION_TIME_ATTRIBUTE, description = "The date and time that the file was created. For encrypted zip files this attribute" +
" always holds the same value as " + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + ". For tar and unencrypted zip files if available it will be returned otherwise" +
Expand Down Expand Up @@ -200,10 +208,35 @@ public class UnpackContent extends AbstractProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

// Opt-in flag for FlowFile stream packaging formats. TAR and ZIP unpacking always
// writes the fragment.* attributes; FlowFile stream formats only do so when this
// property is true, which lets their output be regrouped by MergeContent's
// Defragment strategy.
// NOTE(review): allowableValues("true", "false") already constrains the input, so
// the BOOLEAN_VALIDATOR is redundant — harmless, but it could be dropped.
public static final PropertyDescriptor ADD_FRAGMENT_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Add Fragment Attributes to FlowFile Streams")
.description("When enabled, assigns fragment.identifier, fragment.index, fragment.count, and "
+ "segment.original.filename to FlowFiles unpacked from FlowFile stream formats. TAR and ZIP "
+ "formats always write these attributes. Enabling this allows FlowFile stream output to be "
+ "regrouped by MergeContent in Defragment mode.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

// Expression Language evaluated once per parent FlowFile; every unpacked child
// receives the same resulting fragment.identifier, so MergeContent can later
// defragment them into one bin.
// NOTE(review): declared required(false) but a default of ${UUID()} is supplied,
// so getFragmentIdentifierValue should always see a non-null value — confirm
// that a user cannot clear the property to an unset state.
public static final PropertyDescriptor FRAGMENT_IDENTIFIER_VALUE = new PropertyDescriptor.Builder()
.name("Fragment Identifier Value")
.description("Expression evaluated once against the incoming FlowFile to determine the value of "
+ "fragment.identifier on all unpacked children. The default ${UUID()} generates a unique "
+ "random identifier per parent FlowFile.")
.required(false)
.defaultValue("${UUID()}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

// Ordered list of supported properties as shown in the UI; the two new fragment
// properties are placed after FILE_FILTER and before the archive-specific options.
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
PACKAGING_FORMAT,
ZIP_FILENAME_CHARSET,
FILE_FILTER,
ADD_FRAGMENT_ATTRIBUTES,
FRAGMENT_IDENTIFIER_VALUE,
PASSWORD,
ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR
);
Expand Down Expand Up @@ -348,8 +381,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
return;
}

if (addFragmentAttrs) {
finishFragmentAttributes(session, flowFile, unpacked);
final boolean addFragmentAttrsEnabled = context.getProperty(ADD_FRAGMENT_ATTRIBUTES).asBoolean();
if (addFragmentAttrs || addFragmentAttrsEnabled) {
final String customFragmentId = getFragmentIdentifierValue(context, flowFile);
finishFragmentAttributesCustom(session, flowFile, unpacked, customFragmentId);
}
session.transfer(unpacked, REL_SUCCESS);
final String fragmentId = !unpacked.isEmpty() ? unpacked.getFirst().getAttribute(FRAGMENT_ID) : null;
Expand Down Expand Up @@ -691,35 +726,42 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li
}
}

private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
// first pass verifies all FlowFiles have the FRAGMENT_INDEX attribute and gets the total number of fragments
int fragmentCount = 0;
for (FlowFile ff : unpacked) {
String fragmentIndex = ff.getAttribute(FRAGMENT_INDEX);
if (fragmentIndex != null) {
fragmentCount++;
} else {
return;
}
}

String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
if (originalFilename.endsWith(".tar") || originalFilename.endsWith(".zip") || originalFilename.endsWith(".pkg")) {
originalFilename = originalFilename.substring(0, originalFilename.length() - 4);
}

// second pass adds fragment attributes
List<FlowFile> newList = new ArrayList<>(unpacked);
private void finishFragmentAttributesCustom(final ProcessSession session, final FlowFile source,
final List<FlowFile> unpacked, final String fragmentId) {
final String originalFilename = stripArchiveExtension(source.getAttribute(CoreAttributes.FILENAME.key()));
final int fragmentCount = unpacked.size();
final List<FlowFile> newList = new ArrayList<>(unpacked);
unpacked.clear();
for (FlowFile ff : newList) {
FlowFile newFF = session.putAllAttributes(ff, Map.of(
for (int i = 0; i < newList.size(); i++) {
final FlowFile newFF = session.putAllAttributes(newList.get(i), Map.of(
FRAGMENT_ID, fragmentId,
FRAGMENT_INDEX, String.valueOf(i + 1),
FRAGMENT_COUNT, String.valueOf(fragmentCount),
SEGMENT_ORIGINAL_FILENAME, originalFilename
));
unpacked.add(newFF);
}
}

/**
 * Evaluates the Fragment Identifier Value property against the parent FlowFile.
 *
 * Evaluated exactly once per parent so that every unpacked child receives the
 * same identifier.
 *
 * @throws ProcessException if the expression evaluates to null or a blank string
 */
private String getFragmentIdentifierValue(final ProcessContext context, final FlowFile source) {
    final String evaluated = context.getProperty(FRAGMENT_IDENTIFIER_VALUE)
            .evaluateAttributeExpressions(source)
            .getValue();
    if (evaluated != null && !evaluated.isBlank()) {
        return evaluated;
    }
    throw new ProcessException("Fragment Identifier Value must evaluate to a non-empty value");
}

/**
 * Removes a trailing .tar, .zip, or .pkg extension from the given filename.
 *
 * MergeContent automatically re-adds these extensions when rebuilding the
 * original FlowFile, so they are stripped here before being recorded in
 * segment.original.filename. A {@code null} filename yields the empty string;
 * any other filename is returned unchanged.
 */
private static String stripArchiveExtension(final String filename) {
    if (filename == null) {
        return "";
    }
    for (final String extension : new String[] {".tar", ".zip", ".pkg"}) {
        if (filename.endsWith(extension)) {
            return filename.substring(0, filename.length() - extension.length());
        }
    }
    return filename;
}

protected enum PackageFormat implements DescribedValue {
AUTO_DETECT_FORMAT(AUTO_DETECT_FORMAT_NAME, null, null),
TAR_FORMAT(TAR_FORMAT_NAME, null, "application/x-tar"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ fragment.identifier" attribute. Each FlowFile with the same identifier must also
fragment.index" attribute so that the FlowFiles can be ordered correctly. For a given "fragment.identifier", at least
one FlowFile must have the "fragment.count" attribute (which indicates how many FlowFiles belong in the bin). Other
FlowFiles with the same identifier must have the same value for the "fragment.count" attribute, or they can omit this
attribute. **NOTE:** while there are valid use cases for breaking apart FlowFiles and later re-merging them, it is an
attribute. Once defragmentation is complete, the fragment attributes (fragment.identifier, fragment.index, fragment.count,
and segment.original.filename) are automatically removed from the merged output FlowFile. When the Merge Format is a
FlowFile stream, these attributes are also removed from each inner FlowFile packaged within the stream, so that unpacking
the result does not produce FlowFiles with stale fragment attributes. **NOTE:** while there are valid use cases for breaking apart FlowFiles and later re-merging them, it is an
antipattern to take a larger FlowFile, break it into a million tiny FlowFiles, and then re-merge them. Doing so can
result in using huge amounts of Java heap and can result in Out Of Memory Errors. Additionally, it adds large amounts of
load to the NiFi framework. This can result in increased CPU and disk utilization and can often be an order of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,75 @@ private void createFlowFiles(final TestRunner testRunner) {
testRunner.enqueue("World!", attributes);
}

@Test
public void testDefragmentRemoveFragmentAttributes() throws IOException {
    // Two fragments of one logical FlowFile: defragmenting must reassemble the
    // content and strip the fragment bookkeeping attributes from the merged result.
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MergeStrategy.DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

    final Map<String, String> fragmentAttrs = new HashMap<>(Map.of(
            MergeContent.FRAGMENT_ID_ATTRIBUTE, "1",
            MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2",
            MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1",
            MergeContent.SEGMENT_ORIGINAL_FILENAME, "original"));
    runner.enqueue("Hello ".getBytes(StandardCharsets.UTF_8), fragmentAttrs);

    fragmentAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
    runner.enqueue("World".getBytes(StandardCharsets.UTF_8), fragmentAttrs);

    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).getFirst();
    assembled.assertContentEquals("Hello World".getBytes(StandardCharsets.UTF_8));

    // The merged output must carry none of the fragment attributes.
    assembled.assertAttributeNotExists(MergeContent.FRAGMENT_ID_ATTRIBUTE);
    assembled.assertAttributeNotExists(MergeContent.FRAGMENT_INDEX_ATTRIBUTE);
    assembled.assertAttributeNotExists(MergeContent.FRAGMENT_COUNT_ATTRIBUTE);
    assembled.assertAttributeNotExists(MergeContent.SEGMENT_ORIGINAL_FILENAME);
}

@Test
public void testDefragmentRemoveFragmentAttributesFromFlowFileStream() throws IOException {
    // Merge two fragments into a FlowFile Stream v3 bundle, then unpack it again:
    // neither the outer bundle nor the inner FlowFiles may retain fragment attributes.
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MergeStrategy.DEFRAGMENT);
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MergeFormat.FLOWFILE_STREAM_V3);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

    final Map<String, String> fragmentAttrs = new HashMap<>(Map.of(
            MergeContent.FRAGMENT_ID_ATTRIBUTE, "1",
            MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2",
            MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1",
            MergeContent.SEGMENT_ORIGINAL_FILENAME, "original"));
    runner.enqueue("Hello ".getBytes(StandardCharsets.UTF_8), fragmentAttrs);

    fragmentAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
    runner.enqueue("World".getBytes(StandardCharsets.UTF_8), fragmentAttrs);

    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).getFirst();

    // Outer bundle: no fragment attributes.
    merged.assertAttributeNotExists(MergeContent.FRAGMENT_ID_ATTRIBUTE);
    merged.assertAttributeNotExists(MergeContent.FRAGMENT_INDEX_ATTRIBUTE);
    merged.assertAttributeNotExists(MergeContent.FRAGMENT_COUNT_ATTRIBUTE);
    merged.assertAttributeNotExists(MergeContent.SEGMENT_ORIGINAL_FILENAME);

    // Round-trip: unpack the stream and verify each inner FlowFile as well.
    final TestRunner unpackRunner = TestRunners.newTestRunner(new UnpackContent());
    unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3);
    unpackRunner.enqueue(merged);
    unpackRunner.run();

    unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
    for (final MockFlowFile inner : unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS)) {
        inner.assertAttributeNotExists(MergeContent.FRAGMENT_ID_ATTRIBUTE);
        inner.assertAttributeNotExists(MergeContent.FRAGMENT_INDEX_ATTRIBUTE);
        inner.assertAttributeNotExists(MergeContent.FRAGMENT_COUNT_ATTRIBUTE);
        inner.assertAttributeNotExists(MergeContent.SEGMENT_ORIGINAL_FILENAME);
    }
}

@Test
void testMigrateProperties() {
final Map<String, String> expectedRenamed = Map.ofEntries(
Expand Down
Loading
Loading