Snowpipe Streaming: Real-time Data Ingestion to Snowflake

Dominik Vrbanic

DATA ENGINEER

Introduction

Data loading into Snowflake can be achieved through various methods, including Snowpipe and Snowpipe Streaming. Both of these services provide automated and continuous data ingestion capabilities.

Snowpipe is a serverless data ingestion service offered by Snowflake, designed to simplify the process of loading data into Snowflake data warehouses. When new files land in a stage (for example, in cloud storage), Snowpipe automatically triggers a pipeline that loads the data into the relevant Snowflake table. However, it’s important to note that Snowpipe is continuous but not real-time: there may be a delay of several minutes before the ingested data becomes available for querying. Additionally, if a large amount of data is pushed through Snowpipe at once, writes can queue up and cause throughput issues.

On the other hand, Snowpipe Streaming is a new data ingestion feature released by Snowflake for public preview on March 7, 2023. This Java-based open-source API offers high-throughput, low-latency streaming data ingestion at a low cost. It allows users to write rowsets from streaming sources (such as app analytics, sensors, or CDC events) directly into Snowflake tables over HTTPS, without setting up a pipeline or using intermediary cloud storage (e.g. Amazon S3). Unlike the original Snowpipe, which is based on micro-batching files written to cloud storage, Snowpipe Streaming leverages the Snowpipe Streaming API and the new Client SDK. The primary advantage of using the new API is reduced latency, as it removes the additional step of moving data from the source to cloud storage. Furthermore, Snowpipe Streaming introduces new functionalities such as exactly-once delivery, ordered ingestion, and error handling with dead-letter queue (DLQ) support, ensuring data integrity and reliability.

Snowpipe Streaming also serves as an ingestion method for the Snowflake Connector for Kafka. This integration allows seamless streaming data ingestion from Kafka topics into Snowflake tables, leveraging the power and scalability of both technologies.

In the upcoming sections, we will explore Snowpipe Streaming in depth, covering its capabilities, implementation options, and use cases. We will conduct a comprehensive comparison with Snowpipe, offer best practices, and consider cost aspects.

Overview of Snowpipe Streaming API

Prior to the introduction of Snowpipe Streaming, Snowflake primarily relied on bulk loads through Snowpipe for data ingestion. However, this approach introduced latency and storage costs as data had to be stored in an intermediate stage before being loaded into a table.

To overcome these limitations, Snowflake developed the Streaming API, which allows you to establish a direct connection to your Snowflake data warehouse and write data directly to tables from your own managed application. By leveraging the Streaming API, you can reduce latency and eliminate the need for intermediate storage. With the Streaming Ingest SDK, your application can receive messages and hand them off to Snowflake for ingestion, mapping values to table columns and inserting rows in real time.

The Snowpipe Streaming API is used to create a streaming client, open a channel, and insert data from the source into a Snowflake table. The client and channel are the objects through which your application communicates with Snowflake and streams rows of data into the table. All of this can be done in a single Java program with a few lines of code.

Channels are a key concept in Snowpipe Streaming, representing logical partitions that correspond to connections between clients and destination tables. A single channel maps to exactly one table in Snowflake, but multiple channels can point to the same table. The Client SDK has the ability to open multiple channels to multiple tables; however, the SDK cannot open channels across accounts. The ordering of rows and their corresponding offset tokens is preserved within a channel but not across channels pointing to the same table.
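Putting these pieces together, a minimal sketch of a Java program that creates a client, opens a channel, and inserts a row might look like the following. The connection values, database, schema, table, and column names are placeholders, and key-pair authentication is assumed:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class StreamingIngestSketch {
  public static void main(String[] args) throws Exception {
    // Connection properties (placeholder values; key-pair authentication is used).
    Properties props = new Properties();
    props.put("user", "STREAMING_USER");
    props.put("url", "https://<account>.snowflakecomputing.com:443");
    props.put("private_key", "<private key>");
    props.put("role", "STREAMING_ROLE");

    // 1. Create a streaming client.
    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {

      // 2. Open a channel against the target table.
      OpenChannelRequest request = OpenChannelRequest.builder("MY_CHANNEL")
          .setDBName("MY_DB")
          .setSchemaName("PUBLIC")
          .setTableName("EVENTS")
          .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
          .build();
      SnowflakeStreamingIngestChannel channel = client.openChannel(request);

      // 3. Insert a row; map keys are column names, the second argument is the offset token.
      Map<String, Object> row = new HashMap<>();
      row.put("EVENT_ID", 1);
      row.put("PAYLOAD", "{\"hello\":\"world\"}");
      InsertValidationResponse response = channel.insertRow(row, "1");
      if (response.hasErrors()) {
        System.err.println("Insert failed: " + response.getInsertErrors().get(0).getException());
      }

      // 4. Close the channel, waiting for in-flight rows to be committed.
      channel.close().get();
    }
  }
}
```

The offset token supplied with each insert is what lets a restarted client query the channel for its latest committed offset and resume ingestion without duplicating rows.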

You can run the SHOW CHANNELS command to list the channels for which you have access privileges.
Inactive channels and their offset tokens are deleted automatically after 30 days.

Use cases for Snowpipe Streaming

What can you do with Snowpipe Streaming that you couldn’t do with the original Snowpipe? There are two main scenarios where the new streaming ingestion feature can help you. Keep in mind that both services handle ingestion only: they write data into Snowflake, not out of it:

  • Real-time data ingestion: This is relevant when a business needs data to be available in Snowflake within seconds of the event being recorded. For example, a security analyst might need to analyze log data as soon as an incident is detected in order to determine the appropriate mitigation.

  • Unifying streaming and batch operations: In Snowflake’s announcement of the feature, they highlighted the potential for Snowpipe Streaming to simplify real-time architecture. The idea is that instead of running separate pipelines or systems for batch, micro-batch, and streaming ingestion, you can now use Snowpipe for both real-time and historical data.

Using Snowpipe Streaming

There are two main ways of using Snowpipe Streaming:

1. The Client SDK

Snowflake provides a Client SDK as part of its Java Ingest SDK. It can be embedded in your application code to send events directly to Snowflake via the new streaming API. Using the Streaming Ingest SDK with your own Java application is fairly straightforward but does require some work: you need to write a custom Java application that wraps the Snowflake Ingest SDK.

The Snowpipe Streaming service is currently implemented as a set of APIs for the Snowflake Ingest SDK. The SDK is available for download from the Maven Central Repository. The SDK supports Java version 8 (or higher) and requires Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.

The SDK makes REST API calls to Snowflake. You might need to adjust your network firewall rules to allow connectivity.

The API requires a custom Java application interface capable of pumping rows of data and handling encountered errors. You are responsible for ensuring the application runs continuously and can recover from failure. For a given set of rows, the API supports the equivalent of ON_ERROR = CONTINUE | ABORT: ABORT, the default, aborts the entire batch after the first error is found, while CONTINUE keeps loading the remaining data even when errors are encountered.

The application should capture errors using the response from the insertRow (single row) or insertRows (set of rows) methods.
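As a minimal sketch of that pattern, assuming a channel that was opened with OnErrorOption.CONTINUE, you can inspect the validation response per batch; the commented-out handleFailedRow call stands in for whatever dead-letter handling your application needs (hypothetical):

```java
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class ErrorHandlingSketch {
  // Inserts a batch and inspects the validation response instead of letting the batch abort.
  static void insertBatch(SnowflakeStreamingIngestChannel channel,
                          List<Map<String, Object>> rows,
                          String offsetToken) {
    InsertValidationResponse response = channel.insertRows(rows, offsetToken);
    if (response.hasErrors()) {
      for (InsertValidationResponse.InsertError error : response.getInsertErrors()) {
        // Each InsertError describes one rejected row and the reason it was rejected.
        System.err.println("Row rejected: " + error.getException().getMessage());
        // handleFailedRow(error.getRowContent());  // hypothetical hook, e.g. write to a DLQ
      }
    }
  }
}
```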

2. Kafka Connector with Snowpipe Streaming

If you are currently using Kafka within your data pipeline, and it plays a role in transmitting data to Snowflake, there’s an even more convenient way to leverage Snowflake’s Streaming API. In this approach, you would deploy Kafka Connect as a separate component and then establish a connection to Snowflake using the Snowflake Connector. The Snowflake Streaming API enables streaming rows from Apache Kafka topics directly into target tables within Snowflake.

To leverage the Snowflake Streaming API with Kafka, ensure that your connector is at least version 1.9.1. The connector uses the Client SDK under the hood, but reduces the configuration overhead and saves users the need to write a Java client (for the use cases where it is applicable). The streaming API can be used with open-source Kafka as well as the cloud-managed Kafka offered by AWS or Confluent.

If you’re on that version or greater, all you need to do is update your connector configuration by setting snowflake.ingestion.method to SNOWPIPE_STREAMING and snowflake.role.name to the role that should perform the inserts into the table. Those are the only two required configuration changes to point your Kafka connector at the new Snowflake Streaming API. There are also optional properties you can set for buffering, polling, and error handling; see the Snowflake documentation for the full list.
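For illustration, a Kafka Connect standalone configuration along these lines would switch the connector to the streaming path; the connector name, topic, and connection values are placeholders, and buffer settings are optional:

```properties
# Snowflake sink connector (version 1.9.1 or later) using Snowpipe Streaming -- placeholder values
name=snowflake_streaming_sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=events
snowflake.url.name=<account>.snowflakecomputing.com:443
snowflake.user.name=STREAMING_USER
snowflake.private.key=<private key>
snowflake.database.name=MY_DB
snowflake.schema.name=PUBLIC

# The two required changes for the streaming ingestion path:
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.role.name=STREAMING_ROLE

# Community converters are used with Snowpipe Streaming
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

In distributed mode, the same properties would go into the connector’s JSON configuration submitted to the Kafka Connect REST API.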

The Kafka connector with Snowpipe Streaming currently doesn’t support schema detection or schema evolution; it uses the same table schema as the one used with Snowpipe.

Snowpipe Streaming API vs. Snowpipe

The API is intended to complement Snowpipe, not replace it. Use the Snowpipe Streaming API in streaming scenarios where data is streamed via rows (e.g. Apache Kafka topics) rather than written to files. It is meant to support real-time use cases, where a specific event needs to be written to a Snowflake table while ensuring exactly-once semantics and deduplication at the event (rather than a file) level. The API fits into an ingest workflow that includes an existing custom Java application that produces or receives records. The API removes the need to create files to load data into Snowflake tables and enables the automatic, continuous loading of data streams into Snowflake as the data becomes available.

The following table describes the differences between Snowpipe Streaming and Snowpipe:

| Category | Snowpipe Streaming | Snowpipe |
| --- | --- | --- |
| Data ingestion method | Streaming | Continuous / micro-batching |
| Sources | Kafka, various (over HTTPS via Java code) | Cloud storage |
| Real-time ingestion | Yes, but insert only | Near real-time |
| Form of data to load | Rows | Files. If your existing data pipeline generates files in blob storage, it is recommended to use Snowpipe rather than the API. |
| Third-party software requirements | Custom Java application code wrapper for the Snowflake Ingest SDK | None |
| Resource allocation | Serverless | Serverless |
| Data ordering | Ordered insertions within each channel | Not supported. Snowpipe can load data from files in an order different from the file creation timestamps in cloud storage. |
| Load history | Recorded in the SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY view (Account Usage) | Recorded in the LOAD_HISTORY view (Account Usage) and the COPY_HISTORY function (Information Schema) |
| Pipe object | Not required; the API writes records directly to target tables | Requires a pipe object that queues and loads staged file data into target tables |
| Primary use cases | Event-based data: IoT, app analytics, ads, CDC, and so on | Multiple sources with varying frequency and size |

Best practices for Snowpipe Streaming

To ensure optimal cost efficiency and performance, consider implementing the following best practices:

  1. Cost Optimization:
    • Call the Snowpipe Streaming API with fewer clients that write more data per second.
    • Aggregate data from multiple sources, such as IoT devices or sensors, using a Java or Scala application, and then call the API to load data using the Snowflake Ingest SDK at higher flow rates.
    • Utilize the same tables for both batch and streaming ingestion to reduce Snowpipe Streaming compute costs due to pre-empted file migration operations.
  2. Performance Recommendations:
    • Use supported types from the java.time package for TIME, DATE, and TIMESTAMP columns.
    • Set OnErrorOption to OnErrorOption.CONTINUE when creating a channel and manually check the return value from insertRows for ingestion errors.
    • If you are loading multiple rows, using insertRows will be more performant and cost-effective than calling insertRow multiple times, as there is less time spent on locks.
    • If you set the default log level to DEBUG, make sure that the SDK’s most verbose loggers (listed in the Snowflake documentation) keep logging at INFO, because their DEBUG output is very verbose and can lead to significant performance degradation.

By following these best practices, you can optimize the cost efficiency and performance of your Snowpipe Streaming implementation.
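As a rough illustration of the performance recommendations, the sketch below buffers rows that use java.time values and sends them in a single insertRows call; the column names and payload source are placeholders:

```java
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class BatchingSketch {
  static void sendBatch(SnowflakeStreamingIngestChannel channel,
                        List<String> payloads, String offsetToken) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (String payload : payloads) {
      Map<String, Object> row = new HashMap<>();
      row.put("EVENT_DATE", LocalDate.now());      // java.time value for a DATE column
      row.put("EVENT_TS", OffsetDateTime.now());   // java.time value for a TIMESTAMP column
      row.put("PAYLOAD", payload);
      rows.add(row);
    }
    // One insertRows call per batch instead of many insertRow calls.
    InsertValidationResponse response = channel.insertRows(rows, offsetToken);
    if (response.hasErrors()) {
      // With OnErrorOption.CONTINUE, rejected rows are reported here rather than aborting the batch.
      System.err.println(response.getInsertErrors().size() + " row(s) rejected");
    }
  }
}
```

Buffering on the application side keeps the number of API calls, and the time spent on locks, low without needing to run additional clients.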

Costs and Considerations

With Snowpipe Streaming’s serverless compute model, you can stream any volume of data without managing a virtual warehouse. Instead, Snowflake handles the compute resources, automatically scaling based on the current Snowpipe Streaming load. Charges are based on the compute used for file migration plus per-second client ingestion time.

Note the following:

  • File migration may sometimes be pre-empted by clustering or other DML operations.

  • Migration may not always occur, and therefore computing costs may be reduced.

Given the number of factors that can differentiate Snowpipe Streaming loads, it is very difficult for Snowflake to provide sample costs. The size of records, number of records, data types, etc. can affect the compute resource consumption for file migration. Client charges are dictated only by how many clients are actively writing data to Snowflake on a per-second basis.

Account administrators (users with the ACCOUNTADMIN role) or users with a role granted the MONITOR USAGE global privilege can use SQL commands to view the credits billed to your Snowflake account within a specified date range.

To query the history of data migrated into Snowflake tables, the amount of time spent loading data into Snowflake tables using Snowpipe Streaming, and the credits consumed, query the following Account Usage views:

  • SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY

  • SNOWPIPE_STREAMING_CLIENT_HISTORY

Conclusion

There are many approaches to data ingestion, but the best practice is to reduce complexity while achieving your business requirements. Batch and Streaming ingestion can work together to provide the simplest and most cost-effective solution to your data pipelines. Streaming ingestion is not meant to replace file-based ingestion but rather to augment it for data loading scenarios that better fit your business needs.

For most batch scenarios, ingesting large files using COPY or Snowpipe Auto-Ingest provides a performant and efficient mechanism for moving bulk data to Snowflake. For streaming scenarios where rows can be sent from applications or event streaming platforms like Kafka, using Snowpipe Streaming enables low-latency ingestion directly into Snowflake tables at a lower cost.

Consider your data generation sources and latency requirements to leverage both batch and streaming ingestion to reduce complexity and cost.