The MergeContent processor in Apache NiFi is one of the most useful processors but can also be one of the biggest sources of confusion. The processor (you guessed it!) merges flowfiles together based on a merge strategy. The processor’s purpose is straightforward but its properties can be tricky. In this post we describe how it can be used to merge previously split flowfiles together.
In a recent NiFi flow, a flowfile was split into two separate pipelines. Both pipelines executed independently, and when both were complete their outputs were merged back into a single flowfile. The MergeContent processor used Defragment as its Merge Strategy. In MergeContent-speak, the split flowfiles become fragments. There are two fragments, and we can refer to them as 0 and 1. Which is which doesn’t really matter. What does matter is that each index is unique and less than the fragment count (2).
Merging by Defragment
Using the Defragment merge strategy requires some attributes to be placed on the flowfile. Those attributes are:
- fragment.identifier – All flowfiles with the same fragment.identifier will be grouped together.
- fragment.count – The count of flowfile fragments, i.e. how many splits do we have? (All flowfiles having the same fragment.identifier must have the same value for fragment.count.)
- fragment.index – Identifies the index of the flowfile in the group. All flowfiles in the group must have a unique fragment.index value that is between 0 and the count of fragments.
You can set these attributes using an UpdateAttribute processor. In our example, the flowfile was previously split into 5 fragments. The attribute shared by all of the fragments is some.id, so its value is used as the fragment.identifier. This UpdateAttribute processor sets this flowfile as index 0.
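As a rough illustration of what those attributes look like once they are set, here is a small Python sketch. It does not talk to NiFi; the helper names fragment_attributes and validate_group and the value "order-42" are made up for this example, and the checks only mirror the rules listed above.

```python
def fragment_attributes(some_id, index, count):
    """Attributes one fragment needs for Defragment (NiFi attribute values are strings)."""
    return {
        "fragment.identifier": some_id,
        "fragment.index": str(index),
        "fragment.count": str(count),
    }


def validate_group(flowfiles):
    """Return a list of problems found in a group of fragment attribute maps."""
    problems = []
    if len({ff["fragment.identifier"] for ff in flowfiles}) != 1:
        problems.append("fragment.identifier differs within the group")
    if len({ff["fragment.count"] for ff in flowfiles}) != 1:
        problems.append("fragment.count differs within the group")
    indexes = [ff["fragment.index"] for ff in flowfiles]
    if len(set(indexes)) != len(indexes):
        problems.append("fragment.index is not unique")
    return problems


# Five fragments of one original flowfile; "order-42" is a made-up some.id value.
group = [fragment_attributes("order-42", i, 5) for i in range(5)]
print(validate_group(group))
```

An empty list means the group satisfies the three rules. Adding a second fragment with index 0 would be reported as a non-unique fragment.index.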
With these attributes set, the MergeContent processor will know how to combine the flowfiles when they reach it. As flowfiles come into the MergeContent processor, the value of the fragment.identifier attribute is read, and the processor bins the flowfiles based on this attribute. When the number of flowfiles in a bin equals the fragment.count, the flowfiles are merged together. This means that the MergeContent’s Maximum Number of Bins property should be equal to or greater than the number of fragment.identifiers processed concurrently (source). So, if your flow has 100 flowfiles with unique fragment.identifier attribute values being processed at any given time, you will want to have at least 100 bins.
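The binning described above can be sketched in a few lines of Python. This is a simplified model, not NiFi's implementation: the DefragmentBinner class is hypothetical, and raising an error when no bin is free is a stand-in chosen for the sketch, since what NiFi actually does in that situation is not modeled here.

```python
class DefragmentBinner:
    """Toy model of binning by fragment.identifier (not NiFi's real code)."""

    def __init__(self, max_bins):
        self.max_bins = max_bins
        self.bins = {}  # fragment.identifier -> list of (index, content)

    def offer(self, attributes, content):
        """Add one fragment; return the merged bytes once its bin is complete."""
        identifier = attributes["fragment.identifier"]
        if identifier not in self.bins:
            if len(self.bins) >= self.max_bins:
                # Assumption for this sketch only: refuse the fragment.
                raise RuntimeError("no free bin for " + identifier)
            self.bins[identifier] = []
        fragments = self.bins[identifier]
        fragments.append((int(attributes["fragment.index"]), content))
        if len(fragments) == int(attributes["fragment.count"]):
            del self.bins[identifier]  # the bin is free again after the merge
            # Assemble the fragments in fragment.index order.
            return b"".join(part for _, part in sorted(fragments))
        return None


def attrs(identifier, index, count):
    return {
        "fragment.identifier": identifier,
        "fragment.index": str(index),
        "fragment.count": str(count),
    }


binner = DefragmentBinner(max_bins=2)
first = binner.offer(attrs("a", 1, 2), b"world")   # bin "a" is still incomplete
merged = binner.offer(attrs("a", 0, 2), b"hello ")  # bin "a" completes and merges
print(first, merged)
```

With max_bins=2, a third distinct fragment.identifier arriving while two bins are still open has nowhere to go, which is why the number of bins should cover the number of identifiers in flight.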
The other properties of the MergeContent processor are mostly self-explanatory. For example, if you are merging text you can set the Demarcator property to separate the text. The Header and Footer properties allow you to sandwich the combined text with some values.
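To make the effect of those three properties concrete, here is a minimal Python sketch of how a header, footer, and demarcator wrap and separate merged text. The merge_text function is a made-up name for this example and only imitates the result; it is not part of NiFi.

```python
def merge_text(fragments, header="", footer="", demarcator=""):
    """Join text fragments with a demarcator and wrap them in a header and footer."""
    return header + demarcator.join(fragments) + footer


# Three JSON objects merged into one JSON array.
fragments = ['{"id": 1}', '{"id": 2}', '{"id": 3}']
print(merge_text(fragments, header="[", footer="]", demarcator=","))
```

Choosing "[" as the header, "]" as the footer, and "," as the demarcator is a common way to turn individual JSON objects into a single JSON array.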
MergeContent on a Multi-Node NiFi Cluster
It’s important to remember that in a NiFi cluster the MergeContent processor requires all fragments of a group to be on the same node. A telltale sign of this problem is flowfiles that get “stuck” in the connection leading to the MergeContent processor with identical queue positions. (Both fragments of the same flowfile are at position N.) This means that one fragment is on one node and the other fragment is on a different node, so neither node ever sees a complete group. You need to ensure that the upstream flow puts the fragments onto the same NiFi node. One way to do this is to set the connection’s load balance strategy to Partition by attribute and use some.id as the attribute. This ensures that flowfiles with the same value for the some.id attribute are routed to the same node. (For more on load balancing, check out this blog post from the Apache NiFi project.)
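The idea behind partitioning by attribute can be illustrated with a short Python sketch: derive the target node from the attribute value so that equal values always land on the same node. The choose_node function, the node names, and the use of SHA-256 are all assumptions made for this illustration; how NiFi itself maps values to nodes is not shown here.

```python
import hashlib


def choose_node(attribute_value, nodes):
    """Pick a node deterministically from an attribute value (illustrative only)."""
    digest = hashlib.sha256(attribute_value.encode("utf-8")).digest()
    return nodes[int.from_bytes(digest[:8], "big") % len(nodes)]


nodes = ["node-1", "node-2", "node-3"]  # hypothetical cluster members

# Both fragments carry the same some.id value, so they map to the same node.
fragment_0_node = choose_node("order-42", nodes)
fragment_1_node = choose_node("order-42", nodes)
print(fragment_0_node == fragment_1_node)
```

Because the choice depends only on the attribute value, every fragment with that value ends up on one node, and the MergeContent processor there can see the complete group.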