Mia Tadic
DATA ENGINEER
This blog demonstrates how to ingest data from a remote API (here, Twitter’s API) into the cloud (here, Google Cloud).
Google Cloud Platform (GCP) is a suite of public cloud computing services offered by Google. The platform includes a range of hosted services for compute, storage and application development that run on Google hardware. The services used in this blog are Google Cloud Pub/Sub and Google BigQuery.
- Google Cloud Pub/Sub is a managed, scalable, real-time messaging service that allows messages to be exchanged between applications.
- Google BigQuery is a scalable, managed enterprise data warehouse for analytics. It is a big data service that runs SQL-like queries against multi-terabyte data sets.
Task idea
The task was implemented with two components, a Producer and a Consumer.
- The Producer sent tweets containing the hashtag “dataengineering” from Twitter to a Pub/Sub topic. The Producer was dockerized and then run on a Google Cloud virtual machine.
- The Consumer streamed messages from the Pub/Sub topic to a BigQuery table. The Consumer was created using Google Cloud Functions and was triggered automatically by every new message.
The described process is shown in the following diagram:
Procedure
Here are the prerequisites and steps of the procedure.
The prerequisites for the task are:
- Twitter Developer account
- Google Cloud Platform account
- Installed Docker
The main steps of the procedure are:
- Obtaining Twitter credentials
- Creating a Producer
- Creating a Consumer
- Dockerizing the Producer
Let’s dig into the steps of the procedure.
Obtain Twitter credentials
The app’s client key and secret key, access token and token secret can be found in the Twitter dashboard.
They are needed in a later step.
Create a Producer
First, the following resources were created:
1. Service account
The service account authorized access to the required GCP resources (Pub/Sub, BigQuery).
In the GCP console, Service accounts can be found under the IAM & Admin section. After clicking the CREATE SERVICE ACCOUNT button, several steps were required:
- Step 1: Filling in the desired details
- Step 2: Adding the “Pub/Sub Editor” and “BigQuery Data Editor” roles
- Step 3: Downloading the JSON key file by choosing the CREATE KEY option
Additionally, a user environment variable had to be added:
name=GOOGLE_APPLICATION_CREDENTIALS
value=path_to_downloaded_json_file
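Before running the Producer, it can help to confirm that the variable is visible to Python and points to a readable key file. The sketch below is one way to do that with the standard library only. The function name `check_credentials` is hypothetical; it assumes the variable name `GOOGLE_APPLICATION_CREDENTIALS`, which the Google client libraries read, and the `client_email` field that service account key files contain.

```python
import json
import os


def check_credentials(env_var="GOOGLE_APPLICATION_CREDENTIALS"):
    """Return the service account email stored in the key file the variable points to."""
    path = os.environ.get(env_var)
    if not path:
        raise RuntimeError(f"{env_var} is not set")
    if not os.path.isfile(path):
        raise RuntimeError(f"{env_var} points to a missing file: {path}")
    with open(path, encoding="utf-8") as key_file:
        key = json.load(key_file)
    # Service account key files carry the account's email in "client_email".
    return key.get("client_email")
```

Calling `check_credentials()` at the top of the Producer script would fail early with a readable message instead of a less obvious authentication error later on.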
2. Pub/Sub Topic
The Pub/Sub topic was the place the Producer sent tweets to.
3. Pub/Sub Subscription
The subscription pulled data from the desired Pub/Sub topic. This was a way to see what the data looked like. Subscriptions can be found in the picture above.
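The same check can be done from Python instead of the console. The following is a minimal sketch, assuming google-cloud-pubsub version 2.0 or later (where `pull` and `acknowledge` take a `request` dictionary); the function name `peek_messages` is hypothetical. The subscriber is passed in as an argument, so in real use it would be a `pubsub_v1.SubscriberClient()` and the path would come from its `subscription_path(<project_id>, <subscription_name>)` method.

```python
def peek_messages(subscriber, subscription_path, max_messages=5):
    """Pull up to max_messages, acknowledge them and return their decoded payloads."""
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": max_messages}
    )
    received = list(response.received_messages)
    payloads = [item.message.data.decode("utf-8") for item in received]
    ack_ids = [item.ack_id for item in received]
    if ack_ids:
        # Acknowledged messages are not delivered to this subscription again.
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": ack_ids}
        )
    return payloads
```

A synchronous pull may return fewer messages than requested, or none at all, even when the topic is active, so an empty result is not necessarily a sign of a problem.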
Finally, everything was set for the Producer to be created.
As mentioned, the Producer’s task was to send tweets containing the hashtag “dataengineering” from Twitter to the Pub/Sub topic.
Note: Keys and tokens obtained from Twitter were needed here.
First, the Pub/Sub package needed to be installed:
pip install --upgrade google-cloud-pubsub
To begin the Python script, the following packages were imported:
import json
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from google.cloud import pubsub_v1
Since the Producer sends tweets to a Pub/Sub topic, a connection to that topic had to be established. It was done this way:
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(<project_id>, <topic_name>)
On the other side, since the Producer reads its data from Twitter, a connection to Twitter was needed too. It was made this way:
auth = OAuthHandler(<consumer_key>, <consumer_secret>)
auth.set_access_token(<access_token>, <access_token_secret>)
twitterStream = Stream(auth, listener())
twitterStream.filter(track=["dataengineering"])
This code calls tweepy’s built-in methods. In addition, listener was introduced in the code for the purposes of the task. It is a class that inherits from tweepy’s StreamListener class.
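The listener class itself is not shown in full, so here is a rough sketch of what such a class could look like. The name `TweetListener` and the constructor arguments are assumptions. The callbacks `on_data` and `on_error` are the ones tweepy 3.x calls on a StreamListener; in the real Producer the class would inherit from `StreamListener`, which is left out here so the sketch runs without tweepy installed.

```python
class TweetListener:
    """Callbacks in the shape tweepy 3.x expects from a StreamListener."""

    def __init__(self, client, topic_path):
        self.client = client          # e.g. pubsub_v1.PublisherClient()
        self.topic_path = topic_path  # e.g. client.topic_path(<project_id>, <topic_name>)

    def on_data(self, raw_data):
        # raw_data is the JSON text of one tweet as delivered by the stream.
        self.client.publish(self.topic_path, data=raw_data.encode("utf-8"))
        return True  # returning False would stop the stream

    def on_error(self, status_code):
        # 420 means the app is being rate limited; returning False disconnects.
        if status_code == 420:
            return False
        return True
```

With a class like this, the stream would be created as `Stream(auth, TweetListener(client, topic_path))`. This sketch publishes the raw tweet; the Producer in this blog publishes only selected fields.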
For demonstration purposes, only the following parts of each tweet were saved to the Pub/Sub topic:
- created_at
- id
- text
- source
- retweet_count
- user_name
- user_location
- user_followers_count
- user_statuses_count
The extracted parts were concatenated in tweet_message, after which tweet_message was published to the Pub/Sub topic using the following command:
self.client.publish(self.topic_path, data=tweet_message.encode('utf-8'))
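How the parts were concatenated is not shown, so the following is only one possible way to build tweet_message. It assumes the tweet arrives as JSON text in the shape of Twitter’s v1.1 API (user details nested under `user`) and that the message is serialized as a JSON object; the function name `build_tweet_message` is hypothetical.

```python
import json


def build_tweet_message(raw_data):
    """Keep only the nine fields of interest and return them as a JSON string."""
    tweet = json.loads(raw_data)
    user = tweet.get("user") or {}
    record = {
        "created_at": tweet.get("created_at"),
        "id": tweet.get("id"),
        "text": tweet.get("text"),
        "source": tweet.get("source"),
        "retweet_count": tweet.get("retweet_count"),
        "user_name": user.get("name"),
        "user_location": user.get("location"),
        "user_followers_count": user.get("followers_count"),
        "user_statuses_count": user.get("statuses_count"),
    }
    return json.dumps(record)
```

Using JSON keeps the field names attached to the values, which makes it straightforward for the Consumer to map each message to a table row.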
Create a Consumer
As stated before, the Consumer’s task was to stream messages from the Pub/Sub topic to a BigQuery table, so a BigQuery table had to be created.
BigQuery tables are contained in datasets, so a dataset had to be created first. It was created in the GCP console by finding BigQuery under the Big Data section, choosing the project and clicking CREATE DATASET. This opened a form, which can be seen in the picture below:
Then, by opening the created dataset and clicking the CREATE TABLE button, the table was created. Based on the extracted parts of the tweet, the table’s schema looked like this:
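A schema matching the nine extracted fields could be written down as follows. The column names come from the list of extracted parts; the column types and modes are assumptions, not the values used in the original table. In particular, created_at is kept as STRING here because Twitter’s date format would likely need converting before it could be loaded into a TIMESTAMP column.

```python
import json

# Each entry follows the name/type/mode layout of BigQuery's JSON schema format.
TWEET_TABLE_SCHEMA = [
    {"name": "created_at", "type": "STRING", "mode": "NULLABLE"},
    {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "text", "type": "STRING", "mode": "NULLABLE"},
    {"name": "source", "type": "STRING", "mode": "NULLABLE"},
    {"name": "retweet_count", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "user_name", "type": "STRING", "mode": "NULLABLE"},
    {"name": "user_location", "type": "STRING", "mode": "NULLABLE"},
    {"name": "user_followers_count", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "user_statuses_count", "type": "INTEGER", "mode": "NULLABLE"},
]


def schema_as_json(schema=TWEET_TABLE_SCHEMA):
    """Serialize the schema, e.g. to paste into the console's text schema editor."""
    return json.dumps(schema, indent=2)
```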
At last, the Consumer was ready to be created. As mentioned, the Consumer was created using Google Cloud Functions.
The missing values needed to be filled in:
The file main.py contained the Consumer code, while requirements.txt contained the function’s dependencies (packages with their versions).
The Google Cloud Function was triggered by a message on the Cloud Pub/Sub topic. This event was passed to the hello_pubsub function as an argument, and the function then collected the data (tweets) from the event:
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
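To put that line in context, here is a minimal sketch of the function around it. It uses the `(event, context)` signature of Pub/Sub-triggered background functions, where the payload arrives base64-encoded under `event['data']`. Parsing the payload as JSON is an assumption that holds only if the Producer published JSON; the helper name `decode_event` is hypothetical.

```python
import base64
import json


def decode_event(event):
    """Turn the base64-encoded Pub/Sub payload back into a dictionary."""
    pubsub_message = base64.b64decode(event["data"]).decode("utf-8")
    return json.loads(pubsub_message)


def hello_pubsub(event, context):
    """Entry point called by Cloud Functions for every new Pub/Sub message."""
    tweet = decode_event(event)
    print(f"Received tweet {tweet.get('id')}")
    return tweet
```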
Since the Consumer’s task was to write messages to a BigQuery table, it had to connect to that table. To establish the connection and insert the rows, the following code was used:
client = bigquery.Client()
dataset_ref = client.dataset(<dataset_id>)
table_ref = dataset_ref.table(<table_id>)
table = client.get_table(table_ref)
client.insert_rows(table, <rows_to_insert>)
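One detail worth adding is a check of the value returned by `insert_rows`: it is a list of per-row errors, which is empty when every row was accepted. The sketch below assumes the message is a JSON object whose keys match the table’s column names; the function name `insert_tweet` is hypothetical, and the client and table are passed in so they can be created once, as in the code above.

```python
import json


def insert_tweet(client, table, pubsub_message):
    """Insert one decoded Pub/Sub message as a row and fail loudly on errors."""
    row = json.loads(pubsub_message)
    # insert_rows accepts a list of dictionaries keyed by column name.
    errors = client.insert_rows(table, [row])
    if errors:
        raise RuntimeError(f"BigQuery rejected the row: {errors}")
    return row
```

Raising an exception makes the failure visible in the Cloud Function’s logs instead of silently dropping the tweet.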
Dockerize
The next step of the procedure was to dockerize the Producer. The idea was to pack the Producer into a Docker image, send the image to a virtual machine and run it from there.
For this task, Docker Desktop and Docker Hub were used; the installation files and instructions for various operating systems can be found on Docker’s website.
For Docker, two files (Dockerfile and requirements.txt) must be created and saved in the same Python project as the producer code.
The Dockerfile should look like this:
# Use an official Python runtime as a parent image
FROM python:3.7-slim
# Set the working directory to /app
WORKDIR /app
# Copy the current directory contents into the container at /app
ADD . /app
# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt
# Define environment variable
ENV GOOGLE_APPLICATION_CREDENTIALS=""
# Run producer.py when the container launches
CMD ["python", "producer.py"]
Note:
- After checking which Python version is being used, it is enough to add the “-slim” suffix to the image tag; no additional installation is needed.
- The JSON key file must be copied into the same Python project directory. That directory is copied into the image’s working directory (/app), so the key file is available inside the container at the path set in the GOOGLE_APPLICATION_CREDENTIALS environment variable.
As described at the beginning of the blog, the Docker image was deployed to a virtual machine. Therefore, the next step in preparing for dockerizing was to create a GCP virtual machine.
To create a VM, find VM instances in the GCP console under the Compute section by choosing Compute Engine, then click CREATE INSTANCE. For this demonstration, the following details were sufficient:
Finally, the steps for deploying the Docker image to the VM were:
1. Build the Docker image
docker build -t producer .
2. Save the Docker image to a tar archive
docker save -o producer.tar producer
Note: Make sure to run both steps from the producer’s Python project directory.
3. Upload the tar file to the virtual machine
By clicking first the SSH button in VM instances in the GCP console, and afterwards the Settings button, the Upload file option was chosen and the producer.tar file was uploaded.
4. Load the Docker image from the archive
docker load -i producer.tar
5. Run the Docker image
docker run -d producer
With this, the process of ingesting messages into the Pub/Sub topic was completed.
To check whether the Producer and Consumer work properly, the BigQuery table can be inspected. The table from this demonstration looked like this:
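The same check can be scripted. Below is a small sketch that reads a few rows back through the BigQuery client’s `query(...).result()` call. The function name `fetch_sample_rows` is hypothetical, and `table_id` is assumed to be the fully qualified name in the form `project.dataset.table`; the client is passed in and would be a `bigquery.Client()` in real use.

```python
def fetch_sample_rows(client, table_id, limit=5):
    """Return up to `limit` rows of the table as plain dictionaries."""
    query = f"SELECT * FROM `{table_id}` LIMIT {int(limit)}"
    rows = client.query(query).result()  # waits for the query to finish
    return [dict(row.items()) for row in rows]
```

If the list comes back empty while the Producer is running, the Cloud Function’s logs are a good first place to look.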
Conclusion and ideas
This step-by-step tutorial showed how to stream data from a remote API to the cloud using a simple Producer-Consumer flow. Besides GCP and Twitter, there are plenty of other cloud platforms and APIs to use. There are also other Google Cloud resources to play with, like Google Cloud Storage or Google Cloud SQL. Another idea is to send data from the Cloud Pub/Sub topic to BigQuery using Google Cloud Dataflow.
Note: Don’t forget to delete the resources when they are not needed anymore.