GCP pipeline: pub/sub-lookup-storage (part 2/2)

Code / 09.06.2020


The previous blog post described the implementation of a data pipeline with simple lookup logic implemented using a Cloud Function. In addition to Cloud Functions, Google Cloud offers Cloud Run as a serverless option. This post briefly describes how to create a Cloud Run service and showcases two different cases for each serverless option, followed by performance test results.

Introduction

The goal is to determine which option, Cloud Function or Cloud Run, is preferable for the pubsub-lookup-storage pipeline (Image 1 pubsub-lookup).

A second question to answer: is there a case when having separate components (for receiving messages from topics, lookup in Redis and data storage) is more desirable?

[Image 1: pubsub-lookup]

The initial idea was to have three Cloud Functions: one dealing with collecting messages from topics, the second looking for additional value in the Memorystore for Redis and the third storing merged data collected from topics and Memorystore in Google Cloud Storage and Firestore.

There are two advantages to this approach. Firstly, you can tune the performance of an individual Cloud Function as much as you need by changing parameters such as timeout and memory allocation. Secondly, you can reuse an existing Cloud Function (such as the one reading from Redis) in another use-case.

However, this approach also has a downside: time is wasted on HTTP calls between the functions, which means that you are charged more for using the service. If a Subscriber instance sends an HTTP call to the Writer instance, it must block its execution and wait for the Writer's response. For example, assume that the Writer component takes 250 milliseconds to execute. The Subscriber function's execution time will then also be 250 milliseconds longer. Since GCP charges by execution time in milliseconds, we effectively pay for 500 milliseconds, even though the whole operation took 250 milliseconds and the Writer function did all the work.
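The arithmetic in this example can be written out as a short sketch. It assumes, purely for illustration, that every function in a synchronous chain is billed for its own work plus the time it spends waiting on the functions it calls, and that the Subscriber's own work is negligible. The helper name billed_ms is hypothetical.

```python
def billed_ms(own_work_ms):
    """Total billed time for a chain of functions that call each other synchronously.

    own_work_ms[i] is the time function i spends on its own work; function i
    blocks until function i + 1 (and everything after it) has finished.
    """
    total = 0
    for i in range(len(own_work_ms)):
        # Function i runs for its own work plus all downstream work it waits on.
        total += sum(own_work_ms[i:])
    return total


# Subscriber (assumed ~0 ms of own work) calls the Writer (250 ms).
chain = [0, 250]
print(billed_ms(chain))  # billed time across both functions: 500
print(sum(chain))        # wall-clock time of the whole operation: 250
```

Under these assumptions, every additional hop in the chain adds the downstream time to the bill once more, which is the effect described above.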

The post Serverless Option mentions something interesting, which is also the reason for implementing a version with Cloud Run: By design, Cloud Functions only handles one request per instance, ensuring each request has the full amount of compute and memory allocated to it. This may make rapid scaling slower with Cloud Functions, but App Engine standard environment, Cloud Run, and Cloud Run for Anthos can handle multiple concurrent requests per instance. This means these services can scale faster by handling more traffic per instance, but all requests in an instance must share resources.
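To make the scaling difference concrete, here is a rough sketch of how many instances are needed to serve a given number of simultaneous requests. The helper instances_needed, the figure of 1,000 in-flight requests and the per-instance concurrency of 80 are assumptions chosen for illustration, not values measured in this post.

```python
import math


def instances_needed(in_flight_requests, concurrency_per_instance):
    """Minimum number of instances that can hold all in-flight requests."""
    return math.ceil(in_flight_requests / concurrency_per_instance)


# One request per instance (the Cloud Functions model described above).
print(instances_needed(1000, 1))   # 1000
# Many requests per instance (assumed concurrency of 80).
print(instances_needed(1000, 80))  # 13
```

The trade-off stays the one quoted above: fewer instances have to be started, but the requests on each instance share its CPU and memory.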

Cloud Run Implementation

Cloud Run is a managed compute platform that enables you to run stateless containers that are invocable via web requests or Pub/Sub events.

The following describes the process of creating a Cloud Run Service that receives messages from topics.

The code is written in Python and Docker was used to create and deploy code using containers.

Flask is used for handling incoming requests. Flask is a lightweight WSGI web application framework. It is classified as a microframework because it does not require particular tools or libraries. The following snippet shows part of the source code:

from flask import Flask, request
import json

app = Flask(__name__)


# [START run_pubsub_handler]
@app.route('/', methods=['POST'])
def index():
    # Pub/Sub delivers each message as a JSON envelope in the request body.
    envelope = request.get_json()
    if not envelope:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(envelope, dict) or 'message' not in envelope:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    # Message processing (lookup and storage) is omitted from this snippet.
    # Returning a 2xx status acknowledges the message.
    return '', 204
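Once the envelope has passed these checks, the payload still has to be extracted. A minimal sketch of that step follows, assuming the standard Pub/Sub push format in which message.data is base64-encoded, and assuming (this is not stated in the post) that publishers send JSON payloads. The helper name decode_pubsub_message is hypothetical.

```python
import base64
import json


def decode_pubsub_message(envelope):
    """Return the decoded JSON payload of a Pub/Sub push envelope.

    Assumes the publisher sent a UTF-8 JSON document as the message data.
    """
    message = envelope['message']
    raw = base64.b64decode(message['data'])
    return json.loads(raw.decode('utf-8'))


# Example envelope shaped like a Pub/Sub push request body.
data = base64.b64encode(json.dumps({'id': 42}).encode('utf-8')).decode('ascii')
envelope = {'message': {'data': data, 'messageId': '1'}, 'subscription': 'example'}
print(decode_pubsub_message(envelope))  # {'id': 42}
```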

DOCKER IMAGE

To create an image, you need a Dockerfile: a text document that contains all the commands a user could call on the command line to assemble an image. Docker can build images automatically by reading the instructions in this file.

In our case the Dockerfile looks like this:

FROM python:3.8

# Copy application dependency manifests to the container image.
# Copying this separately prevents re-running pip install on every code change.
COPY requirements.txt ./

# Install production dependencies.
RUN pip install -r requirements.txt

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Run the web service on container startup.
# Use gunicorn webserver with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

After the configuration setup in Dockerfile, you can build the image with this command:

docker build [OPTIONS] PATH | URL | -

To push the created image to Container Registry, the tag name must include a registry name; tagging the image with a registry name configures the docker push command to push the image to a specific location. This can be achieved with the command:

docker tag [SOURCE_IMAGE] [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG]

To push the image to the Container Registry, use the following command:

docker push [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG]

The image can now be found in the Container Registry section of the GCP web console.

Run the following command to deploy your app:

gcloud run deploy NAME --image gcr.io/PROJECT_ID/IMAGE_NAME


If you want to deploy a code update to the service, repeat the previous steps. Each deployment to a service creates a new revision and automatically starts serving traffic when ready.

Integrating with Pub/Sub

First, it is necessary to create a topic for Pub/Sub:

gcloud pubsub topics create myRunTopic

Then allow Pub/Sub to create authentication tokens in your project:

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:service-PROJECT-NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
    --role=roles/iam.serviceAccountTokenCreator

Create or select a service account to represent the Pub/Sub subscription identity.

gcloud iam service-accounts create cloud-run-pubsub-invoker \
    --display-name "Cloud Run Pub/Sub Invoker"

Give the invoker service account permission to invoke your Cloud Run service (pubsub-tutorial is the name of the service in this command):

gcloud run services add-iam-policy-binding pubsub-tutorial \
    --member=serviceAccount:cloud-run-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com \
    --role=roles/run.invoker

Create a Pub/Sub subscription with the service account:

gcloud pubsub subscriptions create myRunSubscription --topic myRunTopic \
    --push-endpoint=SERVICE-URL/ \
    --push-auth-service-account=cloud-run-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com

The SERVICE-URL is displayed on the details page of the service in the Cloud Run console.

Connections for Cloud Run

To connect to the database (Cloud SQL) and Memorystore, it is necessary to add connections in the advanced settings of the Cloud Run service.

The cache can be accessed from Cloud Run by connecting the service to a Serverless VPC Access connector in the same authorized network as the Memorystore instance.

Like any configuration change, setting a new configuration for the Cloud SQL connection leads to the creation of a new Cloud Run revision.
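With the connector in place, the lookup itself is an ordinary Redis read. The sketch below is illustrative only: the key scheme, the field name lookup_value and the REDISHOST/REDISPORT environment variables are assumptions, not details of the original pipeline. The enrich function accepts any client exposing get(), so it can be tried without a live Memorystore instance.

```python
import os


def make_redis_client():
    """Create a client for the Memorystore instance (requires the redis package)."""
    import redis  # imported lazily so the rest of the sketch runs without it

    host = os.environ.get('REDISHOST', 'localhost')  # assumed variable name
    port = int(os.environ.get('REDISPORT', '6379'))  # assumed variable name
    return redis.Redis(host=host, port=port)


def enrich(message, client):
    """Merge the value stored in Redis under message['id'] into the message."""
    value = client.get(str(message['id']))
    if isinstance(value, bytes):
        # redis-py returns bytes unless decode_responses=True is set.
        value = value.decode('utf-8')
    return {**message, 'lookup_value': value}


class FakeRedis:
    """In-memory stand-in used here instead of a real Memorystore connection."""

    def __init__(self, data):
        self._data = data

    def get(self, key):
        return self._data.get(key)


client = FakeRedis({'42': b'extra'})
print(enrich({'id': 42}, client))  # {'id': 42, 'lookup_value': 'extra'}
```

In a deployed service, make_redis_client() would replace FakeRedis, and the client would typically be created once per instance rather than once per request.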

TESTS

The following pipelines are used for testing:

  • Case 1: The writer and the lookup in Redis are no longer separate components; they are modules of the Subscriber.

  • Case 2: All components (Subscriber, Writer and Redis Proxy) are located in separate Cloud Run services.

  • Case 3: All components are located in one function.

  • Case 4: Initial case, Image 1 pubsub-lookup.

For all tests, data was stored in Redis and all messages sent were valid. The results were analyzed in the same way for every version, using the Metrics explorer to track two values: the execution time of the subscriber function, and the delay, that is, the difference between the instance invocation time (triggered by Pub/Sub) and the publish time found in context.timestamp of the message received from the topic (Function parameters).
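The delay described here can be computed as in the following sketch. It assumes that context.timestamp is an RFC 3339 string in UTC with millisecond precision; the function name publish_delay_seconds and the sample values are made up for illustration.

```python
from datetime import datetime, timezone


def publish_delay_seconds(context_timestamp, invoked_at):
    """Seconds between the publish time in context.timestamp and the invocation."""
    # Older Python versions do not accept a trailing 'Z' in fromisoformat().
    published_at = datetime.fromisoformat(context_timestamp.replace('Z', '+00:00'))
    return (invoked_at - published_at).total_seconds()


# Inside a function, invoked_at would be datetime.now(timezone.utc).
invoked_at = datetime(2020, 6, 9, 12, 0, 0, 250000, tzinfo=timezone.utc)
print(publish_delay_seconds('2020-06-09T12:00:00.000Z', invoked_at))  # 0.25
```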

Due to the large number of requests to Firestore, Cloud Storage and Memorystore, the number of attempts was set to three, which was enough for the tests that follow. All default project quotas were left unchanged. For Cloud Functions, the minimum and maximum number of instances were left so that new instances are created automatically as needed. For Cloud Run, the maximum number of instances was set to 1000 (the highest value available by default, although a larger number can be requested).
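One way to read "number of attempts" is as a cap on how many times each call to Firestore, Cloud Storage or Memorystore is tried. Under that assumption, a generic helper might look like the sketch below; call_with_attempts and the fixed pause between attempts are hypothetical and not taken from the original code.

```python
import time


def call_with_attempts(operation, attempts=3, pause_s=0.1, sleep=time.sleep):
    """Call operation() up to `attempts` times and return its first result."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as error:  # a real client would catch narrower errors
            last_error = error
            if attempt < attempts:
                sleep(pause_s)
    # Every attempt failed: surface the last error to the caller.
    raise last_error


calls = []


def flaky_write():
    """Stand-in for a storage call that fails twice before succeeding."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError('temporary failure')
    return 'stored'


print(call_with_attempts(flaky_write, sleep=lambda seconds: None))  # stored
```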

The cases were tested by sending:

  1. 240,000 messages at a rate of 200 messages/second for 20 minutes, one publisher.

  2. 240,000 messages at the same time, one publisher.

  3. 140,000 messages from each of three publishers (420,000 messages in total).
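A fixed-rate load such as the one in Test 1 can be produced with a simple pacing loop. The sketch below is not the publisher used for these tests: publish_at_rate is a hypothetical helper, and the publish callable stands in for whatever client actually sends the message to the topic.

```python
import time


def publish_at_rate(publish, messages, rate_per_second,
                    now=time.monotonic, sleep=time.sleep):
    """Call publish(message) for each message, spaced 1/rate_per_second apart."""
    start = now()
    for index, message in enumerate(messages):
        # Scheduling against the start time keeps pacing errors from accumulating.
        wait = start + index / rate_per_second - now()
        if wait > 0:
            sleep(wait)
        publish(message)


# Test 1 sizing: 200 messages/second for 20 minutes.
print(200 * 20 * 60)  # 240000

sent = []
publish_at_rate(sent.append, ['a', 'b', 'c'], rate_per_second=200)
print(len(sent))  # 3
```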

The following images show the graphs of average execution time for Case 1, Case 2 and Case 3 for Test 1 (240,000 messages at a rate of 200 messages/second for 20 minutes, one publisher):

Case 1

Average execution time is 130 ms.

No components raised errors during the test.

All of the 240,000 messages were stored as valid.

Case 2

Average execution time is 218 ms.

No components raised errors during the test.

All of the 240,000 messages were stored as valid.

Case 3

Average execution time is 213.17 ms.

No components raised errors during the test.

All of the 240,000 messages were stored as valid.

The next set of images shows the graphs of average execution time for Case 1, Case 2 and Case 3 for Test 2 (240,000 messages at the same time, one publisher):

Case 1

Average execution time is 147 ms.

No components raised errors during the test.

All of the 240,000 messages were stored as valid.

Case 2

Average execution time is 277 ms.

No components raised errors during the test.

All of the 240,000 messages were stored as valid.

Case 3

Average execution time is 364.01 ms.

No components raised errors during the test.

All of the 240,000 messages were stored as valid.

The last set of images shows the graphs of average execution time for Case 1, Case 2 and Case 3 for Test 3 (140,000 messages from each of three publishers):

Case 1

Average execution time is 155 ms.

Not all data was saved in Cloud Storage and Cloud Firestore: 341,728 messages were saved and the remaining 78,272 were not.

The following warning appears in the logs, with textPayload: "The request was aborted because there was no available instance."

According to the Google documentation, this issue can be caused by a sudden, large increase in traffic or by the service reaching its maximum container instance limit. By default, a service can scale up to 1000 container instances, so we can try to increase the maximum number of instances for the next test.

Case 3

Average execution time is 316.56 ms.

No components raised errors during the test.

All of the 420,000 messages were stored as valid.

Case     Test   Avg. execution time [ms]   Finishing time of last instance   Test success
Case 1   1      130                        0.27 s                            +
Case 1   2      147                        2:14.955                          +
Case 1   3      155                        9:59.43                           -
Case 2   1      218                        0.188 s                           +
Case 2   2      277                        7:40.987                          +
Case 2   3      375                        12:8.153                          +
Case 3   1      213                        2.25 s                            +
Case 3   2      364                        3:21.774                          +
Case 3   3      316                        5:38.685                          +
Case 4   1      239.23                     4.55 s                            +
Case 4   2      481.84                     2:47.56                           +
Case 4   3      371                        5:09.91                           -
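As a quick cross-check of the table, the sketch below picks, for each test, the case with the lowest average execution time among those that stored all messages. The numbers are copied from the table; the helper name fastest_successful_case is hypothetical, and average execution time is only one of the criteria discussed in this post.

```python
# Average execution time [ms] per case and test, copied from the table.
avg_ms = {
    'Case 1': {1: 130, 2: 147, 3: 155},
    'Case 2': {1: 218, 2: 277, 3: 375},
    'Case 3': {1: 213, 2: 364, 3: 316},
    'Case 4': {1: 239.23, 2: 481.84, 3: 371},
}
# Runs that lost messages ("-" in the Test success column).
failed = {('Case 1', 3), ('Case 4', 3)}


def fastest_successful_case(test):
    """Case with the lowest average execution time among successful runs."""
    candidates = {case: times[test] for case, times in avg_ms.items()
                  if (case, test) not in failed}
    return min(candidates, key=candidates.get)


for test in (1, 2, 3):
    print(test, fastest_successful_case(test))
# 1 Case 1
# 2 Case 1
# 3 Case 3
```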


The table shows that in Test 1 and Test 2, Case 1 has the lowest average instance execution time and that the process of collecting, updating and storing data is done with the shortest delay (from the moment the message is sent). In Test 3, only Case 2 and Case 3 managed to store all data (an error occurred in the remaining cases, resulting in the loss of messages). From the conducted tests we can conclude that Case 1 is the better option for our use-case when there is no large flow of data in a short period of time. For a case that requires a large amount of data to be sent in a short time, Cloud Function-v2 would be the better option.
It is important to take note of the reason why some tests were not successful. For Case 4, the quota limit was reached: Quota exceeded for quota group 'CPUMilliSeconds-europe-west1' and limit 'CPU allocation in function invocations for europe-west1 per 100 seconds'. This quota is shared between all Cloud Functions in the same project and the same region. For Case 1, we reached the limit on the maximum number of instances. If needed, both limits can be increased by contacting support.
We believe that a sample of three tests for each version is not enough to draw definite conclusions; however, it allowed us to gain considerable insight into serverless options for streaming data.
