Joining Streams
Two streams or topics can be joined on any criterion that can be expressed in terms of the messages themselves. The join is always performed within a time window, which is infinite unless the window field is set. A join part has the following fields:
| Field | Mandatory | Description |
|---|---|---|
| left | Yes | An object with two fields: one of fromCollection, fromStream or fromTopic, as defined for streams, and on, a MongoDB aggregation expression (such as the field path "$_corr") that is evaluated against the messages on this side. |
| right | Yes | An object with two fields: one of fromCollection, fromStream or fromTopic, as defined for streams, and on, a MongoDB aggregation expression (such as the field path "$_corr") that is evaluated against the messages on this side. |
| name | Yes | The name of the output stream. Other streams can connect to it with that name. |
| toCollection | No | The name of the MongoDB collection to which the messages are sent. |
| toString | No | A boolean. When it is true and the toTopic field is present, the JSON messages are written as strings. |
| toTopic | No | The name of the Kafka topic to which this stream will be connected as a producer. |
| type | Yes | The value is always join. |
| window | No | The time window in milliseconds. The default is an infinite window. |
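The field rules in the table can be checked mechanically before an application is deployed. The following Python sketch is a hypothetical validator (validate_join_part is not part of the tool). It assumes the part has already been parsed from YAML into a dict, and it assumes that window must be a positive integer, which the table does not state explicitly.

```python
# Hypothetical validator mirroring the join part field table; not part of the tool.
SOURCE_FIELDS = ("fromCollection", "fromStream", "fromTopic")


def validate_join_part(part: dict) -> list:
    """Return the problems found in a join part; an empty list means it looks valid."""
    problems = []
    if part.get("type") != "join":
        problems.append('type must be "join"')
    if not part.get("name"):
        problems.append("name is mandatory")
    for side in ("left", "right"):
        spec = part.get(side)
        if not isinstance(spec, dict):
            problems.append(f"{side} is mandatory and must be an object")
            continue
        # Each side reads from exactly one source.
        sources = [f for f in SOURCE_FIELDS if f in spec]
        if len(sources) != 1:
            problems.append(f"{side} needs exactly one of {', '.join(SOURCE_FIELDS)}")
        if "on" not in spec:
            problems.append(f"{side}.on is mandatory")
    window = part.get("window")
    # A missing window means an infinite window; a positive integer is assumed otherwise.
    if window is not None and (
        isinstance(window, bool) or not isinstance(window, int) or window <= 0
    ):
        problems.append("window must be a positive number of milliseconds")
    if "toString" in part and not isinstance(part["toString"], bool):
        problems.append("toString must be a boolean")
    return problems


example = {
    "type": "join",
    "name": "joined",
    "toTopic": "string-test",
    "toString": True,
    "window": 5000,
    "left": {"fromStream": "plusminus-counter-command", "on": "$_corr"},
    "right": {"fromStream": "plusminus-counter-event", "on": "$_corr"},
}
print(validate_join_part(example))  # []
```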
Each produced message has the fields left and right, which contain the matching messages from the respective incoming streams.
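The join semantics can be illustrated with a small in-memory model. This is a simplified sketch, not the tool's implementation, which keeps the messages in MongoDB collections. It assumes that two messages match when their on expressions evaluate to equal values, that on is limited to plain field paths such as "$_corr", and that two messages fall within the window when their arrival timestamps differ by at most window milliseconds. Stored messages are never evicted here, and the names WindowedJoin, push and evaluate_field_path are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


def evaluate_field_path(expression: str, message: dict) -> Any:
    """Evaluate a plain field path such as "$_corr" or "$a.b"; returns None if absent."""
    if not expression.startswith("$") or expression.startswith("$$"):
        raise ValueError(f"only plain field paths are supported here: {expression}")
    value: Any = message
    for key in expression[1:].split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


@dataclass
class WindowedJoin:
    """Hypothetical in-memory model of a join part (no eviction, no persistence)."""

    left_on: str
    right_on: str
    window: Optional[int] = None  # milliseconds; None models the default infinite window
    left_seen: list = field(default_factory=list)  # (timestamp, message) pairs
    right_seen: list = field(default_factory=list)

    def _within(self, t1: int, t2: int) -> bool:
        return self.window is None or abs(t1 - t2) <= self.window

    def push(self, side: str, message: dict, timestamp: int) -> list:
        """Store a message on `side` and return the joined messages it completes."""
        if side not in ("left", "right"):
            raise ValueError(f"side must be 'left' or 'right': {side}")
        is_left = side == "left"
        own_on, other_on = (self.left_on, self.right_on) if is_left else (self.right_on, self.left_on)
        own, other = (self.left_seen, self.right_seen) if is_left else (self.right_seen, self.left_seen)
        own.append((timestamp, message))
        key = evaluate_field_path(own_on, message)
        if key is None:
            return []  # messages without a join key never match
        results = []
        for t, candidate in other:
            if self._within(timestamp, t) and evaluate_field_path(other_on, candidate) == key:
                # Every output message carries both originals under left and right.
                results.append(
                    {"left": message, "right": candidate} if is_left
                    else {"left": candidate, "right": message}
                )
        return results


join = WindowedJoin("$_corr", "$_corr", window=5000)
join.push("left", {"_corr": "c1", "n": 1}, timestamp=1_000)
print(join.push("right", {"_corr": "c1", "n": 2}, timestamp=3_000))
# [{'left': {'_corr': 'c1', 'n': 1}, 'right': {'_corr': 'c1', 'n': 2}}]
print(join.push("right", {"_corr": "c1", "n": 3}, timestamp=9_000))
# [] because 9000 - 1000 exceeds the 5000 ms window
```

Because each side is matched against everything already stored on the other side, the model emits a joined message regardless of which of the two messages arrives first.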
Two MongoDB collections are generated, one per side, named <app>-<part>-join-<side>[-<env>], where <side> is left or right. They store the original incoming messages, each with the additional field _join_timestamp, which is used to apply the join window. The value of this field is in epoch milliseconds. Create an index on _join_timestamp in both collections, as well as an index based on left.on in the left collection and on right.on in the right collection. If you use MongoDB Atlas, you can use the field _join_timestamp for automatic archiving. You can also add a TTL index on that field to remove old documents automatically; keep in mind that a MongoDB TTL index only expires documents whose indexed field holds a BSON date value.
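The naming scheme and the recommended indexes can be sketched as follows. Both functions are hypothetical helpers, not part of the tool. The sketch assumes that <part> is the name of the join part, that <side> is left or right, and that each on expression is a plain field path such as "$_corr", since an arbitrary expression cannot be indexed directly. The index specifications use the list-of-(key, direction) form that pymongo's create_index accepts.

```python
from typing import Optional


def join_collection_names(app: str, part: str, env: Optional[str] = None) -> dict:
    """Build both collection names following <app>-<part>-join-<side>[-<env>]."""
    suffix = f"-{env}" if env else ""  # the environment suffix is optional
    return {side: f"{app}-{part}-join-{side}{suffix}" for side in ("left", "right")}


def suggested_indexes(on: str) -> list:
    """Index key specs for one side: one on _join_timestamp and one on the join field.

    Assumes `on` is a plain field path like "$_corr"; the leading "$" is stripped.
    """
    if not on.startswith("$") or on.startswith("$$"):
        raise ValueError(f"not a plain field path: {on}")
    return [[("_join_timestamp", 1)], [(on[1:], 1)]]


print(join_collection_names("my-app", "joined", "dev"))
# {'left': 'my-app-joined-join-left-dev', 'right': 'my-app-joined-join-right-dev'}
print(suggested_indexes("$_corr"))
# [[('_join_timestamp', 1)], [('_corr', 1)]]
```

With pymongo, for example, each specification could be passed to collection.create_index(spec) on the collection of the corresponding side.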
The following example joins the command and event streams from the above-mentioned aggregate over a time window of 5 seconds, using the correlation ID. It writes the results to the topic string-test serialised as strings.
```yaml
---
application: "my-app"
version: "1.0"
parts:
  - type: "join"
    name: "joined"
    toTopic: "string-test"
    toString: true
    window: 5000
    left:
      fromStream: "plusminus-counter-command"
      on: "$_corr"
    right:
      fromStream: "plusminus-counter-event"
      on: "$_corr"
```