Serverless ETL orchestration using AWS Step Functions and an on-demand Redshift cluster

Mara Kozic

DATA ENGINEER

We’re building a recommendation engine based on customer usage, billing data, and a set of product offers from the Product Catalogue.

The idea is that the user uploads the input tables from an on-premises Cloudera cluster, along with a JSON file specifying schemas, tables, table columns, and the corresponding filenames. A State Machine copies the data into the appropriate schemas/tables in the Redshift database, after which a number of database operations are performed. The resulting table is unloaded to a CSV file in an S3 bucket.
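
For illustration, the JSON manifest might look roughly like this; the key names are hypothetical and depend on what the loader Lambdas expect (the stg schema name matches the State Machine definition further down):

import json

# Hypothetical manifest describing one schema, its tables/columns, and the
# corresponding input filenames in S3.
manifest = json.loads('''
{
  "schema": "stg",
  "tables": [
    {"name": "usage",   "columns": ["customer_id", "product_id", "usage_gb"], "filename": "usage.csv.gz"},
    {"name": "billing", "columns": ["customer_id", "amount", "period"],       "filename": "billing.csv.gz"}
  ]
}
''')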

All of the resources are defined through CloudFormation and are split into two CF stacks. The first, initial stack runs constantly and comprises the roles, VPC, Subnet, VPC Endpoints, all the necessary Lambdas, and the State Machine definition – basically everything except the Redshift Cluster. The aforementioned State Machine initiates the creation of the second (Redshift) stack, which runs the Redshift Cluster. In the State Machine input, the user defines whether or not to delete the Redshift stack once the CSV results file has been unloaded.
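
For reference, an execution could then be started along these lines; the s3bucket key appears in the State Machine definition below, while the delete flag's exact name is an assumption:

import boto3
import json

sfn = boto3.client('stepfunctions')

# Start a run of the State Machine; the ARN is illustrative.
sfn.start_execution(
    stateMachineArn='arn:aws:states:eu-central-1:123456789012:stateMachine:MyStateMachine',
    input=json.dumps({
        's3bucket': 'mybucket',        # bucket holding the input files and manifest
        'deleteredshiftstack': True,   # hypothetical flag: tear the stack down after Unload
    }),
)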

The initial setup

The two CloudFormation Stacks

The initial CloudFormation Stack is created through the CLI:

aws cloudformation create-stack \
    --stack-name TRTinitial \
    --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
    --template-url https://mybucket.s3.eu-central-1.amazonaws.com/InitialStack.cfg \
    --parameters ParameterKey=Username,ParameterValue="redshiftdbuser" \
                 ParameterKey=Password,ParameterValue="redshiftdbpassword"

and comprises the following resources:

  1. A Secrets Manager secret holding the user-supplied Redshift database credentials (the parameter keys and values from the CLI call above), which we mark as NoEcho under the CloudFormation Parameters section so that they are not echoed:

    RedshiftSecret:
      Type: 'AWS::SecretsManager::Secret'
      Properties:
        Name: MyRedshiftSecret
        SecretString: !Join ['', ['{"username":"', !Ref Username, '","password":"', !Ref Password, '"}']]
        KmsKeyId: !Ref MyKMS

  2. KMS key – for encryption/decryption

  3. Lambdas Role – a role given to our Lambda functions so that they can be invoked and can access SecretsManager, S3, Redshift, KMS, and CloudFormation

  4. Redshift Role – a role given to Redshift Cluster so that it has S3 access

  5. State Machine Role – a role that allows Lambdas to be invoked through the State Machine

  6. VPC – the Virtual Private Cloud that contains the resources below

  7. Private Subnet

  8. Private Subnet Route Table – definition of a new Route Table so that our Lambda functions can read from and write to an S3 bucket outside our VPC. Since it is not possible to retrieve the reference/attributes of an automatically generated Route Table, we need to define a new one.

  9. Private Subnet Route Table Association

  10. VPC Endpoint for Secrets Manager

  11. VPC Endpoint for S3 (in addition to the Route Table)

  12. Lambda functions – these create/delete the Redshift stack, retrieve the Redshift Cluster Endpoint string (more on this later on), read from SecretsManager and S3, and execute SQL queries on Redshift (a minimal sketch of the stack-creating Lambda follows this list)

  13. State Machine definition – all of our Lambda functions are orchestrated by the State Machine
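
As promised above, here is a minimal sketch of the stack-creating Lambda. It is an illustration only: the stack name, template URL, and return value are assumptions, and the real function would also handle the case where the stack already exists, as discussed below.

import boto3

cf = boto3.client('cloudformation')

def handler(event, context):
    # Kick off creation of the second (Redshift) stack and return immediately;
    # the State Machine's Wait/Choice loop polls the stack status afterwards.
    cf.create_stack(
        StackName='TRTredshift',  # hypothetical stack name
        TemplateURL='https://mybucket.s3.eu-central-1.amazonaws.com/RedshiftStack.cfg',
    )
    return {'stackname': 'TRTredshift'}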

The Redshift CloudFormation Stack has far fewer resources and therefore takes much less time to create and delete:

  1. Redshift Cluster subnet group

  2. Redshift Cluster definition

Including a State Machine definition in the Initial CloudFormation template

There are three ways to do this. The first is to write a one-liner (which can very easily result in unreadable code); the second is to write out all of the states explicitly, including the newline characters, and then use the Fn::Sub function to substitute them (this can result in a very long CF template document, depending on the number of states). The third way is to externalize the State Machine definition and use the Fn::Transform function:

StateMachine:
  Type: "AWS::StepFunctions::StateMachine"
  Properties:
    StateMachineName: MyStateMachine
    RoleArn: !GetAtt MyStateMachineRole.Arn
    Fn::Transform:
      Name: AWS::Include
      Parameters:
        Location: 's3://mybucket/StateMachine.yml'

and provide CF with the externalized YAML file (StateMachine.yml):

DefinitionString:
  Fn::Sub: |
    {
      "Comment": "TRT AWS Step Functions state machine.",
      "StartAt": "CreateStack",
      "States": {
        "CreateStack": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:createCloudFormationStack",
          "ResultPath": "$.result",
          "Next": "GetDescribeStacksOutputs"
        },
        "GetDescribeStacksOutputs": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:getDescribeStackOutputs",
          "ResultPath": "$.getdescribe",
          "Next": "CreateTables"
        },
        "CreateTables": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:lambdaSQLcreatetables",
          "ResultPath": "$.names",
          "Next": "CopyFromS3Map"
        },
        "CopyFromS3Map": {
          "Type": "Map",
          "InputPath": "$",
          "ItemsPath": "$.names.stg",
          "Parameters": {
            "arrayvalue.$": "$$.Map.Item.Value",
            "getdescribe.$": "$.getdescribe",
            "s3bucket.$": "$.s3bucket",
            "names.$": "$.names.stg"
          },
          "MaxConcurrency": 20,
          "Iterator": {
            "StartAt": "CopyTables",
            "States": {
              "CopyTables": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:lambdaSQLcopy",
                "End": true
              }
            }
          },
          "ResultPath": "$.sqlcopy",
          "Next": "DoSomethingSQL"
        },
        "DoSomethingSQL": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:lambdaSQLdosomething",
          "Parameters": {
            "getdescribe.$": "$.getdescribe"
          },
          "ResultPath": "$.sqldosomething",
          "Next": "Unload"
        },
        "Unload": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:lambdaSQLunload",
          "Parameters": {
            "getdescribe.$": "$.getdescribe"
          },
          "ResultPath": "$.unload",
          "Next": "SucceedState"
        },
        "SucceedState": {
          "Type": "Succeed"
        }
      }
    }

Here, the CreateStack state starts the Redshift Cluster. The Cluster Endpoint consists of the Cluster name and an automatically generated string of characters; hence the GetDescribeStacksOutputs state runs a Lambda function which, among other values, returns the Cluster Endpoint string. The CopyFromS3Map map state then iterates over the table names returned by the CreateTables state, invoking the CopyTables task to copy the content from S3 into the corresponding Redshift tables. Next, we perform database operations by using different Lambdas to execute SQL queries (represented here by the DoSomethingSQL state). Finally, the Unload state generates the resulting .csv file in the specified S3 location. Obviously, this is just an example, and if a CF stack with the same name already exists, the CreateStack step will fail. By letting the State Machine user specify whether to work with the pre-existing Redshift stack or delete it and create a new one, we can ensure the correct workflow. This, however, entails adding multiple checks and Wait/Choice states (see the figure below), so that we can periodically check the status of the Redshift stack before continuing to the next stage, as creating/deleting the Redshift stack may take several minutes to complete. Deletion steps were also included so that the user can specify in the input whether the Redshift stack should be deleted after the Unload step.
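
For illustration, the Lambda behind GetDescribeStacksOutputs (and the periodic status checks) might look along these lines; the stack name and the RedshiftEndpoint output key are assumptions:

import boto3

cf = boto3.client('cloudformation')

def handler(event, context):
    # Read the Redshift stack's status and outputs in one call.
    stack = cf.describe_stacks(StackName='TRTredshift')['Stacks'][0]
    outputs = {o['OutputKey']: o['OutputValue'] for o in stack.get('Outputs', [])}
    return {
        'stackstatus': stack['StackStatus'],          # inspected by the Choice states
        'endpoint': outputs.get('RedshiftEndpoint'),  # cluster name + auto-generated suffix
    }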

Executing SQL queries on AWS Redshift using Lambda

Running SQL scripts in Redshift is done through AWS Lambda. The psycopg2 library is a PostgreSQL library that can run SQL queries on Redshift from Python, and a version compiled for Linux can be found here. Together with a Python module facilitating query submission (redshift_utils.py), your Lambda function, and the SQL script, the compiled library goes into the Lambda deployment zip archive.

The Lambdas that execute SQL queries are written in Python 2.7, as the compiled psycopg2 build we used only works with Python 2. When writing SQL queries, it’s possible to pass variables in multiple ways. For instance, one way is to use the second argument of the execute() method with a %s placeholder. Another is to use a {} placeholder with the format() method. Here’s an example of a copy command:

copy {dbname}.{schemaname}.{tablename} from %s iam_role %s removequotes emptyasnull blanksasnull delimiter '|' ignoreheader 1 gzip;

and it results in the following query:

copy myredshiftdatabase.myschema.mytable from 's3://mybucket/mytable.csv.gz' iam_role 'arn:aws:iam::aws_account_id:role/RedshiftS3Access' removequotes emptyasnull blanksasnull delimiter '|' ignoreheader 1 gzip;
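
Putting the two placeholder styles together, the Lambda-side submission might look roughly like this (a sketch: the connection values are placeholders for what the real function reads from Secrets Manager and the GetDescribeStacksOutputs result):

import psycopg2

# Connection details are placeholders; in the real setup they come from
# Secrets Manager and the retrieved Cluster Endpoint.
conn = psycopg2.connect(host='redshift-cluster-endpoint', port=5439,
                        dbname='myredshiftdatabase', user='redshiftdbuser',
                        password='redshiftdbpassword')

query = ("copy {dbname}.{schemaname}.{tablename} from %s iam_role %s "
         "removequotes emptyasnull blanksasnull delimiter '|' "
         "ignoreheader 1 gzip;").format(dbname='myredshiftdatabase',
                                        schemaname='myschema',
                                        tablename='mytable')

with conn.cursor() as cur:
    # execute()'s second argument fills the %s placeholders with properly
    # quoted literals for the S3 path and the IAM role ARN.
    cur.execute(query, ('s3://mybucket/mytable.csv.gz',
                        'arn:aws:iam::aws_account_id:role/RedshiftS3Access'))
conn.commit()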

Recap

Overall, setting everything up in CloudFormation was relatively easy, with the exception of the State Machine definition as part of the Initial CloudFormation Stack. Considering the number of States in our State Machine, a one-liner was not an option. Similarly, writing the definition itself, including explicit newline characters, within the CloudFormation template and using the Fn::Sub function to replace them would have resulted in a messy template. The tricky bit was figuring out how to deal with newline characters in the YAML State Machine definition file. The solution was to use DefinitionString and the Fn::Sub function along with a JSON definition of the State Machine, as follows:

DefinitionString:
  Fn::Sub: |

Moving on, in order for the Lambda functions to be able to read from and write to an S3 bucket outside our VPC, an S3 VPC Endpoint and a Route Table need to be set up. Since it is not possible to retrieve the reference/attributes of an automatically generated Route Table, a new one was defined.

Finally, depending on the chosen Redshift infrastructure as well as the input dataset size, some queries can take longer than others. This is why we implemented a wait loop that takes the SQL query PID as input and keeps checking the query status until ‘Completed’ is returned.
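
A minimal sketch of such a loop, assuming the query state is read from Redshift's STV_INFLIGHT system table and that 'Completed' is simply our own return marker:

import time

def wait_for_query(conn, pid, poll_seconds=15):
    # Keep polling until the query with the given PID is no longer in flight.
    while True:
        with conn.cursor() as cur:
            cur.execute("select count(*) from stv_inflight where pid = %s", (pid,))
            (in_flight,) = cur.fetchone()
        if in_flight == 0:
            return 'Completed'
        time.sleep(poll_seconds)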