# State Management
State is managed in an aggregate. An aggregate can define commands, which may have an effect. When they do, the change is recorded and published as an event.
You manage the state by writing reducers for commands. A reducer receives a command and the current state of an aggregate instance. Its task is to calculate the new state. Reducers must not have side effects; they are merely JSON transformations.
Reducers are written in JSLT or as an aggregation pipeline. A JSLT script receives an object with the fields `command` and `state`. It should return the new state. Scripts are allowed to import other scripts; always use relative filenames. A pipeline receives the same object. Its output becomes the new state.
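A minimal sketch of such a script, mirroring the built-in `put` behaviour shown in the example at the end of this section: it moves the context to the `command` field and copies everything except the technical `_command` field.

```
// A sketch only: the whole command becomes the new state,
// minus the technical _command field.
.command | {
  "_command": null,
  *: .
}
```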
## Aggregate Parts
An aggregate application part has the following fields:
Field | Mandatory | Description |
---|---|---|
`aggregateType` | Yes | The name of the aggregate. It is usually composed as `<app>-<type>`. |
`commands` | No | A JSON object where the keys are the command names. If no commands are given, only the built-in commands `put`, `delete` and `patch` will be available. |
`environment` | No | The environment for the aggregate. This will be used for the Kafka topic suffixes. |
`preprocessor` | No | A pipeline that pre-processes commands before they are reduced. With this you can avoid adding another public Kafka topic in front of the command topic. |
`type` | Yes | The value is always `aggregate`. |
`uniqueExpression` | No | A MongoDB expression that is executed on aggregate instances. It expresses the uniqueness of aggregate instances based on some criterion. If you use this feature, you must make sure all commands also have the fields that constitute the unique expression. |
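As a minimal sketch, an aggregate part could look like this, assuming a hypothetical application `myapp` with an `order` type (all names are illustrative):

```yaml
- type: "aggregate"
  aggregateType: "myapp-order"
  commands:
    approve:
      reducer: "approve.jslt"
      validator: "validate_approve.yml"
  uniqueExpression: "$orderNumber"
```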
Commands have the following fields:
Field | Mandatory | Description |
---|---|---|
`preprocessor` | No | A pipeline that pre-processes commands with this name before they are reduced. With this you can avoid adding another public Kafka topic in front of the command topic. |
`reducer` | Yes | It can be the filename of a JSLT script, which may be relative. The script receives a JSON object with the fields `command` and `state`; the generated object will be used as the new state. It can also be a pipeline, which receives the same object; the output of the pipeline becomes the new state. |
`validator` | No | A command validator. The value of the field must be a filename, which may be relative. If an expression wants to refer to the current state of an aggregate instance, it can use the field `_state`. |
An aggregate creates the streams `<app>-<type>-aggregate`, `<app>-<type>-command`, `<app>-<type>-event`, `<app>-<type>-event-full` and `<app>-<type>-reply`. They correspond to the Kafka topics of an aggregate. You can connect to those streams in other parts of the application, as the sketch below and the example under Storing the State show.
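For instance, a stream part that watches deletions could read from the aggregate stream; a minimal sketch (the part name is illustrative):

```yaml
- type: "stream"
  name: "watch-deletions"
  fromStream: "plusminus-counter-aggregate"
  pipeline:
    - $match:
        _deleted: true
```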
## Aggregates
An aggregate is a JSON document that can have any structure, plus the following technical fields:
Field | Mandatory | Description |
---|---|---|
`_corr` | Yes | The correlation identifier that was used by the last command. It is usually a UUID. It is propagated to the event if one is produced. |
`_deleted` | No | This boolean marks the aggregate instance as deleted. This is a logical deletion. |
`_id` | Yes | The identifier of the aggregate instance. It is usually a UUID. |
`_jwt` | No | The decoded JSON Web Token that was used by the last command. It is propagated to the event if one is produced. |
`_seq` | Yes | A sequence number. This is the sequence number of the last event. |
`_type` | Yes | The aggregate type, which is composed as `<application>-<name>`. |
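A hypothetical instance of the `plusminus-counter` aggregate from the example at the end of this section could look like this (the values are illustrative):

```json
{
  "_corr": "e4179cf3-886a-4457-8d18-94cf0db7f760",
  "_id": "21f38d06-cbc2-4ec4-9c1c-271285776cc7",
  "_seq": 3,
  "_type": "plusminus-counter",
  "value": 3
}
```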
## Commands
A command is a JSON document, which has the following technical fields on top of whatever you put in it:
Field | Mandatory | Description |
---|---|---|
`_command` | Yes | The name of the command. A reducer is selected based on this name. |
`_corr` | Yes | A correlation identifier. It is propagated throughout the flow. This is usually a UUID. |
`_error` | No | This boolean indicates there is a problem with the command. It is set by the validator that accompanies the reducer. |
`_id` | Yes | The identifier of the aggregate instance. It is usually a UUID. |
`_jwt` | No | The decoded JSON Web Token. It is propagated throughout the flow. |
`_languages` | No | An array of language tags in the order of preference. When a validator or some other component wishes to send messages to the user, it can use the proper language. |
`_seq` | No | A sequence number. If this field is present, its value should equal the value of that field in the aggregate instance; otherwise the command is ignored. |
`_type` | Yes | The aggregate type, which is composed as `<application>-<name>`. |
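A minimal `plus` command for the counter example at the end of this section could look like this (the identifiers are illustrative):

```json
{
  "_command": "plus",
  "_corr": "e4179cf3-886a-4457-8d18-94cf0db7f760",
  "_id": "21f38d06-cbc2-4ec4-9c1c-271285776cc7",
  "_type": "plusminus-counter"
}
```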
There are three built-in commands called `put`, `patch` and `delete`. The `put` command replaces the entire contents of the aggregate instance. The `patch` command has the array field `_ops`, which is a JSON patch that is applied to the instance. The `delete` command performs a logical deletion. It sets the field `_deleted` to `true`. The reducers for these commands can be replaced.
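For illustration, a `patch` command that sets the counter's `value` field could carry a JSON patch like this (the values are illustrative):

```json
{
  "_command": "patch",
  "_corr": "e4179cf3-886a-4457-8d18-94cf0db7f760",
  "_id": "21f38d06-cbc2-4ec4-9c1c-271285776cc7",
  "_type": "plusminus-counter",
  "_ops": [
    {"op": "replace", "path": "/value", "value": 1}
  ]
}
```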
## Events
An event is a JSON document. It is generated when the old and the new states of the aggregate instance differ. When a reducer doesn't change the instance, nothing is generated. An event has the following technical fields:
Field | Mandatory | Description |
---|---|---|
`_after` | No | An optional field that carries the new state of the aggregate instance. |
`_before` | No | An optional field that carries the previous state of the aggregate instance. When this field and the previous one are present, we speak of a "full event". |
`_command` | Yes | The name of the command that caused the event to be created. |
`_corr` | Yes | The correlation identifier that was used by the last command. It is usually a UUID. If you process the event, you should always propagate this. |
`_id` | Yes | The identifier of the aggregate instance. It is usually a UUID. |
`_jwt` | No | The decoded JSON Web Token that was used by the last command. If you process the event, you should always propagate this. |
`_ops` | Yes | An array of operations as described in RFC 6902. It describes how an aggregate instance has changed after the reduction by a command. |
`_seq` | Yes | A sequence number. There should be no holes in the sequence; a hole would indicate corruption of the event log. |
`_timestamp` | Yes | The timestamp of the event creation in epoch millis. |
`_type` | Yes | The aggregate type, which is composed as `<application>-<name>`. |
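A hypothetical event produced by the `plus` command from the example at the end of this section (the values are illustrative):

```json
{
  "_command": "plus",
  "_corr": "e4179cf3-886a-4457-8d18-94cf0db7f760",
  "_id": "21f38d06-cbc2-4ec4-9c1c-271285776cc7",
  "_ops": [
    {"op": "replace", "path": "/value", "value": 1}
  ],
  "_seq": 1,
  "_timestamp": 1627466400000,
  "_type": "plusminus-counter"
}
```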
## The Kafka Topics
The external interface at runtime is a set of Kafka topics. Their names always have the form `<application>-<type>-<purpose>[-<environment>]`. The following topics are expected to exist (the names are the purpose):
Name | Mandatory | Description |
---|---|---|
`aggregate` | Yes | On this topic the current state of the aggregate is emitted when it has changed. |
`command` | Yes | Through this topic commands are received. It is the only input of an aggregate. |
`event` | Yes | On this topic the events are emitted, which contain the changes between two subsequent aggregate versions. |
`event-full` | Yes | The events are also emitted on this topic, but here they have two extra fields. The `_before` field contains the previous state of the aggregate instance, while `_after` contains the one after the reduction. This is for consumers that want to do other kinds of analysis than with the plain difference, which is in the `_ops` field. |
`reply` | Yes | On this topic either the new aggregate states or the commands that didn't pass validation are emitted. The topic is meant to be routed back to the end user, for example through Server-Sent Events. |
`unique` | No | This topic is only required when a "unique" MongoDB expression is given to the aggregate object. Commands will be re-keyed on this topic using the key values generated by the expression. |
The number of topic partitions should be the same for all topics. It is the upper limit for the parallelism you can achieve for one aggregate. It is best to provide more partitions than the level of parallelism you want to start with. This allows you to scale out without having to extend the number of partitions, which would require downtime with Kafka. Note, however, that partitions don't come cheap.
## Storing the State
The state is always stored in MongoDB, in the collection with the name `<application>-<type>[-<environment>]`. For better observability you can also store the events, commands, etc. by adding a few parts to your application, as in the following example. It assumes you use the `environment` configuration setting; if that is not the case, you should remove the `-${ENV}` suffixes. The `AGGREGATE_TYPE` parameter would be the full aggregate type. You can also see that the parts use the predefined aggregate streams.
```yaml
---
- type: "stream"
  name: "save-commands"
  fromStream: "${AGGREGATE_TYPE}-command"
  pipeline:
    - $set:
        _id:
          id: "$_id"
          corr: "$_corr"
          command: "$_command"
    - $out: "${AGGREGATE_TYPE}-command-${ENV}"
- type: "stream"
  name: "save-events"
  fromStream: "${AGGREGATE_TYPE}-event"
  pipeline:
    - $set:
        _id:
          id: "$_id"
          seq: "$_seq"
    - $out: "${AGGREGATE_TYPE}-event-${ENV}"
- type: "stream"
  name: "save-deleted"
  fromStream: "${AGGREGATE_TYPE}-aggregate"
  pipeline:
    - $match:
        _deleted: true
    - $out: "${AGGREGATE_TYPE}-deleted-${ENV}"
- type: "stream"
  name: "save-invalid"
  fromStream: "${AGGREGATE_TYPE}-reply"
  pipeline:
    - $match:
        _error: true
    - $set:
        _id:
          id: "$_id"
          corr: "$_corr"
          command: "$_command"
    - $out: "${AGGREGATE_TYPE}-invalid-${ENV}"
```
## Uniqueness
Sometimes it is important that aggregate instances are unique according to some business criterion and not only the aggregate identifier. This can be achieved by giving the aggregate object a MongoDB expression that generates unique values from the commands. Commands with a different aggregate identifier but the same unique business value will map to the same aggregate instance. The aggregate identifier of the instance will be the aggregate identifier of the first command with that unique value.
Whether this is useful depends on the use case. When you have a stream of objects that come from another system, where the same business entity may appear with different identifiers, this feature makes it very easy to consolidate that stream correctly in aggregate instances. However, when duplicates could be created accidentally, this feature would promote the overwriting of data from different users. In that case it is better to add a unique index to the MongoDB aggregate collection.
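Such a unique index could be created directly on the collection, for example with `mongosh` (the collection and field names are illustrative):

```javascript
// A sketch only: enforce uniqueness of a hypothetical business field
// on the aggregate collection of a hypothetical myapp-order aggregate.
db.getCollection("myapp-order").createIndex(
  { orderNumber: 1 },
  { unique: true }
)
```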
When you use this feature, the Kafka topic with the purpose `unique` should exist. You should also make sure all commands have the fields that constitute the unique expression.
Unique expressions involving only one scalar field are very simple: the expression is then `"$<field-name>"`. Sometimes, however, uniqueness can depend on several fields. You could write an expression that turns this into some scalar value, but that wouldn't be efficient, because MongoDB could not use an index. Therefore, a unique expression is also allowed to be a JSON object. You would write the expression like this:
```json
{
  "field1": "$field1",
  "field2": "$field2"
}
```
As with scalar values, the field references are used to extract the values from the command. The result is a plain equality expression for MongoDB.
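For example, a command carrying `"field1": "a"` and `"field2": "b"` would be matched with an equality filter like this sketch:

```json
{
  "field1": "a",
  "field2": "b"
}
```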
## Access Control
Next to role-based and function-based access control it is sometimes also necessary to manage access to individual aggregate instances. A user may, for example, be allowed to issue a certain command to a certain type of aggregate, but not necessarily to all instances of that type.
The aggregate part checks if the field `_acl` is present in the current state of an aggregate instance. Without the field the command is allowed. Otherwise it should contain a subobject, where each field is the name of a command. The value is an array of role names, which is matched with the field `/_jwt/roles` in the command. If the intersection is not empty, the command is good to go. If there is no field in `_acl` that corresponds to the command, the fallback field `write` is tried. If that is also not available, the command is allowed. If the `/_jwt/sub` field is `system`, the command is always allowed.
In the following example there are constraints for the commands `get` and `put`.
```json
{
  ...
  "_acl": {
    "get": ["get_role"],
    "put": ["put_role"]
  }
  ...
}
```
The following command would be denied because the user who sent it doesn't have the `put_role`:
```json
{
  "_id": "21f38d06-cbc2-4ec4-9c1c-271285776cc7",
  "_corr": "e4179cf3-886a-4457-8d18-94cf0db7f760",
  "_type": "plusminus-counter",
  "_command": "put",
  "_jwt": {
    "sub": "memyselfandi",
    "roles": ["get_role"]
  },
  "value": 0
}
```
## The HTTP API
JSON Streams doesn't have an HTTP API of its own; it only works with Kafka topics. It is, however, possible to interact with managed state in a generic way. The repository pincette-jes-http is an example of an HTTP server that provides this. It's a lightweight process you can spin up with a port.
## An Example
The following example is the aggregate `plusminus-counter`. It has the commands `plus`, `minus` and `put`. All have an effect on the field `value`.
```yaml
---
application: "plusminus"
version: "1.0"
parts:
  - type: "aggregate"
    aggregateType: "plusminus-counter"
    commands:
      plus:
        reducer: "plus.jslt"
        validator: "validate_plus.yml"
      minus:
        reducer: "minus.jslt"
        validator: "validate_minus.yml"
      put:
        reducer: "put.jslt"
        validator: "validate_put.yml"
```
plus.jslt:

```
.state | {
  "value": .value + 1,
  *: .
}
```

minus.jslt:

```
.state | {
  "value": .value - 1,
  *: .
}
```

put.jslt:

```
.command | {
  "_command": null,
  *: .
}
```
validate_plus.yml:

```yaml
---
include:
  - "operator.yml"
conditions:
  - _command: "plus"
```

validate_minus.yml:

```yaml
---
include:
  - "operator.yml"
conditions:
  - _command: "minus"
```

validate_put.yml:

```yaml
---
include:
  - "type.yml"
conditions:
  - _command: "put"
  - value: 0
    $code: "INIT"
```

type.yml:

```yaml
---
conditions:
  - _type: "plusminus-counter"
```

operator.yml:

```yaml
---
include:
  - "type.yml"
conditions:
  - value:
      $exists: false
    $code: "OPERATOR"
```
The first two reducers move the context into the `state` field. They change the `value` field and just copy all the others. The last reducer is for the `put` command. It moves the context into the `command` field, because the whole command will become the new state. Only the `_command` field is removed.
You don't have to provide the `put` command, because it is built in. The reason to add it here is validation. You could also import a common `put` implementation in JSLT.
The validators check the command names and whether the commands are for the right aggregate type. The latter is not strictly necessary, because commands of the wrong type are ignored by the aggregate. For the commands `plus` and `minus` an extra check is done: the `value` field should not be present. In the case of the `put` command the `value` must be zero.