Data Orchestration with Dataphos Persistor: Technical Overview

Emil Huzjak

DATA ENGINEER

Introduction

Safe data storage and reusability are of utmost importance in the cloud, especially with highly active message streams, pipeline testing, and everything in between. Whether you’re adding new pipelines or need to reprocess historical data, keeping track of every message sent in the past is a daunting task for any data producer, so you must store the data independently if you ever plan to access it later. Persisting the data offers long-term benefits: constructing a versatile and affordable data lake broadens the analysis scope and facilitates a future transition to a data warehouse.

Our product, the Dataphos Persistor, is easier to deploy and use than many existing solutions for data storage and allows you to build a customizable, structured data lake and selectively reuse the data at your own pace across platforms. We probed the market to determine needs and delivered accordingly. There’s a good chance your pipeline is already compatible with Persistor. If you haven’t yet automated all aspects of data access in your organization or use case, keep reading to discover our proposal.

What is Persistor

Persistor serves as an organizer for data originating from a message broker. It helps you transfer messages from your message broker to cheap cloud storage, where they can be analyzed or kept until you need them. It indexes messages with fine granularity over source attributes and time intervals, and uses that index to resubmit the data to other topics further down the line when needed. It can persist and send data from most message brokers, from the lesser-known to the mainstream ones, such as Apache Kafka, GCP Pub/Sub and Azure Service Bus, and store it in a wide array of storage providers.

Persistor can later read from the data lake to resubmit messages to another topic, a process which is highly configurable to help you reprocess exactly the data you want. Resending data upon request to various pipelines is essential for modern, agile data analysis. It enables the data lake storage to play a dynamic role in both testing and production systems. Persistor allows you to specify the kind of data you wish to resubmit via message attributes in a declarative manner, down to the message level, and works in the background, thereby removing the bulk of manual work from resubmitting.

The following diagram is a simplified overview of the data storage functionality of Persistor and its indexing of metadata. The message source is pictured on the left and the storage on the right. After a batch of messages is persisted, the metadata about those messages is sent to an additional topic, where Indexer comes into play and stores it in a Mongo database. This is how Indexer works with Persistor to keep the metadata indexed and catalogued.

High level diagram of Persistor core and Indexer

In case of mishaps such as connection errors or server downtime, there’s no need to worry. A ‘dead letter’ topic offers a secure space to temporarily archive problematic data for later processing, to trigger alerts, and to diagnose issues.

The resubmitting workflow is even simpler. Resubmitter has a connection to your storage, indexing database and broker. An API call will prompt it to fetch the metadata from the database (using the indexing API) and find out which files the relevant messages are in. It then fetches those files, extracts the messages and publishes them to the topic.
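The resubmitting steps described above can be sketched in a few lines. This is only an illustration under stated assumptions, not Persistor's actual implementation: the index, the data lake and the broker are replaced by in-memory stand-ins, and the field names (`message_id`, `location`, `id`) are hypothetical.

```python
from collections import defaultdict


def resubmit(metadata_records, storage, publish):
    """Group indexed metadata by file, read each file once, publish the wanted messages."""
    # Map each storage location to the message IDs we want from it.
    wanted_by_file = defaultdict(set)
    for record in metadata_records:
        wanted_by_file[record["location"]].add(record["message_id"])

    published = 0
    for location, wanted_ids in wanted_by_file.items():
        batch = storage.get(location)
        if batch is None:
            # Blob is missing: skip it and continue with the data that remains.
            continue
        for message in batch:
            if message["id"] in wanted_ids:
                publish(message)
                published += 1
    return published


# Hypothetical stand-ins for the indexing database, the data lake and the target topic.
index = [
    {"message_id": "m1", "location": "topic-one/batch-a"},
    {"message_id": "m3", "location": "topic-one/batch-b"},
]
lake = {
    "topic-one/batch-a": [{"id": "m1", "data": "a"}, {"id": "m2", "data": "b"}],
    "topic-one/batch-b": [{"id": "m3", "data": "c"}],
}
outbox = []
count = resubmit(index, lake, outbox.append)
```

Grouping by location first means each file is fetched once, no matter how many of its messages are requested.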

High level diagram of resubmitting

As a user or developer, the only way you interact with Persistor is if you want to change its deployment configuration (like the storage destination and topic it consumes from), or more frequently if you want to forward a subset or all of the messages to another broker.

What can Persistor help you with?

  • Saving data: A data lake can act as an unyielding anchor for all things broker-related, and with Persistor it is not just a heap of bytes, but a curated repository of information, functioning as a landing zone from which all analytics or data quality assurance tools can operate.

  • Data reusability: Discovered a bug in your processing or want to make a change? Perhaps a property wasn’t calculated correctly? Easily resubmit the original data to the topic after reconfiguring the pipeline.

  • Adding new processing pipelines: The Resubmitter can relay existing messages to a different topic, irrespective of their age. Your data remains accessible and intact.

  • Manageable data chunks: Tailor the batching granularity to match your data’s frequency and size. Handling payloads in the thousands? Opt for larger batches to ensure manageability. Once the data volume diminishes, Persistor will periodically store information, ensuring up-to-date storage.

  • Arranging data by type: Add as many message keys as necessary, and Persistor will categorize data into respective directories. This structured storage approach ensures that specific data is easily accessible, removing the need to sift through unrelated information. Whether your data varies daily or hourly, you can segregate messages by time. Alternatively, consolidate them in one directory for collective access when needed.

  • Automatic dead lettering: Uncertain about the destination of misrouted data? The dead-letter topic serves as an economical hub where unforeseen issues can be collectively assessed and addressed, and it’s easy to set up all sorts of services for analytics and alerting. With the capabilities and power that Persistor brings to the table, the dead-letter topic will mostly be collecting dust. However, it stands ready for those moments when the going gets tough.

  • Alerting: Leverage the dead-letter topic to set up automated alerts for failure rates surpassing predefined thresholds. To bolster your pipeline error response time, consider integrating Persistor.

Benefits of using Persistor

  • Fast and lightweight resubmission: Resubmitting starts quickly and uses few resources. The heavy lifting is done by the data lake and message broker. The criteria by which data is picked for resubmission can include as many message attributes as needed.

  • Robust, configurable storage: Persistor populates your data lake with messages and structures them according to message attributes. A data lake can serve as the basis of multiple analysis processes and facilitate data migration to different technologies.

  • Cost-consciousness: Persistor can easily handle a large amount of data cheaply and efficiently. Considering the expenses and time lost when manually resending data and figuring out which data needs to go where, the choice to use Persistor is a no-brainer.

  • One-size-fits-all solution: Persistor eliminates the need to context switch between provider-specific solutions for data backup. The same product can be deployed regardless of the broker in question and only a few changes to parameters are required.

  • Unaltered data: Even after multiple resubmissions, Persistor ensures message content remains unchanged, eliminating concerns over data format consistency.

  • Seamless integration: Deploy the entire Persistor workflow effortlessly using straightforward, declarative infrastructure-as-code solutions.

  • Modularity: Deploy Persistor initially just for data backup and decide on data reusability when it becomes relevant – or activate both functionalities right away. Align the choice with your and your clients’ immediate needs.

  • Efficient dev-test cycle: Resubmitter bridges the gap between development and testing. It has never been easier to send data samples anywhere in the blink of an eye. When debugging or performing end-to-end tests, choose a subset of your data to send into a pipeline for testing purposes, and do it declaratively by supplying Resubmitter with attribute filters on the messages.

  • Targeted unit tests: Pipeline output efficacy and accuracy hinge on data. At times, potential issues might be overlooked due to insufficient failure-triggering data. Resubmitter with its dynamic filtering allows for testing with data crafted to expose specific vulnerabilities.

How does it work?

The Persistor stack consists of containerized components that communicate asynchronously and are loosely coupled via message brokers and service endpoints. Its versatility, aided by containerization, permits deployment across various environments with minimal configuration tweaks, especially in Kubernetes-supported ones. Persistor’s foundation includes two topic consumers and two API endpoints written in the Go programming language, complemented by a NoSQL database (MongoDB). Once configured, Persistor acquires its own subscription on the topic, alongside any pre-existing ones. Incoming messages are batched in Avro format and dispatched to the designated cloud storage.

The message attributes set at the source – usually defined as a mapping from keys to values – can be used to structure the data lake. Specific attribute keys can be configured when deploying Persistor. The values at these keys are interpreted as the message versions. Each message is sent to the directory on the data lake that corresponds to the version. By assigning meaningful message attributes, depending on the type or source of the message, you tell Persistor to corral batches of messages into blobs on the data lake.

These versioning attributes, combined with other message metadata, are batched, indexed, and made easily searchable with just one API call to the Indexer component. The metadata includes:

  • All producer-defined attributes

  • The originating topic

  • Unique message ID

  • Publish and ingestion time

  • The exact storage destination of the message

The Mongo database can be efficiently queried for this metadata for all messages within a specified time frame and filtered by the value of any number of attribute keys. Rather than querying this database directly, use the Resubmitter.

When the need arises to send those messages to another topic, a request to the Resubmitter API can be sent. The request header can incorporate the time range and the name of the source topic, while the URL accommodates query parameters such as the offset from the start of the topic and the maximum number of messages to be sent, in case you would like to send only a sample.
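As a rough illustration of such a request, the sketch below assembles a URL with query parameters and a JSON document carrying the time range and source topic. Every name in it is an assumption made for the example: the host, the `range` path segment, the parameter names (`collection`, `topic`, `offset`, `limit`) and the JSON keys are hypothetical, so consult the Persistor documentation for the real ones and for how the JSON is attached to the request.

```python
import json
from urllib.parse import urlencode


def build_range_request(base_url, collection, target_topic,
                        source_topic, start, end, offset=None, limit=None):
    """Return (url, json_payload) for a hypothetical time-range resubmission call."""
    # Required URL parameters (names are assumptions).
    params = {"collection": collection, "topic": target_topic}
    # Optional sampling controls: skip `offset` messages, send at most `limit`.
    if offset is not None:
        params["offset"] = offset
    if limit is not None:
        params["limit"] = limit
    url = f"{base_url}/range?{urlencode(params)}"

    # Time range and source topic as a JSON document (keys are assumptions).
    payload = json.dumps({"source_topic": source_topic, "from": start, "to": end})
    return url, payload


url, payload = build_range_request(
    "http://localhost:8081", "idx", "replay-topic",
    source_topic="topic-one",
    start="2023-05-17T00:00:00Z", end="2023-05-18T00:00:00Z",
    limit=100,
)
```

Leaving `offset` and `limit` out of the URL when they are unset keeps a full replay and a sampled replay distinguishable at a glance.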

With four components and a database, here’s the exciting part: Not every topic you want to back up requires all components. In a runtime accessing multiple topics, deploy only one Persistor consumer component per topic. The remaining components can handle operations with messages from several topics concurrently, enabling topic resubmission anytime, anywhere.

Overview of the components

Here we lay out the interaction between different parts. The four components of Persistor and a Mongo database can run on premises, in the same cluster or on different clusters.

Overview of all components

Persistor – core

This is a processor for messages which arrive from a topic. It does two crucial things: store batches of messages in your data lake and send the topic-relevant metadata, as well as the information about where they are stored, to be indexed for later querying. It can also forward messages to a dead letter topic in case something goes wrong. This means you never lose any of your data.
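The control flow described above (store the batch, emit metadata for indexing, fall back to the dead-letter topic on failure) might look roughly like this. It is a minimal sketch, not Persistor's code: the `store`, `index` and `dead_letter` callables and the record fields are hypothetical stand-ins, and dead-lettering the whole batch on a storage error is an assumption.

```python
def persist_batch(batch, store, index, dead_letter):
    """Store a batch, then emit its metadata; on storage failure, dead-letter the messages."""
    try:
        location = store(batch)
    except Exception as err:
        # Storage failed: keep the data by forwarding each message with the error reason.
        for message in batch:
            dead_letter({"message": message, "error": str(err)})
        return None
    # Storage succeeded: tell the Indexer where these messages now live.
    index({"location": location, "message_ids": [m["id"] for m in batch]})
    return location


def failing_store(batch):
    # Simulates a connection problem with the storage provider.
    raise ConnectionError("storage unavailable")


def working_store(batch):
    # Pretends to write the batch and returns its (hypothetical) location.
    return "topic-one/batch-a"


dead, indexed = [], []
failed = persist_batch([{"id": "m1"}], failing_store, indexed.append, dead.append)
stored = persist_batch([{"id": "m2"}], working_store, indexed.append, dead.append)
```

After both calls, `dead` holds the message that could not be stored and `indexed` holds one metadata record for the batch that was.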

Payloads and attributes are naturally batched in a serialized format and stored in blobs according to proximity in time, which means that with no other requirements, messages that arrive one after the other will likely end up in the same batch, and moreover, two consecutive batches will likely be accessible in the same data lake folder.
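To make the time-proximity batching concrete, here is a small sketch of a batcher that closes a batch when it reaches a size limit or when its oldest message exceeds an age limit. The class and its parameters (`max_size`, `max_age_seconds`) are hypothetical and only illustrate the idea; Persistor's real batching settings may differ.

```python
class Batcher:
    """Collects messages and releases a batch on a size or age limit."""

    def __init__(self, max_size, max_age_seconds):
        self.max_size = max_size
        self.max_age_seconds = max_age_seconds
        self._messages = []
        self._opened_at = None

    def add(self, message, now):
        """Add a message; return the finished batch if a limit was hit, else None."""
        if not self._messages:
            # The first message of a batch starts the age clock.
            self._opened_at = now
        self._messages.append(message)
        too_big = len(self._messages) >= self.max_size
        too_old = now - self._opened_at >= self.max_age_seconds
        if too_big or too_old:
            return self.flush()
        return None

    def flush(self):
        """Return the current batch and start a new, empty one."""
        batch, self._messages = self._messages, []
        self._opened_at = None
        return batch


# Timestamps are passed in explicitly to keep the example deterministic.
batcher = Batcher(max_size=3, max_age_seconds=60.0)
results = [batcher.add(m, t) for m, t in
           [("a", 0.0), ("b", 1.0), ("c", 2.0), ("d", 10.0), ("e", 75.0)]]
```

In this sketch the age limit is only checked when a message arrives; a long-running service would typically also flush on a timer so that a quiet topic still gets stored periodically.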

But just organizing by time is not enough when you have a highly complex system with many moving parts, so Persistor will direct messages into separate folders based on their attributes. These attributes can define the source of the data, such as the name of the device or system if multiple producers publish to the same topic, or they can identify the type of data. Suppose you have two topics you want to persist to the same data lake and arrange the data into folders based on the topic, day and hour of arrival and some version attribute. Your storage could be organized like this:

Structuring a data lake using metadata

One instance of Persistor is configured to save to the topic-one folder and another to topic-two. The name of a batch blob contains the identifier of the first message in it. The path format can be reconfigured to change the root folders from topic names to something else. It can contain more information about timestamps like the month and year, or the version can come first when new data is received in case the version is the primary method by which you want to access the messages.
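A layout of this kind can be expressed as a small path-building function. The sketch below assumes one possible ordering (topic, day, hour, version, then a blob named after the first message in the batch); the attribute key `version`, the `.avro` suffix and the fallback folder `unknown` are assumptions made for the example, not Persistor's fixed format.

```python
from datetime import datetime, timezone


def blob_path(topic, publish_time, attributes, first_message_id, version_key="version"):
    """Build a storage path of the form topic/day/hour/version/first-message-id.avro."""
    # Messages without the version attribute land in a catch-all folder (assumption).
    version = attributes.get(version_key, "unknown")
    return "/".join([
        topic,
        publish_time.strftime("%Y-%m-%d"),  # day of arrival
        publish_time.strftime("%H"),        # hour of arrival
        version,
        f"{first_message_id}.avro",
    ])


path = blob_path(
    "topic-one",
    datetime(2023, 5, 17, 14, 30, tzinfo=timezone.utc),
    {"version": "v1"},
    first_message_id="msg-001",
)
# path == "topic-one/2023-05-17/14/v1/msg-001.avro"
```

Reordering the list passed to `join` is all it takes to put the version first or to add month and year folders, which mirrors the reconfigurable path format described above.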

Note also that indexing is optional. If you just want to persist messages and are fine with using another solution to interface with the storage when the need arises, deploying just the core component is fine. However, as you’ll hopefully agree, indexing is an indispensable feature of Persistor so let’s look at the other components.

Indexer

The core Persistor sends metadata to another topic, one which Indexer listens to and which you provision for that purpose. The job of the Indexer is to store that metadata and make it quickly accessible. For this purpose, we use MongoDB, a non-relational, document-based database; you can deploy a new instance or use an existing one if you already have it. Indexer creates a new Mongo collection and builds indices on it that are relevant to the incoming message metadata. This storage option is very efficient because, as most documents share properties and have similar values, the data can be compressed to a fraction of its original size. A Mongo database can contain many collections, and multiple Indexer instances can write to the same collection or each use their own for maximum generality.

Indexer API

The Indexer API is a lightweight component that responds to requests for metadata and reads from the Mongo database. You can use it yourself if you know the specific metadata you’re looking for, but the main visitor to the Indexer API is the Resubmitter.

Resubmitter API

So Persistor has been running for a week and now you need to replay the stream on another topic because you’ve added a new processing pipeline. Instead of pulling those messages from the source again and triple-checking the pipeline configuration is correct, you can just deploy the Resubmitter, send it a request with a time interval from which you need to copy over messages, as well as filtering options, and fill the topic in no time! Resubmitter will communicate with the data lake cleverly and extract all relevant messages in parallel. Nothing gets left behind when you want to resubmit. In case a blob was erroneously deleted in the meantime or a connection problem ensues, a partial resubmission will occur with the data that remains available. The endpoints served by Resubmitter include:

range – Take all messages published on a source topic in a given time period and send them to the resubmit topic. You define the source topic and the time interval in the request body.

resubmit – Resubmit specific messages with known message IDs. A message ID is a combination of the topic name and the ID of the message on the topic, and it uniquely identifies the message, provided you don’t have two topics of the same name indexed in the same Mongo collection.

query – Resubmit messages whose attributes and metadata match the mappings in the parameters. The parameters can include storage-relevant values like the path to the file where the message batch is stored.

The endpoints require the Mongo collection name and the name of the topic to resubmit to as URL parameters, while metadata filters on the source, time and other information are provided in JSON format in request headers. When configuring multiple filters, you can decide whether a message needs to pass just one to be resubmitted, or if they all need to be satisfied. In this way you can combine complex logical statements featuring equality and comparison.
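The "pass just one" versus "all must be satisfied" choice can be illustrated with a small filter evaluator. This is a sketch of the logic only: the filter shape `(key, operator, value)`, the operator names and the `mode` argument are hypothetical and do not reflect the Resubmitter's actual request syntax.

```python
import operator

# Supported comparisons (names are assumptions for this example).
OPERATORS = {
    "eq": operator.eq,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
}


def matches(metadata, filters, mode="all"):
    """Return True if the metadata passes all filters (mode='all') or at least one (mode='any')."""
    def passes(single_filter):
        key, op, expected = single_filter
        if key not in metadata:
            # A message lacking the attribute cannot satisfy a filter on it.
            return False
        return OPERATORS[op](metadata[key], expected)

    results = (passes(f) for f in filters)
    return all(results) if mode == "all" else any(results)


record = {"line": "A", "indicator": "temperature", "value": 93.5}
filters = [("line", "eq", "A"), ("value", "gt", 100)]

strict = matches(record, filters, mode="all")   # False: value is not above 100
lenient = matches(record, filters, mode="any")  # True: the line filter passes
```

The same evaluator covers both attribute equality, such as selecting one production line, and threshold comparisons on numeric values.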

Resubmitting can be used to introduce data into a pipeline, or to send the same data after making changes to the processing logic. Suppose a production line has sensors which measure quantities related to physical conditions and throughput, and a pipeline is performing analysis on two of those quantities. If you now start tracking a third indicator, you should submit the data from the production line, but only for the new sensor. An appropriate solution is to filter the data by both the production line and indicator attributes. If you would like to test it incrementally without sending all the information on the first go, set the lower and upper bounds for the publish time.

Clipping of values can also be done if you only want to examine data within a specific numeric interval. We often notice abnormal values or outliers, which can cause pipelines to behave in unexpected ways but are hard to test for if you can’t reliably replicate them. By using the appropriate filter, you resubmit only messages with values above a desired threshold, which lets you spot the errors faster and with certainty.

Take Action Now: Make data reusability a breeze

Ready to experience the benefits of Dataphos Persistor for yourself? Head over to our website to learn more about Syntio and how we can help you on your data journey (Homepage). To learn more about Persistor and how to get started, visit our documentation page (Persistor).

Both community and enterprise versions of Persistor are at your disposal, with the enterprise edition offering round-the-clock support, access to new feature requests, and valuable assistance for developing use cases to drive your business forward.

We strive to eliminate the need to constantly worry about data management on a low level so you can focus on what really matters – business decisions. We hope Persistor is a huge step in this direction and that you can make it a part of your toolbelt in no time. Thanks for taking the time to read about our new product, and we look forward to hearing about how Persistor has made a difference in your life!