Schema Registry Comparison

Ena Dzanko, Juraj Komericki

DATA ENGINEERS

Introduction

In many messaging scenarios, the message payload is structured and therefore (de)serialized using a schema specification. Both senders and receivers of the messages will want to validate the data against a corresponding schema, but will also frequently change the format of the data. A Schema Registry is a must-have component of every data management system: it enables users to store, update and retrieve structured data specifications, and to check the validity of the real data being sent to a target system. Inspired by Confluent’s Schema Registry and our own real-life experience, we developed our own Schema Registry that supports multiple message formats.

The comparison is made using the Avro message format, since all of the Schema Registries included support it. We will cover basic schema operations: registering schemas and the corresponding data, updating and versioning, and retrieving schema details as well as the schema specification.

Motivation

Imagine a scenario with two fictional companies: SynTistics Inc. and SynML Ltd. SynTistics is an independent statistical analysis company specializing in collecting datasets and performing exploratory analysis with them.

SynML Ltd. is a small machine learning company that uses schools of artificial intelligence to make the world a better place.

SynTistics provides SynML with hourly COVID-19 data (CSV data with 20 attributes) over a messaging system, and SynML consumes the data in a normal manner.

The workflow is simple and is shown in the figure below.

On the other hand, when a change in the data model occurs (e.g. an additional attribute is added) and a consumer consumes such a payload and tries to deserialize it, it will, in the worst case, fail and panic. Custom data decoders would have to be written every time the data model changes, which results in an unmaintainable environment in the long term.

The workflow in the figure below describes such faulty behaviour.

As a precaution, they employ Syntagmo, a fictional data engineering company, which proposes a solution with an intermediary component that prevents this type of situation from ever happening.

The figure below describes such a system.

The workflow is as follows:

  1. The SynTistics system registers a schema with 20 attributes

  2. It publishes the data on the input data topic

  3. Schema Registry internally checks if the input data adheres to the registered schema

  4. If the check is successful, the data is sent to the valid data topic; otherwise it is sent to a resubmission topic (for use cases where the data can still be salvaged) or to a dead letter topic if the data is rendered useless from the start

Messages

As mentioned before, the observed format will be Avro, so a short introduction is in order.
Avro is a binary serialization format, known for its light overhead.
An Avro schema is defined in JSON, conventionally in a file ending in .avsc,
for example:

{
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "userName",        "type": "string"},
        {"name": "favouriteNumber", "type": ["null", "long"]},
        {"name": "interests",       "type": {"type": "array", "items": "string"}}
    ]
} 

This schema defines a Person of type record (Avro’s standard complex type), with a fields attribute, a JSON array holding the field specifications.

For communication from point A to point B, an encoder and a decoder are therefore necessary.

An encoder encodes a message conforming to the Person schema like this:

We can see that some bytes carry metadata about the data types being encoded while others carry the actual data, and this is the real magic of Avro.
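To make this concrete, here is a minimal sketch of schemaless Avro encoding and decoding of a Person record. It uses the fastavro library (an assumption on our part; any Avro implementation behaves the same way):

import io
import fastavro

# The Person schema from above, parsed into fastavro's internal representation.
schema = fastavro.parse_schema({
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "userName",        "type": "string"},
        {"name": "favouriteNumber", "type": ["null", "long"]},
        {"name": "interests",       "type": {"type": "array", "items": "string"}},
    ],
})

record = {"userName": "jkomericki", "favouriteNumber": 42, "interests": ["jogging"]}

# Encode: only the field values are written, the schema itself is not embedded.
buffer = io.BytesIO()
fastavro.schemaless_writer(buffer, schema, record)
print(buffer.getvalue())

# Decode: the reader needs the same (or a compatible) schema to interpret the bytes.
buffer.seek(0)
print(fastavro.schemaless_reader(buffer, schema))

Only the field values end up in the byte stream; the reader must know the writer's schema (or a compatible one), which is exactly the coordination problem a schema registry solves.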

The next sections will say more about specific technologies for schema connoisseurs.

Azure Schema Registry

The Azure Schema Registry is a feature of Azure Event Hubs. It provides a central repository for storing schemas. At the time of writing it is a relatively new feature on Azure and is not recommended for production workloads.

Azure Event Hubs is a big data streaming platform and event ingestion service that can receive and process millions of events per second. Event Hubs represents the “front door” of an event pipeline, often called an event ingestor in solution architecture, and the Schema Registry is a feature hosted within it. We tried out and covered the use of the Schema Registry through the Azure Portal as well as through the Schema Registry client. The steps and results are summarized below.

Concepts and prerequisites

Because this feature relies on Event Hubs, some preliminary steps are needed before a working Schema Registry can be deployed. Prerequisites include a resource group, an Event Hubs namespace and an Event Hub. The namespace hosts a Schema Registry with multiple schema groups alongside Event Hubs; each schema group is a separate repository, a logical group that stores similar schemas. Finally, a schema stored in a schema group has a name, type, compatibility mode and serialization type (only Avro for now).

Moving on to the deployment, we created a resource group sr-janitor-group and deployed an Event Hubs namespace janitor-hub, and within that namespace we created an Event Hub jan-hub1. It is important to note that the standard tier (rather than the basic tier) is required for the Schema Registry feature.

Schema Registry via Azure Portal

Azure Event Hubs and Schema Registry are available and displayed as entities in the Azure Portal. In the previous step we’ve created the necessary resources in Event Hubs so we can go on to deploying a Schema Registry which includes creating a Schema Group for storing future schemas.

  • Create a Schema Group
    We’ve created a default schema group with Avro serialization (the only option available) and backward compatibility. Other options include forward and no compatibility, again only with Avro message format support.
  • Add or create a Schema
    Schemas can be created in an existing schema group by writing it or uploading an existing schema file. After creation all schemas can be found under the Schemas panel. In this example we’ve created our synschema1 in the default schema group, with the specification shown below.
  • Updating and versioning Schemas

    By selecting an existing schema we can easily make changes and validate them, thus creating a new version of that schema that we can save and potentially use in the future.
    New versions are uploaded simply by changing the schema specification in the user interface and clicking the Validate button. If the validation does not pass, the Schema Registry detects that the schema specification is incorrect and does not create a new version.

Azure Schema Registry Client

In this section we cover the use of the Python-based Azure Schema Registry client, which provides basic operations for handling schemas and the data linked to them.

Setup

Setup of a local client with the Avro serializer and a connection to Azure Event Hubs and the Schema Registry in Python:

  1. Install the Azure Schema Registry Avro Serializer client library and Azure Identity client library for Python with PIP:
    pip install azure-schemaregistry-avroserializer azure-identity azure-eventhub
  2. Get connection strings for Event Hub and Schema Registry services
    1. Event Hub connection string:
      • Event Hub namespace → Shared Access Policies → RootManageSharedAccessKey
      • copy Connection string–primary key

    2. Schema Registry endpoint: Event Hub namespace → Host name, in the format namespace.servicebus.windows.net

  3. Configure Firewall

In the Event Hubs namespace resource, under Networking → check Selected networks → Firewall: add your client IP address

Sending and retrieving data and schemas as events
Schema specification and message example:

# SCHEMA
{"namespace": "synexample.avro",
 "type": "record",
 "name": "Person",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favoriteNumber",  "type": ["null", "long"]},
     {"name": "interests", "type": ["string", "null"]}
 ]
}

# DATA
{"name": "Jure", "favoriteNumber": 22, "interests": "jogging"}

The code sections written below show cases of sending data and schemas to Event Hubs and retrieving schema specification and details from the Schema Registry. 

from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

# PARAMETERS
EVENTHUB_CONNECTION_STRING = '<primary_key_string>'
EVENTHUB_NAME = 'jan-hub1'
SCHEMA_REGISTRY_ENDPOINT = 'janitor-hub.servicebus.windows.net'
SCHEMA_GROUP = 'default'
SCHEMA_STRING = """{"namespace": "synexample.avro",
                    "type": "record",
                    "name": "Person",
                    "fields": [
                        {"name": "userName","type": "string"},
                        {"name": "favouriteNumber","type": ["null", "long"]},
                        {"name": "interests", "type": ["string", "null"]}
                     ]
                  }"""

The defined parameters are used as environment variables in Python and declared as:

os.environ['parameter_name'] = 'value'

  • EVENTHUB_CONNECTION_STRING is found under Event Hub namespace → Settings → Shared access policies → RootManageSharedAccessKey, by copying the Connection string–primary key
  • SCHEMA_REGISTRY_ENDPOINT is the host name of the Event Hubs namespace, found in the Overview panel → Host name, in the format namespace.servicebus.windows.net
  • SCHEMA_STRING is the schema specification which we will send

The eventHub_send.py script publishes the data, serialized against the schema, as an event; if the schema registration was successful, the schema specification can be found under our default schema group. The eventHub_receive.py script pulls event data from a partition and deserializes it against the registered schema.

eventHub_send.py

# SEND
def send_event_data_batch(producer, serializer):
    event_data_batch = producer.create_batch()
    dict_data = {"userName": "Jure", "favouriteNumber": 22, "interests": "jogging"}
    payload_bytes = serializer.serialize(data=dict_data, schema=SCHEMA_STRING)
    event_data = EventData(body=payload_bytes)
    event_data_batch.add(event_data)
    producer.send_batch(event_data_batch)

# CREATE A PRODUCER INSTANCE
eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENTHUB_CONNECTION_STRING,
    eventhub_name=EVENTHUB_NAME
)

# CREATE AN AVRO SERIALIZER INSTANCE
avro_serializer = SchemaRegistryAvroSerializer(
    schema_registry=SchemaRegistryClient(
        endpoint=SCHEMA_REGISTRY_ENDPOINT,
        credential=DefaultAzureCredential()
    ),
    schema_group=SCHEMA_GROUP
)
                  
# PUBLISH EVENT
with eventhub_producer, avro_serializer:
      send_event_data_batch(eventhub_producer, avro_serializer)
eventHub_receive.py

from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

# RECEIVE
def on_event(partition_context, event):
  print("Received event from partition: {}.".format(partition_context.partition_id))
  bytes_payload = b"".join(b for b in event.body)
  print('The received bytes of the EventData is {}.'.format(bytes_payload))
  
  deserialized_data = avro_serializer.deserialize(bytes_payload)
  print('The dict data after deserialization is {}'.format(deserialized_data))
  
# CREATE A CONSUMER INSTANCE
eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=EVENTHUB_CONNECTION_STRING,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

# CREATE AN AVRO SERIALIZER INSTANCE
avro_serializer = SchemaRegistryAvroSerializer(
    schema_registry=SchemaRegistryClient(
        endpoint=SCHEMA_REGISTRY_ENDPOINT,
        credential=DefaultAzureCredential()
    ),
    schema_group=SCHEMA_GROUP
)

# CONSUME
try:
    with eventhub_consumer, avro_serializer:
        eventhub_consumer.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
except KeyboardInterrupt:
    print('INT')

Significant functions available in the azure.schemaregistry library are listed below. If a schema does not exist or is invalid, or the data does not match the schema specification, an exception is raised.

 # CREATE A CLIENT
schema_registry_client = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_ENDPOINT, credential=token_credential)

# REGISTER A SCHEMA or UPDATE EXISTING
schema_properties = schema_registry_client.register_schema(SCHEMA_GROUP, SCHEMA_NAME, SERIALIZATION_TYPE, SCHEMA_CONTENT)

# GET SCHEMA BY ID
schema = schema_registry_client.get_schema(schema_id)

# GET ID
schema_properties = schema_registry_client.get_schema_id(schema_group, schema_name, serialization_type, schema_content)

GCP Schema Registry

GCP has implemented schema registration and validation in conjunction with its Pub/Sub messaging system.

The motivation for having this centralized authority stays the same: monitoring message flow and versioning distinct schemas.

GCP supports schema versioning for two popular binary serialization formats: Apache Avro and Protocol Buffers. Both are popular for having a smaller overhead than their text-based counterparts, JSON and CSV.

The GCP docs state that a schema can be created as a standalone versioned resource, meaning it is not bound to a single topic; it can be associated with multiple Pub/Sub topics and used to validate inbound messages.

GCP offers multiple ways to use a schema registry client; in this blog post a Python-based client will be used.

In the listing below, a simple schema registration is performed via the client API:

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# Set your GCP project params here
project_id = "syntio-schema-registry"
schema_id = "customer-schema"
avsc_file = "/home/juraj/dev/schema-registry/avro/schema/schema.avsc"

project_path = f"projects/{project_id}"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

# 1
schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
# 2
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

# 3
try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.") 
  1. We instantiate a new client for schema registration
  2. We define a new Schema object with necessary arguments: name, type and definition
  3. As a last step we register a new schema, on success the response is the name of the schema with its definition. An “AlreadyExists” error is returned when we want to register another schema with the same id

Getting schema metadata also takes a few lines of code:

from google.api_core.exceptions import NotFound

try:
    result = schema_client.get_schema(request={"name": schema_path})
    print(f"Got a schema:\n{result}")
except NotFound:
    print(f"{schema_id} not found.")

A schema can be bound to a Pub/Sub topic at topic creation time, for example from the Cloud Console.
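The binding can also be done programmatically. Below is a minimal sketch of creating a topic bound to the schema registered above with the Python client (the topic_id is a hypothetical name; project_id and schema_path come from the earlier snippet):

from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding

topic_id = "sensor-topic"  # hypothetical topic name

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Create the topic with schema settings: published messages must conform to the
# referenced schema and are expected in binary Avro encoding.
topic = publisher_client.create_topic(
    request={
        "name": topic_path,
        "schema_settings": {"schema": schema_path, "encoding": Encoding.BINARY},
    }
)
print(f"Created topic with schema:\n{topic}")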

Schema validation is the final step, ensuring that the data to be published adheres to the defined schema.

If we send a message that does not conform to the desired schema, it will not be published to the Pub/Sub topic (it will be rejected); otherwise it is sent to Pub/Sub as usual.

A code example adapted from the Google Cloud Pub/Sub docs:

 record = {"userName": "jkomericki", "favouriteNumber": 42, "interests": ["Data engineering", "Dogecoin"]}

try:
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    encoder = BinaryEncoder(bout)
    writer.write(record, encoder)
    data = bout.getvalue()
    print(f"Preparing a binary-encoded message:\n{data}")
    future = publisher_client.publish(topic_path, data)
    print(f"Sent message, id {future.result()}")
except NotFound:
    print(f"topic {topic_id} not found")

The result of publishing is a future object whose result method returns the ID assigned to the message once it is successfully published to Pub/Sub, as seen in the figure below.

Janitor Schema Registry

We at Syntio have made our own variant of a schema registry, although the basic use case stays the same. A short overview of Syntio GCP Janitor features:

  1. Multiple format support
  2. REST API for schema management
  3. Schema evolution for JSON and CSV messages
  4. Ease of deployment via deployment scripts

Multiple format support

Text based formats:

  • CSV
  • JSON
  • XML

Binary based formats:

  • Avro
  • Protocol buffers

REST API for schema management

All schema-related operations are implemented through a RESTful API. Supported operations include:

  • Schema registration – saving a schema definition in the schema registry database
  • Schema retrieval – fetching a stored schema specification
  • Schema evolution – inferring a new schema from a message
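Exact endpoint paths and request formats are documented in the deployment wiki; the sketch below is only a hypothetical illustration of a registration call, reusing the response fields (identification, version) that appear in the usage example later in this post:

import json
import requests

# Hypothetical URL of the Schema Registry service; the real one comes from the deployment.
SCHEMA_REGISTRY_URL = "https://<schema-registry-host>/register"

# Hypothetical local file holding the schema definition to register.
with open("sensor_schema.json") as f:
    req_body = f.read()

res = requests.post(SCHEMA_REGISTRY_URL, data=req_body)
res_json = json.loads(res.text)
print(res_json["identification"], res_json["version"])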

Schema evolution for JSON and CSV messages

The basic use case for evolution in JSON and CSV is the same as with their binary-based counterparts: we want to register the newly changed data that the publisher has started publishing, so that the consumer can capture that change and continue working seamlessly.
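To illustrate what inferring a new schema from a message can look like for JSON, here is a sketch using the genson library (one possible third-party dependency, not necessarily the one Janitor uses internally):

from genson import SchemaBuilder

# A message with a new, previously unregistered "humidity" attribute.
new_message = {"id": 1, "temperature": 27, "humidity": 0.56, "unit": "C"}

builder = SchemaBuilder()
builder.add_object(new_message)
inferred_schema = builder.to_schema()
print(inferred_schema)
# The inferred schema could then be registered as a new version in the Schema Registry.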

Ease of deployment via deployment scripts

At Syntio, we are continuously trying to make the deployment process of our products as painless as we can.

That is why the only thing needed, configuration-wise, is to follow our deployment wiki for the configuration and run our deployment script from the Google Cloud Console shell.

Architecture

As we have already mentioned, Janitor is a middle-man component which makes it easier to register data, version it and detect changes.

The workflow is as follows:

  1. A client publisher component first registers a schema with the Schema Registry module, which runs on Cloud Run. The schema is persisted in Cloud Firestore NoSQL storage.
  2. The publisher then starts publishing messages. Each message is a trigger event for a Cloud Function (the Central consumer module) which validates the data against the schema that was initially registered.
  3. From this point several cases are handled: if the message is valid, it is sent to a valid topic; if the message cannot be validated and is not of JSON or CSV type (i.e. Avro, Protobuf or XML), it is sent to a deadletter topic; if the message cannot be validated and is of JSON or CSV type, it is sent to invalid-topic-json or invalid-topic-csv for possible later evolution. All of these validations are performed by communicating with the Schema Registry.
  4. A Cloud Scheduler job schedules the interval at which the Puller&Cleaner module, an HTTP-triggered Cloud Function, is invoked. The function pulls a batch of messages from the aforementioned invalid topics and tries to evolve them using the Schema Registry and third-party dependencies. If the evolution is successful, the new schema is registered in the Schema Registry module and the message is sent to the valid topic; otherwise the message is sent to a deadletter topic.

Usage example

We are going to simulate a sensor data stream that adheres to a defined JSON schema; an unregistered (invalid) message will also be sent every once in a while so that every Janitor module gets used. For this case we use a message example in JSON format, as shown below. We could do the same with the Avro format, as in GCP and Azure, but we wanted this showcase to be compatible with and readable by the Puller&Cleaner, which supports only JSON and CSV.

Valid message example:

{"id":1, 
"temperature": 27, 
"unit":"C"}

Invalid message example:

 {"id":1, 
"temperature": 27,
"humidity": 0.56, 
"unit":"C"}

Suppose there is a dummy producer like in the following listing:

import json
import time

import requests
from google.cloud import pubsub_v1


class Publisher:
    id = ""
    version = -1

    def __init__(self, project_id, topic_id, duration_secs=20):
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(project_id, topic_id)
        self.duration_secs = duration_secs

    def register_schema(self, url, req_body):
        # Register the schema over the Schema Registry REST API and remember
        # the returned schema id and version.
        res = requests.post(url, data=req_body)
        res_json = json.loads(res.text)
        self.id = res_json['identification']
        self.version = res_json['version']

    def publish_sensor_data(self):
        start_time = time.time()
        while time.time() - start_time < self.duration_secs:
            # SensorData is a helper that generates a sensor reading (sketched below).
            data = SensorData().__repr__().encode('utf-8')
            print(data)
            future = self.publisher.publish(
                self.topic_path, data,
                schemaId=self.id, versionId=str(self.version), format="json"
            )
            print(f'Published {future.result()}')
            time.sleep(0.5)

This publisher provides all the necessary prerequisites for using Janitor. It has access to the Schema Registry for registration and sending data. A registered schema for this use case consists of three attributes: the sensor id, the sensor's temperature reading, and the unit of measurement (e.g. Kelvin or Celsius). However, a new type of sensor which also measures humidity exists in the sensor network and is not registered. Let us view how this looks on the publisher side.
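The publisher listing above also references a SensorData helper that is not shown; a hypothetical sketch of it, assuming it simply renders a reading as a JSON string and occasionally includes the unregistered humidity attribute, could look like this:

import json
import random

class SensorData:
    # Hypothetical sensor reading used by the dummy publisher above.
    def __init__(self, sensor_id=1):
        self.reading = {
            "id": sensor_id,
            "temperature": random.randint(15, 35),
            "unit": "C",
        }
        # Occasionally emit an unregistered attribute to trigger the invalid path.
        if random.random() < 0.2:
            self.reading["humidity"] = round(random.random(), 2)

    def __repr__(self):
        return json.dumps(self.reading)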

After these are published, the central consumer delegates them to the corresponding topics, depending on whether they are valid (registered) or not.

On the client consumer side, sketched in the code below, we can see that all messages adhering to the previously registered schema are consumed without problems.
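A minimal sketch of such a client consumer, subscribing to the valid topic (the project and subscription names below are hypothetical), might look like this:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# Hypothetical project and subscription names for the valid-data topic.
project_id = "syntio-schema-registry"
subscription_id = "valid-topic-sub"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    # Every message arriving here has already been validated by the central consumer.
    print(f"Received valid message: {message.data.decode('utf-8')}")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        streaming_pull_future.result(timeout=30)
    except TimeoutError:
        streaming_pull_future.cancel()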

But what happened to the messages containing humidity? They were sent to the invalid-topic-json topic, where they wait to be evolved or discarded by the Puller&Cleaner component. When the Puller&Cleaner module is invoked, a new schema is created and the messages can then be consumed with little to no problem.

Comparison and Conclusion

In the table below we've compared the most important features a Schema Registry needs to have. All of the Schema Registry vendors offer schema registration, validation and retrieval; the significant difference is that Janitor covers all of the most popular formats as well as schema evolution.

Feature | Azure | GCP | Janitor
Schema registration | Yes | Yes | Yes
Schema evolution | No | No | Yes
Detail retrieval | Yes | Yes | Yes
Schema validation | Yes | Yes | Yes
Supported formats | Avro | Avro, Protocol Buffers | Avro, CSV, JSON, Protocol Buffers, XML
Compatibility types | Backward, Forward | Backward, Forward | –
Architecture complexity / dependencies | Event Hubs | Pub/Sub | Pub/Sub, GCP resource setup (Event Hubs and Kafka supported in upcoming versions)
Ease of use | Easy to use, but a new feature with few available code examples | Easy to use, big community and many code examples, easy-to-understand API | Easy to use, requires a REST endpoint for registration on the publisher side and a connection to Pub/Sub

In summary, a Schema Registry is something we all love to hear about and use, for its main functionality of elegantly handling large amounts of structured data. The Azure Schema Registry, GCP Schema Registry and Janitor Schema Registry all provide these functionalities and are inspired by one another.

Even though Janitor's Schema Registry is, we dare to say, the most progressive of the three, since it enables schema evolution and supports not one but five formats, we are glad that this is globally recognized as an issue by everyone who needs a solution. Other big platforms (and all of us) strive in the same direction, or as Syntio likes to state and still stands by these words: create a perfect Schema Registry! This is just the beginning.

Stay tuned for part 2…