Daniel Sandor, Andrea Bobanac
DATA ENGINEERS
Introduction
The Transactional Outbox pattern is a design pattern used in software development to ensure reliable and consistent communication between different components or microservices in a distributed system. It is particularly useful in scenarios where you need to update the state of an object and notify other services of this update consistently.
The Transactional Outbox pattern uses database tables to capture transactional changes and domain events, and message brokers to process those domain events asynchronously.
Dataphos Publisher can make that process faster and simpler by forming domain events at the source without the need for a separate database table. The domain events, or business objects in Publisher terminology, are formed from the source data and published to one of the supported message brokers to be processed by another service.
Maintaining consistency in distributed systems
In a distributed system, where data is spread across multiple nodes or servers, achieving and maintaining consistency becomes a significant challenge. Consistency is one of the key properties of the ACID (Atomicity, Consistency, Isolation, Durability) model, which defines the properties that ensure reliable transactional processing in databases.
In a database, transactions are designed to be atomic, meaning they either complete successfully in their entirety or are rolled back entirely. This ensures the consistency of the data in the database. If a failure occurs before the transaction commits, and therefore before any related action (such as producing a message to a message broker) takes place, rolling back the database transaction maintains data integrity.
If producing messages to a message broker or any other external system were done before the completion of the database transaction, a failure in the database transaction would leave the system in an inconsistent state. The database transaction would be rolled back, but the messages produced to the message broker cannot be rolled back. This leads to data inconsistencies in downstream services.
The Transactional Outbox pattern comes in handy to prevent such situations and maintain the desired consistency between services.
Transactional Outbox pattern
The Transactional Outbox pattern uses an outbox table (e.g., a PostgreSQL table) to store business event data. For example, an Order service would split the order creation process into at least one insert into the orders table and one insert into the order lines (items on the order) table. Then, in the same transaction, the whole order (including its order lines) would be inserted as JSON into the outbox table, together with the type of the transaction. This event would then be extracted from the outbox table and published to a message broker (e.g., Apache Kafka) using a CDC (Change Data Capture) solution.
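The write path of the pattern can be sketched as follows. This is a minimal illustration, not the implementation used in this article: it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, and the table layout and the helper names (open_db, create_order, count) are assumptions made for the example. The point is that the business rows and the outbox row are committed or rolled back together.

```python
import json
import sqlite3


def open_db():
    """Create an in-memory database with hypothetical orders/outbox tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE orders (id INTEGER PRIMARY KEY, purchaser TEXT, payment_method TEXT);
        CREATE TABLE order_lines (order_id INTEGER, product TEXT, quantity INTEGER, price REAL);
        CREATE TABLE outbox (id INTEGER PRIMARY KEY, event_type TEXT, payload TEXT);
    """)
    return conn


def create_order(conn, purchaser, payment_method, lines, fail=False):
    """Write the order, its lines, and the outbox event in ONE transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        cur = conn.execute(
            "INSERT INTO orders (purchaser, payment_method) VALUES (?, ?)",
            (purchaser, payment_method),
        )
        order_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO order_lines (order_id, product, quantity, price) "
            "VALUES (?, ?, ?, ?)",
            [(order_id, l["product"], l["quantity"], l["price"]) for l in lines],
        )
        event = {"id": order_id, "purchaser": purchaser,
                 "payment_method": payment_method, "order_lines": lines}
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("OrderCreated", json.dumps(event)),
        )
        if fail:
            # Simulated failure before commit: nothing above is persisted.
            raise RuntimeError("simulated failure before commit")
    return order_id


def count(conn, table):
    """Return the number of rows in a table (table name is trusted here)."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

A relay process (for example a CDC tool) would then read the outbox table and publish each row to the broker; that part is not shown here.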
While the Transactional Outbox pattern provides a robust mechanism for maintaining consistency in distributed systems, it also comes with certain challenges and drawbacks:
- Complexity: Implementing the Outbox Pattern can add complexity to your system, as you need to manage an outbox table, a message relay, and a message broker.
- Latency: Asynchronous domain event processing introduces latency, as messages are sent and processed after the local transaction is committed, which may not be suitable for all use cases.
- Message ordering and duplication: If not carefully managed, the order of message delivery to other services might not be guaranteed. Also, the pattern can lead to message duplication.
Outbox Pattern architecture (source)
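Message duplication is usually handled on the consumer side. The following is a minimal sketch of an idempotent handler, assuming every event carries a unique identifier; the function name handle_once and the in-memory set are hypothetical, and a real service would keep the processed IDs in durable storage.

```python
def handle_once(event_id, payload, seen_ids, apply):
    """Apply an event only the first time its ID is seen.

    Returns True if the event was applied and False if it was a duplicate.
    """
    if event_id in seen_ids:
        return False
    apply(payload)
    seen_ids.add(event_id)  # recorded only after the event was applied
    return True
```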
Dataphos Publisher
The Dataphos Publisher was developed for running a constant flow of ready-to-digest data packets across your Cloud infrastructure – sourced directly from a database.
Current data ingestion solutions, like CDC, provide data extraction with minimal message formatting capabilities. The Dataphos Publisher provides a data ingestion pipeline that forms well-defined, structured business objects at the source according to a specified configuration.
The Dataphos Publisher lets the user define a query and how the results of the query should be assembled into a structured message. The Publisher takes care of serializing the data, optionally encrypting it, and publishing it to a message broker. It can be used for extracting historic data, as well as for real-time data capture.
This means that the outbox table of the Transactional Outbox pattern can be avoided: the Dataphos Publisher can instead track changes in real time and publish events to a message broker. In other words, the Dataphos Publisher can serve as a faster and programmatically simpler replacement for the Transactional Outbox pattern. No new tables need to be added to your application, and the application only has to handle the transactional changes, without the domain event context.
Transactional Outbox pattern using the Dataphos Publisher
Dataphos Publisher for maintaining consistency
To demonstrate how the Dataphos Publisher could be used to maintain consistency between services, a Spring Boot application that handles Orders was created. The application is a simplified demonstration of a real-world backend system that communicates with a database.
Spring REST APIs were created with routes for standard CRUD operations to handle Orders. When the Spring Boot Application starts, tables ‘orders’ and ‘order_lines’ are created in the operational PostgreSQL database. The routes enable the user to manipulate Order objects in the database. HTTP requests can then be sent via Postman to create, update, or delete order data in the tables.
Three Dataphos Publisher instances are utilized to capture data:
- One reads data based on the created_at audit column and has “OrderCreated” defined as the domain event type in the metadata.
- One reads data based on the updated_at audit column and has “OrderUpdated” defined as the domain event type in the metadata.
- One reads data based on the deleted_at audit column and has “OrderDeleted” defined as the domain event type in the metadata.
All three instances form events and publish them to a message broker. Apache Kafka was chosen as the destination because of its ordering guarantees and high performance. All instances publish data to the same topic since every event is related to the Order domain. The Publisher publishes these events to Apache Kafka in the order they were executed on the source database. For example, an “OrderUpdated” event will appear after the “OrderCreated” event for a specific order.
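Kafka's ordering guarantee applies within a single partition, and records that share a key are routed to the same partition. The sketch below illustrates why keying events by order ID keeps the events of one order in sequence. It is an illustration only: the CRC32 hash is a stand-in (Kafka's default partitioner uses a different hash function), and the function names and the partition count are assumptions.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition deterministically.

    Stand-in hash for illustration; Kafka's default partitioner
    does NOT use CRC32.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign(events, num_partitions):
    """Append (key, event_type) pairs to per-partition lists in arrival order."""
    partitions = {}
    for key, event_type in events:
        partitions.setdefault(partition_for(key, num_partitions), []).append(
            (key, event_type)
        )
    return partitions
```

Because the mapping depends only on the key, "OrderCreated", "OrderUpdated", and "OrderDeleted" for the same order always end up in the same partition, in the order they were published.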
The Spring Boot application, Apache Kafka, the PostgreSQL database, and the Dataphos Publisher are all deployed as Docker containers using Docker Compose.
The implementation and configuration files for this demonstration can be found on a public GitHub repository.
Spring Boot Order application
When the Spring Boot application is started, the orders and order_lines tables are created in the PostgreSQL database. The tables are indexed for efficient querying by the Publisher. Also, REST APIs that support CRUD operations on the PostgreSQL database objects are exposed.
PostgreSQL database
Both tables in the PostgreSQL database have the following audit columns: created_at, updated_at, deleted_at, and is_active. The audit columns signify the state of the database row. The created_at column is set by the application when an order is created. The updated_at and deleted_at columns contain a null value until an update or a delete operation occurs for that order. These are timestamp columns that are set to the timestamp of the respective operation. The is_active column is a utility boolean column that signifies whether a row should be observed by applications that read the data. Together, these columns form an extended SCD2 schema of the kind usually found in data warehouses. They are required for the Publisher to be able to query specific events on the data.
Generally, the orders table contains information about the order: who placed it, what time they placed it, the associated shipping address, the payment method, and so on. It often doesn’t contain any information about what was ordered. The order_lines table generally contains information about what was ordered; this is done because a single order can consist of multiple items. An order line specifies the item ordered, the quantity ordered, and so on, and there is one line for each different item ordered. In this example, the price and name of the product are also stored in this table for simplicity.
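The way an application might maintain these audit columns can be sketched as follows. This is an assumption-laden illustration rather than the demo application's code: it uses Python's sqlite3 module instead of PostgreSQL, a reduced orders table, and hypothetical helper names. It also assumes that is_active is switched off when an order is deleted.

```python
import sqlite3
from datetime import datetime, timezone


def now():
    """Current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def open_db():
    """Create an in-memory orders table with the four audit columns."""
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY,
            purchaser TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT,               -- NULL until the first update
            deleted_at TEXT,               -- NULL until the order is deleted
            is_active INTEGER NOT NULL DEFAULT 1
        )""")
    return conn


def create_order(conn, purchaser):
    with conn:
        cur = conn.execute(
            "INSERT INTO orders (purchaser, created_at) VALUES (?, ?)",
            (purchaser, now()),
        )
    return cur.lastrowid


def update_order(conn, order_id, purchaser):
    with conn:
        conn.execute(
            "UPDATE orders SET purchaser = ?, updated_at = ? WHERE id = ?",
            (purchaser, now(), order_id),
        )


def delete_order(conn, order_id):
    # Soft delete: the row stays in the table so it can still be queried.
    with conn:
        conn.execute(
            "UPDATE orders SET deleted_at = ?, is_active = 0 WHERE id = ?",
            (now(), order_id),
        )
```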
REST API
The REST API accepts JSON objects as the HTTP request payload. Separate routes are exposed for creating, updating, and deleting orders. Each JSON object represents the order that was placed and contains all the required information, including each order line. This information would usually be provided by an application that provides a user interface in an order management system.
Dataphos Publisher
The Dataphos Publisher focuses on forming structured objects directly at the source and publishing them to a message broker to allow a multitude of consumers to independently consume and process the data in their own individual pipelines. It comes tailor-made for a situation where you need a fast pipeline from your source database to a downstream consumer of business/domain events while maintaining consistency and application simplicity. The Publisher needs to be configured to run multiple jobs to handle outgoing events.
The Publisher stores configurations in a PostgreSQL operational database called the Metadata database. That includes the configurations for the data source (mainly connection details for a database of a supported type), the destination (message broker configurations), and the Publisher instance configurations.
The Manager component is a REST server that exposes API endpoints used for configuration management. The Publisher’s Web UI communicates with it to allow users to configure their data ingestion jobs using YAML configuration files.
The ingestion job configuration is defined in the “instance” YAML file. The query to capture data, the domain event structure, format and metadata, and the source and destination are defined here.
- The instance YAML configuration looks like this:
publisherName: publisher-orders-created
sourceName: publisher-postgres-source
destinationName: publisher-kafka-destination
serializationType: Json
encryptionEnabled: false
fetcherConfig:
  useNativeDriver: true
  fetchingThreadsNO: 1
  queryIncrementType: MINUTE
  queryIncrementValue: 1
businessObject:
  description: Publisher - Transactional Outbox
  objectGroup: order
  additionalMetadata:
    eventType: OrderCreated
  definition:
    - id
    - order_info:
        - created_at
        - updated_at
        - deleted_at
        - is_active
        - purchaser
        - payment_method
    - order_lines:
        - product
        - quantity
        - price
        - order_id
  arrayElements:
    - order_lines
  groupElements:
    - order_id
  keyElements:
    - id
  idElements:
    - id
    - created_at
    - order_id
  query: SELECT orders.id AS id, orders.purchaser AS purchaser,
    orders.payment_method AS payment_method, orders.created_at AS created_at,
    orders.updated_at AS updated_at, orders.deleted_at AS deleted_at,
    orders.is_active AS is_active, order_lines.product AS product,
    order_lines.quantity AS quantity, order_lines.price AS price,
    order_lines.order_id AS order_id FROM orders JOIN order_lines ON orders.id =
    order_lines.order_id WHERE orders.created_at >= {{ .FetchFrom }} AND
    orders.created_at < {{ .FetchTo }};
- The serializationType field defines the serialization format for the domain events. Events will be serialized to JSON.
- The encryptionEnabled field is a boolean flag that indicates if encryption will be used in the Publisher instance. No encryption will be used.
- The queryIncrementType and queryIncrementValue fields together determine the upper bound of the message fetch period. The Publisher processes data in runs, and each run processes one period of data from the source database. In the example above, the Publisher tries to advance the upper bound by up to 1 minute, but falls back to the current timestamp if the incremented value would be ahead of it. The increment period can be small here because of the low activity on the source database. In practice, one run will amount to roughly a second of data from the source, since little time is spent on processing the data.
- The business object definition defines the business object/domain event structure and fields. The main transformation done to the data is grouping. The data is grouped by the unique order ID, meaning that all order lines for one order are collected as an array of values in one object.
The keyElements field selects a unique key from the data. The unique key is given to each business object and stored in the message metadata. The key is also used as the message key when publishing data to Kafka. Kafka preserves ordering only within a partition, and messages with the same key are routed to the same partition, so it is important for that key to be the order ID to keep the ordering between events of one order.
Additional metadata contains additional parameters (keys) that will be stored as message headers when published to the broker. The domain event type is recorded in the headers.
- The query that will be executed on the source database must be defined. The query needs to include a time interval with template placeholders. Since the Publisher fetches data in runs, these placeholders will be replaced by increasing, sequential timestamp values for each run. The database query needs to be compatible with the given business object definition, i.e., it needs to select all the required fields. Here, the created_at timestamp column is used to fetch newly created orders.
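The run-based fetching described above can be sketched as a sequence of half-open time windows, each capped at the current timestamp. This is an illustrative model under stated assumptions, not the Publisher's actual code: the function names are hypothetical, and the way timestamps are quoted when substituted into the query is assumed for the example.

```python
from datetime import datetime, timedelta


def windows(start, increment, now):
    """Yield consecutive (fetch_from, fetch_to) pairs up to `now`.

    Each window is at most `increment` long, and the upper bound
    never runs ahead of the current timestamp.
    """
    fetch_from = start
    while fetch_from < now:
        fetch_to = min(fetch_from + increment, now)
        yield fetch_from, fetch_to
        fetch_from = fetch_to  # the next run starts where this one ended


def render(query_template, fetch_from, fetch_to):
    """Fill the placeholders by plain string substitution (assumed format)."""
    return (
        query_template
        .replace("{{ .FetchFrom }}", f"'{fetch_from.isoformat(sep=' ')}'")
        .replace("{{ .FetchTo }}", f"'{fetch_to.isoformat(sep=' ')}'")
    )
```

Because the lower bound is inclusive and the upper bound is exclusive in the query, consecutive windows neither overlap nor leave gaps.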
One Worker component is created for each active instance configuration. There can be multiple active workers with different configurations simultaneously. Three Publisher instances are deployed for this demonstration, so there are three worker YAML files. Once a Worker is created, it processes data in a loop until a previously defined stopping point, until it’s stopped by the user, or until an error has occurred. The Worker fetches the data, formats it, serializes it, and then publishes it to the predefined destination (Kafka in this example).
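The formatting step can be illustrated with a small sketch that groups the flat rows returned by the join query into one nested object per order. The field lists mirror the instance configuration, but the exact shape of the object the Publisher emits is an assumption here, and the function name form_events is hypothetical.

```python
import json

# Field lists taken from the business object definition in the configuration.
ORDER_INFO = ["created_at", "updated_at", "deleted_at",
              "is_active", "purchaser", "payment_method"]
ORDER_LINE = ["product", "quantity", "price", "order_id"]


def form_events(rows):
    """Group flat joined rows by order_id into one nested object per order."""
    events = {}
    for row in rows:
        event = events.setdefault(row["order_id"], {
            "id": row["id"],
            "order_info": {field: row[field] for field in ORDER_INFO},
            "order_lines": [],
        })
        # Every joined row contributes one element to the order_lines array.
        event["order_lines"].append({field: row[field] for field in ORDER_LINE})
    return list(events.values())


def serialize(event):
    """Serialize one event to JSON bytes, ready to be published."""
    return json.dumps(event).encode("utf-8")
```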
Managing orders
In a real-world scenario, a user would place, update, or delete an order using a sales website. This website offers order management through a frontend application. This application would contact the backend order application over the REST API using JSON objects.
Creating orders
The following is an example JSON object that represents the order placed through a sales website:
{
  "purchaser": "Leon",
  "paymentMethod": "card",
  "orderLines": [
    {
      "product": "jeans",
      "quantity": 2,
      "price": 80.00
    },
    {
      "product": "sneakers",
      "quantity": 1,
      "price": 70.00
    }
  ]
}
JSON Order example
The order is stored in the operational database in the orders and order_lines tables. To process this order event asynchronously and reliably, one could store the event in a separate outbox table and use CDC to send it to a message broker.
In this case, the Dataphos Publisher will read the data stored in the orders and order_lines tables, form a domain event with the “OrderCreated” event type, and send it to Apache Kafka. The order could then be used in other services. For example, a service could keep track of the inventory of products in a warehouse and provide an API to query it. This event would then cause the inventory for the jeans and sneakers products to be reduced by two and one, respectively.
Updating orders
A user could decide to update a placed order. For example, a user could decide to order more sneakers. In that case, the following JSON would be sent to the order application:
{
  "orderId": 1,
  "purchaser": "Leon",
  "paymentMethod": "card",
  "orderLines": [
    {
      "product": "sneakers",
      "quantity": 5,
      "price": 70.00
    }
  ]
}
Updated JSON Order example
In this case, the Dataphos Publisher would pick up on the update based on the updated_at timestamp column that will be updated in the database. The event would be sent to Apache Kafka with the “OrderUpdated” event type.
The inventory service would then reduce the inventory for the product by four.
Deleting orders
If the user deletes the order, the deleted_at timestamp column would be updated, without actually removing the database row. This way, the Dataphos Publisher would send the “OrderDeleted” event to Apache Kafka.
The inventory service could then increase the inventory state for the jeans and sneakers products by two and five, respectively.
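The inventory adjustments described for the three event types can be sketched as a small consumer. This is a hypothetical illustration: the InventoryService class and the event shape are assumptions, and it assumes every event carries the complete current set of order lines, so the service remembers the last quantities seen per order and applies only the difference.

```python
def line_quantities(event):
    """Map product name to ordered quantity for one order event."""
    return {line["product"]: line["quantity"] for line in event["order_lines"]}


class InventoryService:
    def __init__(self, stock):
        self.stock = dict(stock)
        self.reserved = {}  # order ID -> {product: quantity} last seen

    def handle(self, event_type, event):
        order_id = event["id"]
        previous = self.reserved.get(order_id, {})
        # A deleted order no longer reserves anything.
        current = {} if event_type == "OrderDeleted" else line_quantities(event)
        for product in set(previous) | set(current):
            delta = current.get(product, 0) - previous.get(product, 0)
            self.stock[product] = self.stock.get(product, 0) - delta
        if current:
            self.reserved[order_id] = current
        else:
            self.reserved.pop(order_id, None)
```

With a starting stock of ten jeans and ten sneakers, the created order leaves eight and nine, the update leaves eight and five, and the deletion restores ten and ten.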
Events on the message broker
Events published by the Publisher to Apache Kafka can be viewed over the Redpanda console. They are stored on a Kafka topic called orders. They are published in the order they occurred on the source database.
The first event is the order that was created.
The created Order event
The created Order headers
The upper image displays the domain event, and the lower image shows the message headers, including the event type.
The second event is the update of the existing order.
The updated Order event
The updated order headers
The upper image displays the updated domain event, with a changed quantity of the sneakers product. The lower image shows the “OrderUpdated” event type. To capture only the changed data in the order lines, change the Publisher’s query to use the updated_at timestamp column in the order_lines table.
The final event is the deletion of the existing order.
The deleted Order event
The deleted Order headers
Conclusion
In distributed systems, different services often need to operate on the same data, which can lead to inconsistency issues. Traditional approaches like the Two-Phase Commit can introduce tight coupling between services and may not scale well.
The Outbox Pattern helps mitigate these issues by ensuring that all services receive updates and become eventually consistent. However, it doesn’t offer the possibility of forming business objects at the source according to your business needs. This is where Syntio’s Dataphos Publisher comes in.
Dataphos Publisher removes the process of creating the outbox table and, in that way, reduces the complexity to achieve the same outcome. It provides a stable, fast, and efficient way to format your data at the source and publish it to a message broker for further consumption. Because of this, Dataphos Publisher represents a more efficient alternative to the transactional outbox pattern.