Aggregation Pipeline Stages
This page lists the supported MongoDB aggregation pipeline stages and notes where they deviate from MongoDB. Inside the stages the aggregation pipeline operators can be used.
$addFields
The following example pulls three fields from the incoming message and adds them up to form the new field totalScore.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-added"
    pipeline:
      - $addFields:
          totalScore:
            $add:
              - "$totalHomework"
              - "$totalQuiz"
              - "$extraCredit"
$bucket
A bucket is a grouping aggregation over a message stream. The state of the aggregation is kept in a MongoDB collection. The extension field _collection can be used to specify that collection. If the field is not present, a collection name will be generated from a digest of the bucket expression. This means that if you change the expression, the collection will change too, which effectively resets the aggregation.
The accumulator expressions $addToSet, $avg, $count, $last, $max, $mergeObjects, $min, $push, $stdDevPop and $sum are supported.
Note that this is a streaming implementation. So, instead of receiving one message with the final result as you would with a MongoDB collection, you get all incremental changes to the aggregation.
The following example groups all messages by the year_born field and pours them into five buckets. Those that don't fit in one of the buckets fall into the "Other" bucket. The output messages will have an array of artists and the count.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-grouped"
    pipeline:
      - $bucket:
          groupBy: "$year_born"
          boundaries:
            - 1840
            - 1850
            - 1860
            - 1870
            - 1880
          default: "Other"
          output:
            count:
              $sum: 1
            artists:
              $push:
                name:
                  $concat:
                    - "$first_name"
                    - " "
                    - "$last_name"
                year_born: "$year_born"
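As an illustration only, an output message for the first bucket could look something like this (the artists and the count are made up; as in MongoDB, the _id is assumed to be the lower bucket boundary):

{
  "_id": 1840,
  "count": 2,
  "artists": [
    {
      "name": "Claude Monet",
      "year_born": 1840
    },
    {
      "name": "Auguste Rodin",
      "year_born": 1840
    }
  ]
}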
$count
The same remarks apply as for the $bucket stage. The following example counts the scores that are higher than 80.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-counted-scores"
    pipeline:
      - $match:
          score:
            $gt: 80
      - $count: "passing_scores"
$deduplicate
With this extension stage you can filter out duplicate messages according to a given expression, which goes in the expression field. The collection field denotes the MongoDB collection that is used for the state. If you use Atlas Archiving you can keep the collection small; use the _timestamp field for this, whose value is in epoch millis. The optional field cacheWindow is the time in milliseconds that messages are kept in a cache for duplicate checking. The default value is 1000. An example:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-unique"
    pipeline:
      - $deduplicate:
          expression:
            $concat:
              - "$_id"
              - "$_corr"
          collection: "deduplicate-state"
          cacheWindow: 60000
The commit to the collection happens when the next stage asks for more messages, which means it has successfully processed the previous ones. When the next stage is buffered and the previous stage completes before it has sent as many messages as the buffer size, those last messages won't be committed. They will be offered again when the application restarts.
$delay
With this extension stage you can send messages to a Kafka topic with a delay. The order of the messages is not guaranteed. The stage is an object with two fields. The duration field is the number of milliseconds the operation is delayed. The topic field is the Kafka topic to which the message is sent after the delay.
Note that message loss is possible if there is a failure in the middle of a delay.
The main use-case for this stage is retry logic. The following example submits messages to some HTTP endpoint. The second part forwards successful submissions to the my-submitted-messages Kafka topic. The third part filters on messages where the service was unavailable. After five seconds the messages are sent back to the original topic. Note that in this case the five seconds are applied to every individual message.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "submit"
    fromTopic: "my-messages"
    pipeline:
      - $http:
          url: "https://my-domain/"
          method: "POST"
          body: "$$ROOT"
          as: "result"
  - type: "stream"
    name: "success"
    fromStream: "submit"
    toTopic: "my-submitted-messages"
    pipeline:
      - $match:
          httpError:
            $exists: false
  - type: "stream"
    name: "failed"
    fromStream: "submit"
    pipeline:
      - $match:
          httpError.statusCode: 503
      - $unset: "httpError"
      - $delay:
          duration: 5000
          topic: "my-messages"
$delete
This is an extension stage to delete documents from a MongoDB collection. It has the mandatory fields from and on. The former is a MongoDB collection. The latter is either a string or a non-empty array of strings, representing fields in the incoming JSON object. The stage deletes records from the collection for which the given fields have the same values as the incoming JSON object. The output of the stage is the incoming JSON object. In case of failure it will retry forever. Here is an example:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-deleted"
    pipeline:
      - $delete:
          from: "my-collection"
          on: "id"
$group
The same remarks apply as for the $bucket stage. The following example groups the messages by the item field, which becomes the _id of the output messages. The totalSaleAmount output field is the sum of the multiplications of the price and quantity fields. The second stage then selects those for which the field is greater than or equal to 100.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-grouped"
    pipeline:
      - $group:
          _id: "$item"
          totalSaleAmount:
            $sum:
              $multiply:
                - "$price"
                - "$quantity"
      - $match:
          totalSaleAmount:
            $gte: 100
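As with $bucket, the result is streamed incrementally; an individual output message could look like this (the item and amount are made up):

{
  "_id": "abc",
  "totalSaleAmount": 150
}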
$http
With this extension stage you can "join" a JSON HTTP API with a data stream or cause side effects to it. The object supports the following fields:
| Field | Mandatory | Description |
|---|---|---|
| as | No | This output field will carry the response body. Without it the response body is ignored. |
| body | No | The result of this expression will be used as the request body. |
| headers | No | The expression should yield an object. Its contents will be added as HTTP headers. Array values will result in multi-valued headers. |
| method | Yes | The HTTP method. The expression should yield a string. |
| sslContext | No | This object can be used for client-side authentication. Its keyStore field should refer to a PKCS#12 key store file. Its password field should provide the password for the keys in the key store file. |
| unwind | No | When this Boolean field is set to true and the response body is a JSON array, a message will be produced for each entry in the array, with the array entry in the as field. If the array is empty no messages are produced at all. |
| url | Yes | The URL that will be called. The expression should yield a string. |
HTTP errors are put in the httpError field, which contains the integer field statusCode and the field body, which can be any JSON value.
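As a sketch, a message that hit a failing endpoint could carry an httpError field shaped like this (the status code and body are only examples):

{
  "httpError": {
    "statusCode": 503,
    "body": {
      "message": "Service Unavailable"
    }
  }
}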
In the following example the URL is constructed with the id field in the incoming message. The header X-My-Header is populated with the meta field. The output message will have the extra result field.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-http"
    pipeline:
      - $http:
          url:
            $concat:
              - "https://my-domain/"
              - "$id"
          method: "GET"
          headers:
            X-My-Header: "$meta"
          as: "result"
$jslt
This extension stage transforms the incoming message with a JSLT script. Its specification should be a string. If it starts with resource:/ the script will be loaded as a class path resource; otherwise it is interpreted as a filename. If the transformation changes or adds the _id field, then that will become the key of the outgoing message.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-jslt"
    pipeline:
      - $jslt: "my_script.jslt"
$lag
This extension stage fetches the message lag for the Kafka cluster. It adds a JSON object to the message in the field that is defined in the as field of the specification. The keys of the generated object are the consumer groups. The values are objects where the keys are the topics and the values are again objects, in which each partition is a key with a number as the value. An example:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "message-lag"
    fromTopic: "message-lag-trigger"
    toTopic: "message-lag"
    pipeline:
      - $lag:
          as: messageLag
The generated messages look like this:
{
  "messageLag": {
    "my-consumer": {
      "topic1": {
        "0": 0,
        "1": 6,
        "2": 10,
        "3": 0
      },
      "topic2": {
        "0": 0,
        "1": 11,
        "2": 0,
        "3": 3
      }
    }
  }
}
$log
With this extension stage you can write something to a Java logger. The incoming message will come out unchanged. The expression of the stage is an object with at least the field message. The expression in this field will be converted to a string after evaluation. The logger to which the entries are sent has the same name as the application. The optional field level can be used to set the Java log level. If it's not there, the default log level will be used.
You can add extra fields in the optional attributes field. They will be added to the emitted OpenTelemetry attributes.
If the incoming message has the _corr field, then the log entry will have a trace ID that is derived from it by removing the dashes from the UUID. A span ID will also be added. Its value will be the first half of the trace ID, because that is how JSON Streams defines the root span ID.
The following example logs the field subobject.field with the log level INFO.
---
application: "my-app"
version: "1.0.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    pipeline:
      - $log:
          message: "$subobject.field"
          level: "INFO"
          attributes:
            dataset: "test"
            original: "$value"
$lookup
The extra optional Boolean field inner is available to make this pipeline stage behave like an inner join instead of a left outer join, which is the default. When the other optional Boolean field unwind is set, multiple objects may be returned, where the as field will have a single value instead of an array. In this case the join will always be an inner join. With the unwind feature you can avoid the accumulation of large arrays in memory. When both the extra fields connectionString and database are present, the query will go to that database instead of the default one.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-lookup"
    pipeline:
      - $lookup:
          from: "my-collection"
          as: "result"
          let:
            field1: "$field1"
            field2: "$field2"
          pipeline:
            - $match:
                record.field1: "$$field1"
                record.field2: "$$field2"
                record.error:
                  $exists: false
$match
See several other examples. The expressions support the query operators.
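A minimal sketch of a part that only passes messages with a hypothetical type field equal to "order" and a total of at least 10:

---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-matched"
    pipeline:
      - $match:
          type: "order"
          total:
            $gte: 10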
$merge
Pipeline values for the whenMatched field are currently not supported. The into field can only be the name of a collection. The database is always the one from the configuration. The optional key field accepts an expression, which is applied to the incoming message. When it is present, its result will be used as the value of the _id field in the MongoDB collection. The output of the stream is whatever has been updated in or taken from the MongoDB collection. The value of the _id field of the incoming message will be kept for the output message.
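A minimal sketch, assuming a collection named my-collection and a hypothetical businessKey field whose value should become the _id in that collection:

---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-merged"
    pipeline:
      - $merge:
          into: "my-collection"
          key: "$businessKey"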
$out
Because streams are infinite, this operator behaves like $merge with the following settings:
into: "<output-collection>"
on: "_id"
whenMatched: "replace"
whenNotMatched: "insert"
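A minimal sketch, assuming the stage takes the output collection name as a string, as it does in MongoDB:

---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-out"
    pipeline:
      - $out: "my-collection"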
$per
This extension stage is an object with the mandatory fields amount and as. It accumulates the given amount of messages and produces a message with only the field denoted by the as field. That field is an array of messages. With the optional timeout field, which is a number of milliseconds, a batch of messages can be emitted before it is full. In that case the length of the generated array will vary. An example:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-batch"
    pipeline:
      - $per:
          amount: 100
          as: "batch"
          timeout: 500
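The generated messages then have a shape like the following (the payloads are made up; this batch happened to contain only two messages because the timeout expired):

{
  "batch": [
    { "_id": "1", "score": 87 },
    { "_id": "2", "score": 92 }
  ]
}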
$probe
With this extension stage you can monitor the throughput anywhere in a pipeline. The specification is an object with the fields name and topic, the latter being the name of a Kafka topic. It will write messages to that topic with the fields name, minute and count, where count represents the number of messages it has seen in that minute. Note that if your pipeline is running on multiple topic partitions, you should group the messages on the specified topic by the name and minute and sum the count. That is because every instance of the pipeline only sees the messages that pass on the partitions that are assigned to it.
Say you have an application with a probe like the following:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-filtered"
    pipeline:
      - $match:
          shouldFilter: true
      - $probe:
          name: "filtering"
          topic: "pipeline-probe-part"
The application that aggregates all topic partitions could then look like this:
---
application: "json-streams-probe"
version: "1.0"
parts:
  - type: "stream"
    name: "group-probe"
    fromTopic: "pipeline-probe-part"
    toTopic: "pipeline-probe"
    pipeline:
      - $group:
          _id:
            name: "$name"
            minute: "$minute"
          _collection: "group-probe"
          count:
            $sum: "$count"
      - $set:
          name: "$_id.name"
          minute: "$_id.minute"
          perSecond:
            $round:
              - $divide:
                  - "$count"
                  - 60
              - 1
The counts are grouped over the probe name and then the perSecond field is added. If all other applications write their probes to the pipeline-probe-part topic, then you only need to run the above application once. Otherwise you would pull out the pipeline and embed it in your applications.
$project
In the following example the resulting messages will only contain the calculated field totalScore, together with the field _id, which is output by default.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-added"
    pipeline:
      - $addFields:
          totalScore:
            $add:
              - "$totalHomework"
              - "$totalQuiz"
              - "$extraCredit"
      - $project:
          totalScore: 1
$redact
In the following example objects with the field age with a value greater than 17 will be processed further. The others will be removed.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-redacted"
    pipeline:
      - $redact:
          $cond:
            if:
              $gt:
                - "$age"
                - 17
            then: "$$DESCEND"
            else: "$$PRUNE"
$replaceRoot
In the following example the contents of the field subobject will become the output message.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-replaced"
    pipeline:
      - $replaceRoot:
          newRoot: "$subobject"
$replaceWith
This stage works like the $replaceRoot stage, but without the newRoot field. The previous example would become the following:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-replaced"
    pipeline:
      - $replaceWith: "$subobject"
$send
With this extension stage you can send a message to a Kafka topic. The stage is an object with a topic field, which is the Kafka topic to which the message is sent. The main use-case for this stage is dynamic routing of messages to topics. The messages that are processed by the following example have the field topic. They are forwarded to the corresponding topic.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    pipeline:
      - $send:
          topic: "$topic"
$set
This is a synonym for the $addFields stage.
$setKey
With this extension stage you can change the Kafka key of the message without changing the message itself. The stage expects an expression, the result of which will be converted to a string. Changing the key will repartition the stream. A use-case for this is serialising a message stream based on some business key.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-key"
    pipeline:
      - $setKey:
          $concat:
            - "$field1"
            - "$field2"
$signJwt
A $http stage sometimes needs a signed JSON Web Token for its request. With this extension stage you can generate and sign one and put it in a temporary field, which you remove after the $http stage. The following fields are supported:
| Field | Mandatory | Description |
|---|---|---|
| as | Yes | The name of the field that will receive the JWT as a string. |
| aud | No | The string expression that becomes the standard aud JWT claim. |
| claims | No | The object expression that contains any non-standard JWT claims. |
| iss | No | The string expression that becomes the standard iss JWT claim. |
| kid | No | The string expression that becomes the standard kid JWT claim. |
| privateKey | Yes | The string that represents the private key. You would pull that from the configuration. Don't put it in the application directly. |
| sub | No | The string expression that becomes the standard sub JWT claim. |
| ttl | No | The time to live in seconds for the token. The default is five seconds. Making this longer reduces CPU usage. |
An example:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-enriched"
    pipeline:
      - $signJwt:
          privateKey: "${SECRET.larsPrivateKey}"
          as: "_token"
          ttl: 2592000
          sub: "$_jwt.sub"
          claims:
            roles: "$_jwt.roles"
      - $http:
          url: "${END_POINT}"
          method: "POST"
          headers:
            Authorization:
              $concat:
                - "Bearer "
                - "$_token"
          body:
            traceId:
              $jes-uuid: null
            data: "my data"
          as: "response"
      - $unset: "_token"
$s3Attachments
This extension stage lets you post a number of S3 objects as attachments to an HTTP endpoint. The default content type is multipart/mixed. You can change this in the headers. The object supports the following fields:
| Field | Mandatory | Description |
|---|---|---|
| attachments | Yes | An array of objects with the mandatory fields bucket and key. The multipart/mixed request body will be constructed in the order of the array. Extra fields will be used as MIME part headers. The headers Content-Length and Content-Transfer-Encoding can't be overridden. |
| headers | No | The expression should yield an object. Its contents will be added as HTTP headers. Array values will result in multi-valued headers. |
| sslContext | No | This object can be used for client-side authentication. Its keyStore field should refer to a PKCS#12 key store file. Its password field should provide the password for the keys in the key store file. |
| url | Yes | The URL that will be called. The expression should yield a string. |
HTTP errors are put in the httpError field, which contains the integer field statusCode and the field body, which can be any JSON value.
The following is an example with an input message:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "stream"
    fromTopic: "in"
    toTopic: "out"
    pipeline:
      - $s3Attachments:
          url: "http://localhost:9000"
          attachments: "$attachments"
{
  "_id": "87AF18F1-40FB-4C1E-9EE1-2A51927B5A4A",
  "attachments": [
    {
      "bucket": "my-bucket",
      "key": "com2012_0429nl01.pdf",
      "x-my-header": "value"
    },
    {
      "bucket": "my-bucket",
      "key": "com2012_0444nl01.pdf"
    },
    {
      "bucket": "my-bucket",
      "key": "com2012_0445nl01.pdf"
    },
    {
      "bucket": "my-bucket",
      "key": "com2012_0448nl01.pdf"
    }
  ]
}
$s3Csv
With this extension stage you can consume a CSV file that resides in an S3 bucket. The lines are emitted as individual JSON messages. The first line will be used for the field names of the messages. The mandatory fields are bucket and key, which are the name of the bucket and the object key respectively. The optional field separator has the default value "\t". You must ensure the proper credentials are available, either as an IAM role or as environment variables.
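As a sketch, assuming a tab-separated object whose header line contains the columns id, name and score, a data line with the values 1, Alice and 87 would be emitted as a message like the one below (whether values are parsed into numbers or kept as strings depends on the implementation; strings are assumed here):

{
  "id": "1",
  "name": "Alice",
  "score": "87"
}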
The following example consumes a topic that receives all kinds of S3-events. It selects a bucket and the CSV-objects in it.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-app"
    fromTopic: "s3-events"
    toTopic: "csv-messages"
    pipeline:
      - $match:
          eventName:
            $regex: "/^ObjectCreated:.*$/"
          s3.bucket.name: "my-csv-bucket"
          s3.object.key:
            $regex: "/^.*.csv$/"
      - $s3Csv:
          bucket: "$s3.bucket.name"
          key: "$s3.object.key"
          separator: "\t"
$s3Out
This extension stage lets you write individual JSON messages to an S3 bucket. It has only the fields bucket and key, which are the name of the bucket and the object key respectively. You must ensure the proper credentials are available, either as an IAM role or as environment variables.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    pipeline:
      - $s3Out:
          bucket: "my-bucket"
          key:
            $concat:
              - "$field1"
              - "$field2"
$throttle
With this extension stage you can limit the throughput in a pipeline by setting the maxPerSecond field. An example:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    pipeline:
      - $throttle:
          maxPerSecond: 1000
$trace
This extension stage writes all JSON objects that pass through it to the Java logger net.pincette.mongo.streams with level INFO. That is, when the stage doesn't have an expression (it is set to null). If you give it an expression, its result will be written to the logger. This can be used for pipeline debugging. This is an example:
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-filtered"
    pipeline:
      - $match:
          shouldFilter: true
      - $trace: null
$unset
In the following example the field myfield is removed from the output messages.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-unset"
    pipeline:
      - $unset: "myfield"
$unwind
The Boolean extension option newIds will cause UUIDs to be generated for the output documents if the given array was not absent or empty. If the incoming messages have the values array, then there will be as many output messages as there are values in that array. For each output message, the values field will have the corresponding single value.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-stream"
    fromTopic: "my-messages"
    toTopic: "my-messages-unwind"
    pipeline:
      - $unwind: "$values"
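For illustration, a hypothetical input message { "_id": "A", "values": [1, 2] } would yield two output messages (with newIds: true each would instead receive a freshly generated _id):

{ "_id": "A", "values": 1 }
{ "_id": "A", "values": 2 }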
$validate
This extension stage performs validation on the incoming messages with a validator. When it finds errors, the field _error will be set to true. The field errors will then have an array of objects with the field location, which will be set to the JSON pointer of the validation error. The optional field code will be set to whatever code the validation condition has defined.
In the following example the presence of the field subobject.myfield is checked. If it is absent, an error entry with the location /subobject and the code REQUIRED will be added.
---
application: "my-app"
version: "1.0"
parts:
  - type: "stream"
    name: "my-validated-messages"
    fromTopic: "my-messages"
    pipeline:
      - $validate:
          conditions:
            - subobject.myfield:
                $exists: true
              $code: "REQUIRED"
The value of this stage can also be a filename.