Joining Streams
It is possible to join two streams or topics using any criterion that can be expressed in terms of the messages themselves. The join is always done within a configured time window. A join part has the following fields:
Field | Mandatory | Description |
---|---|---|
left | Yes | An object with two fields. The first is one of `fromCollection`, `fromStream` or `fromTopic`, which are defined in streams. The second is the `on` field, which is a MongoDB query expression. |
right | Yes | An object with two fields. The first is one of `fromCollection`, `fromStream` or `fromTopic`, which are defined in streams. The second is the `on` field, which is a MongoDB query expression. |
name | Yes | The name of the output stream. Other streams can connect to it with that name. |
toCollection | No | The name of the MongoDB collection to which the messages are sent. |
toString | No | A boolean field that, when the toTopic field is present, will cause the JSON messages to be written as strings. |
toTopic | No | The name of the Kafka topic to which this stream will be connected as a producer. |
type | Yes | The value is always `join`. |
window | No | The time window in milliseconds. The default is an infinite window. |
The produced messages will have the fields `left` and `right`, with the respective messages from the incoming streams.
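The pairing described above can be sketched in Python. This is only an illustration under stated assumptions, not the actual implementation: the function names `field_value` and `windowed_join` are hypothetical, the `on` expression is assumed to be a plain field path such as `$_corr` rather than a full MongoDB expression, and two messages are assumed to fall in the same window when their timestamps (epoch millis) differ by at most the window size.

```python
from typing import Any, Dict, Iterable, List, Tuple

Message = Dict[str, Any]


def field_value(message: Message, on: str) -> Any:
    """Evaluate a plain "$field" or "$a.b" path against a message.

    Assumption: only simple field paths are handled, not full MongoDB
    expressions.
    """
    value: Any = message
    for name in on.lstrip("$").split("."):
        if not isinstance(value, dict) or name not in value:
            return None
        value = value[name]
    return value


def windowed_join(
    left: Iterable[Tuple[int, Message]],
    right: Iterable[Tuple[int, Message]],
    left_on: str,
    right_on: str,
    window_ms: int,
) -> List[Message]:
    """Pair messages whose join values are equal and whose timestamps
    (epoch millis) are at most window_ms apart."""
    right_list = list(right)
    joined: List[Message] = []
    for left_time, left_message in left:
        key = field_value(left_message, left_on)
        if key is None:
            # A message without a join value cannot match anything.
            continue
        for right_time, right_message in right_list:
            if (
                field_value(right_message, right_on) == key
                and abs(left_time - right_time) <= window_ms
            ):
                # The output carries both original messages.
                joined.append({"left": left_message, "right": right_message})
    return joined
```

For example, with a window of 5000 milliseconds, a command and an event that share the same `_corr` value and arrive 2 seconds apart would be paired, whereas a pair arriving 7 seconds apart would not.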
Two MongoDB collections will be generated, one for each side of the join. Their names are structured as `<app>-<part>-join-<side>[-<env>]`. The original messages are saved in these collections, with the additional field `_join_timestamp`, which is used for the join window. Create indexes on `_join_timestamp` and on the fields used in `left.on` and `right.on` respectively. If you use MongoDB Atlas, you can use the field `_join_timestamp` for automatic archiving. The value of the field is epoch millis. You can also add a TTL index based on that field to get rid of old documents automatically.
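As a rough aid for preparing those indexes, the following Python sketch derives the two collection names and the index keys for one side. The helper names `join_collection_names` and `join_index_keys` are hypothetical, `<side>` is assumed to take the values `left` and `right`, and the `on` expression is assumed to be a plain field path such as `$_corr`.

```python
from typing import List, Optional, Tuple


def join_collection_names(
    app: str, part: str, env: Optional[str] = None
) -> List[str]:
    """Build both collection names from the pattern
    <app>-<part>-join-<side>[-<env>].

    Assumption: <side> is either "left" or "right".
    """
    suffix = f"-{env}" if env else ""
    return [f"{app}-{part}-join-{side}{suffix}" for side in ("left", "right")]


def join_index_keys(on: str) -> List[List[Tuple[str, int]]]:
    """Index key specifications for one join collection: one ascending
    index on _join_timestamp and one on the field named by a plain
    "$field" expression."""
    return [[("_join_timestamp", 1)], [(on.lstrip("$"), 1)]]
```

With a driver such as PyMongo, each key specification could then be passed to `Collection.create_index` on the corresponding collection.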
The following example joins the command and event streams from the above-mentioned aggregate over a time window of 5 seconds, using the correlation ID. It writes the results to the topic `string-test`, serialised as strings.
---
application: "my-app"
version: "1.0"
parts:
  - type: "join"
    name: "joined"
    toTopic: "string-test"
    toString: true
    window: 5000
    left:
      fromStream: "plusminus-counter-command"
      on: "$_corr"
    right:
      fromStream: "plusminus-counter-event"
      on: "$_corr"