How I Built A Cascading Data Pipeline Based on AWS (Part 2)

Previously, I shared my experience developing a data pipeline with AWS CloudFormation. That approach is not optimal, though, because it leaves three issues unresolved:
- The deployment has to be performed manually, which increases the chance of errors;
- All resources are created in one single stack, without proper boundaries and layers; as the development cycle goes on, the stack grows heavier and managing it becomes a nightmare;
- Many resources are meant to be retained and reused in other projects, which a single monolithic stack makes difficult.
In short, we are going to increase the manageability and reusability of this project, in an agile manner.
Solution
AWS enables users to implement two CloudFormation structural patterns: cross-stack references and nested stacks. Cross-stack referencing is a design style in which stacks are developed separately, and usually independently, while resources in different stacks can still be related through references. Nested stacking means a CloudFormation stack composed of other stacks, achieved with the [AWS::CloudFormation::Stack](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-stack.html) resource.

Since one of our goals is better project management, the project is going to be broken down into layers, and nested stacking is the tool for that. However, because the artifacts of the existing stack are intrinsically interrelated, we also need a dash of cross-stack referencing.
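To make the two patterns concrete, here is a minimal sketch (with a hypothetical template path, resource, and export name, not taken from this project): a parent template embeds a child template as a nested stack, while two sibling templates share a value through an export and an import.

# Nested stacking: the parent template embeds a child template
ChildStack:
  Type: AWS::CloudFormation::Stack
  Properties:
    TemplateURL: ./templates/child.yml    # hypothetical path

# Cross-stack reference: one template exports a value...
Outputs:
  SharedQueueUrl:
    Value: !Ref SharedQueue               # hypothetical resource
    Export:
      Name: shared-queue-url              # hypothetical export name

# ...and another template imports it wherever the value is needed
QueueUrl: !ImportValue shared-queue-url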
Implementation
We created 3 Lambda functions, 3 DynamoDB tables, 1 IAM role with its attached policies, several SQS queues, and several CloudWatch alarms. Because of the complexity of the functions themselves, in this version each function is defined in its own template, together with the services only it uses, such as its alarms and dead-letter queue. Beyond those, the IAM resources will form another nested stack, and so will the DynamoDB tables, in order to keep them reusable. The SQS queues that deliver messages between the Lambda functions will be a separate stack too. All these nested stack templates are placed in a new directory called /templates.
Cascading-ETL-pipeline
├── LICENSE
├── README.md
├── branches
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── sales
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── salespersons
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── templates
│   ├── BranchCollector.yml
│   ├── SaleCollector.yml
│   ├── SalespersonCollector.yml
│   ├── iam.yml
│   ├── queues.yml
│   └── tables.yml
├── template.yml
└── utils.py
The coding effort is quite light in this installment compared to the previous one, since we mostly just move existing code from one file to another. For example, to set up the DynamoDB table stack, all I did was copy the snippet and paste it into a new file:
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'

Parameters:
  Environment:
    Type: String

Resources:
  BranchDynamoDBTable:
    Type: AWS::DynamoDB::Table
    DeletionPolicy: Delete
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: "branch_id"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "branch_id"
          KeyType: "HASH"
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      TableName: !Sub branch-${Environment}

  SalespersonDynamoDBTable:
    Type: AWS::DynamoDB::Table
    DeletionPolicy: Delete
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: "employee_id"
          AttributeType: "S"
        - AttributeName: "branch_id"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "employee_id"
          KeyType: "HASH"
        - AttributeName: "branch_id"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      TableName: !Sub salesperson-${Environment}

  SaleDynamoDBTable:
    Type: AWS::DynamoDB::Table
    DeletionPolicy: Delete
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: "sale_id"
          AttributeType: "S"
        - AttributeName: "employee_id"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "sale_id"
          KeyType: "HASH"
        - AttributeName: "employee_id"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      TableName: !Sub sale-${Environment}
Cross-nested-stack References
It's worth noting that a few modifications are still needed. To make sure the resources defined within one stack can be referenced from outside that stack, the exporting templates need a new section called Outputs that exports the values to be referenced. In our case, the IAM role is going to be used globally, so right after its definition we output its ARN, making it visible to the other stacks in the same account and region.
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'

Parameters:
  Environment:
    Type: String

Resources:
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${Environment}-lambda-role'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
                - events.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
      Path: '/'

  LambdaPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: !Sub '${AWS::StackName}-${Environment}-lambda-policy'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: EventBusAccess
            Effect: Allow
            Action:
              - events:PutEvents
            Resource: '*'
          - Sid: LambdaInvokeAccess
            Effect: Allow
            Action:
              - lambda:InvokeFunction
            Resource: "*"
          - Sid: LogAccess
            Effect: Allow
            Action:
              - logs:CreateLogGroup
              - logs:CreateLogStream
              - logs:PutLogEvents
            Resource: arn:aws:logs:*:*:*
      Roles:
        - !Ref LambdaRole

Outputs:
  Role:
    Description: The role to be used across the stacks
    Value: !GetAtt LambdaRole.Arn
    Export:
      Name: !Sub ${Environment}-Role
The messenger queues are exported in the same way:
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'

Parameters:
  Environment:
    Type: String

Resources:
  EmployeeQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub employee-queue-${Environment}
      VisibilityTimeout: 900
      RedrivePolicy:
        deadLetterTargetArn:
          Fn::GetAtt: EmployeeWorkloadDeadLetterQueue.Arn
        maxReceiveCount: 10

  EmployeeWorkloadDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub employee-workload-dead-letter-queue-${Environment}
      MessageRetentionPeriod: 1209600

  SaleQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub sale-queue-${Environment}
      VisibilityTimeout: 900
      RedrivePolicy:
        deadLetterTargetArn:
          Fn::GetAtt: SaleWorkloadDeadLetterQueue.Arn
        maxReceiveCount: 10

  SaleWorkloadDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub sale-workload-dead-letter-queue-${Environment}
      MessageRetentionPeriod: 1209600

Outputs:
  EmployeeQueue:
    Description: The SQS queue that delivers the payloads from branch collector to salesperson collector
    Value: !Ref EmployeeQueue
    Export:
      Name: !Sub ${Environment}-EmployeeQueue
  SaleQueue:
    Description: The SQS queue that delivers the payloads from salesperson collector to sale collector
    Value: !Ref SaleQueue
    Export:
      Name: !Sub ${Environment}-SaleQueue
Meanwhile, the stacks that import resources from other stacks also need a minor change. AWS supports this with the intrinsic function [Fn::ImportValue](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference-importvalue.html). Take the branch collector as an example: it uses the IAM role and a messenger queue that are not created in its own stack, so wherever those resources appear, I replace the reference with a Fn::ImportValue lookup.
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'

Parameters:
  Environment:
    Type: String

Resources:
  BranchCollector:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub branch-collector-${Environment}
      Handler: lambda_function.lambda_handler
      Runtime: python3.8
      CodeUri: ./../branches/
      Description: updating branch info in our DynamoDB table
      MemorySize: 128
      Timeout: 900
      Role:
        Fn::ImportValue:
          !Sub ${Environment}-Role
      Environment:
        Variables:
          LOGGING_LEVEL: INFO
          APP_ENV: !Ref Environment
          SQS:
            Fn::ImportValue:
              !Sub ${Environment}-EmployeeQueue
          DB: !Sub branches-${Environment}
      DeadLetterQueue:
        Type: SQS
        TargetArn:
          Fn::GetAtt: BranchFunctionDeadLetterQueue.Arn
      Events:
        StartScheduledEvent:
          Type: Schedule
          Properties:
            Schedule: rate(1 hour)

  # dead letter queue
  BranchFunctionDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub branch-function-dead-letter-queue-${Environment}
      MessageRetentionPeriod: 1209600

  # alarms
  BranchErrorAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref BranchCollector
      EvaluationPeriods: 1
      MetricName: Errors
      Namespace: AWS/Lambda
      Period: 300
      Statistic: Sum
      Threshold: 1
      AlarmActions:
        - arn:aws:sns:us-east-1:{id}:{alarm-name}

  BranchDurationAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref BranchCollector
      EvaluationPeriods: 1
      MetricName: Duration
      Namespace: AWS/Lambda
      Period: 60
      Statistic: Maximum
      Threshold: 750000
      AlarmActions:
        - arn:aws:sns:us-east-1:{id}:{alarm-name}

  BranchThrottleAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref BranchCollector
      EvaluationPeriods: 1
      MetricName: Throttles
      Namespace: AWS/Lambda
      Period: 300
      Statistic: Sum
      Threshold: 1
      AlarmActions:
        - arn:aws:sns:us-east-1:{id}:{alarm-name}
Root Stack
Now that all the nested stacks are defined, we need a root stack to tie the entire infrastructure together. If you think of a nested-stack architecture as a hierarchy, the root stack is the parent that all the nested stacks belong to (though nested stacks can be parents of other stacks as well). In our case, it's as simple as replacing the snippets that defined individual resources in the existing template.yml with references to the nested stack templates.
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'

Parameters:
  Environment:
    Type: String

Resources:
  # =========================================================================================
  # IAM ROLES, POLICIES, PERMISSIONS
  # =========================================================================================
  IAM:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/iam.yml
      Parameters:
        Environment: !Ref Environment

  # =========================================================================================
  # AWS LAMBDA FUNCTIONS
  # =========================================================================================
  BranchCollector:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/BranchCollector.yml
      Parameters:
        Environment: !Ref Environment
    DependsOn:
      - IAM
      - Queues

  SalespersonCollector:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/SalespersonCollector.yml
      Parameters:
        Environment: !Ref Environment
    DependsOn:
      - IAM
      - Queues

  SaleCollector:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/SaleCollector.yml
      Parameters:
        Environment: !Ref Environment
    DependsOn:
      - IAM
      - Queues

  # =========================================================================================
  # AWS DynamoDB TABLES
  # =========================================================================================
  Tables:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/tables.yml
      Parameters:
        Environment: !Ref Environment

  # =========================================================================================
  # AWS SQS QUEUES
  # =========================================================================================
  Queues:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/queues.yml
      Parameters:
        Environment: !Ref Environment
From here, it's safe to say that the CloudFormation upgrade is complete!
But wait, does that mean we need to rebuild and redeploy everything? Sadly, yes, and that's what the next section addresses.
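For reference, a manual rebuild and redeploy boils down to roughly the following SAM commands (the bucket and stack names here are placeholders; the workflow below runs the same commands with values generated at run time):

sam build
sam package --s3-bucket your-bucket-name \
    --output-template-file packaged.yaml --region us-east-1
sam deploy --template-file packaged.yaml \
    --stack-name cascading-etl-pipeline-dev-stack \
    --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
    --region us-east-1 \
    --parameter-overrides Environment=dev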
Auto Deployment
Since I keep everything I code in a GitHub repository, I am going to leverage GitHub Actions, a GitHub feature that automates workflows stored in a repository to support building, testing, and deployment seamlessly. There are pre-defined workflows, but owing to the uniqueness and complexity of our task, we need to customize one that meets our needs.
Generally speaking, the workflow should mimic what we do manually in the cloud environment during the software build & deployment stage. Namely, it includes the following steps:
- Assign values to the parameters, e.g. environment
- Set up the AWS configuration, e.g. the default AWS credentials and service region
- Install the SAM CLI
- Grant the AWS user the permissions for the services/resources that will be used
- Create an S3 bucket as the CloudFormation storage location
- Copy the common code base (utils.py) into every function directory
- Build & deploy the Serverless Application Model (SAM) stack
Wrapped up in a single workflow file, it looks like this:
name: A workflow that automates the data pipeline deployment

on:
  workflow_dispatch:
  push:
    branches:
      - main
    paths-ignore:
      - '.gitignore'
      - '*.png'
      - 'README.md'
  pull_request:
    paths-ignore:
      - '.gitignore'
      - '*.png'
      - 'README.md'

jobs:
  deploy:
    container:
      image: lambci/lambda:build-python3.8
    runs-on: ubuntu-latest
    env:
      BUCKET_NAME: your-bucket-name
    steps:
      - name: Set Environment
        id: setenv
        run: |
          echo "Running on branch ${{ github.ref }}"
          if [ "${{ github.ref }}" = "refs/heads/main" ]; then
            echo "::set-output name=env_name::prod"
          else
            echo "::set-output name=env_name::dev"
          fi
      - name: Set Repo
        id: setrepo
        run: |
          echo "::set-output name=repo_name::${{ github.event.repository.name }}"
      - name: Set Branch
        id: setbranch
        run: |
          echo "::set-output name=branch_name::${{ github.head_ref }}"
      - name: Checkout
        uses: actions/checkout@v2
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
          # role-to-assume: arn:aws:iam::807324965916:role/cdk-hnb659fds-deploy-role-807324965916-us-east-1
          role-duration-seconds: 900
      - name: Install sam cli
        run: 'pip3 install aws-sam-cli'
      - name: Complete policies
        run: |
          # attach-user-policy accepts one policy ARN per call, so loop over them
          for policy_arn in \
            arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess \
            arn:aws:iam::aws:policy/CloudWatchEventsFullAccess \
            arn:aws:iam::aws:policy/AWSLambda_FullAccess \
            arn:aws:iam::aws:policy/IAMFullAccess \
            arn:aws:iam::aws:policy/AWSCloudFormationFullAccess \
            arn:aws:iam::aws:policy/AmazonSQSFullAccess \
            arn:aws:iam::aws:policy/AmazonS3FullAccess
          do
            aws iam attach-user-policy --policy-arn "$policy_arn" --user-name Memphis
          done
      - name: Create S3 Bucket
        run: |
          if ! aws s3api head-bucket --bucket "${{ env.BUCKET_NAME }}" 2>/dev/null; then
            aws s3api create-bucket --bucket "${{ env.BUCKET_NAME }}"
          else
            echo "Bucket ${{ env.BUCKET_NAME }} already exists"
          fi
      - name: Copy utils.py
        run: 'for d in */; do cp utils.py "$d"; done'
      - name: build
        run: sam build && sam package --s3-bucket ${{ env.BUCKET_NAME }} --s3-prefix "${{ steps.setrepo.outputs.repo_name }}/${{ steps.setbranch.outputs.branch_name }}/${{ steps.setenv.outputs.env_name }}" --output-template-file packaged.yaml --region us-east-1 || { echo 'my_command failed' ; exit 1; }
      - name: deploy
        run: sam deploy --template-file packaged.yaml --s3-bucket ${{ env.BUCKET_NAME }} --s3-prefix "${{ steps.setrepo.outputs.repo_name }}/${{ steps.setbranch.outputs.branch_name }}/${{ steps.setenv.outputs.env_name }}" --stack-name "${{ steps.setrepo.outputs.repo_name }}-${{ steps.setenv.outputs.env_name }}-stack" --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND --region us-east-1 --no-fail-on-empty-changeset --parameter-overrides Environment=${{ steps.setenv.outputs.env_name }} || { echo 'my_command failed' ; exit 1; }
Here I include one job called "deploy". It runs on Ubuntu inside a container built from the image lambci/lambda:build-python3.8, both of which are defined within the job. An environment variable called BUCKET_NAME stores the name of the S3 bucket that will be created.
Step 1 creates a new output parameter env_name, which stores the name of the working environment. It depends on the branch that triggered the workflow: if the workflow is running on the main branch, the environment is prod; otherwise, it's dev.
Step 2 records the repository name in another output value. This is going to be part of the prefix of the location of the template file in the S3 bucket.
Step 3 is similar to step 2, getting the name of the branch to be used later as another part of the prefix.
Step 4 uses actions/checkout, an action that checks out the current repo into the workflow's workspace so the later steps can access it.
Step 5 also uses a package. With the help of aws-actions/configure-aws-credentials@v1, we can effortlessly configure the AWS working environment by only providing the AWS access key ID, AWS secret access key, and region. Note that it's recommended to keep your sensitive credentials (AWS access key ID and secret access key) in GitHub Secrets.
Steps 6-11 run in the shell. In order, they install the SAM CLI; attach all necessary policy ARNs to the IAM user; create an S3 bucket, named by the environment variable, if it doesn't already exist; copy the shared utils.py into each function directory; and finally build, package, and deploy the CloudFormation stack with SAM commands.
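Once the deploy job finishes, a quick sanity check from the command line can confirm that the root stack and its nested stacks were created (the stack name below is a hypothetical example following the naming pattern used in the deploy step):

# confirm the root stack exists and check its status
aws cloudformation describe-stacks \
    --stack-name Cascading-ETL-pipeline-prod-stack --region us-east-1
# list the resources of the root stack, including the nested stacks
aws cloudformation list-stack-resources \
    --stack-name Cascading-ETL-pipeline-prod-stack --region us-east-1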
With this workflow in place, developers no longer need to rerun some, if not all, of these steps by hand. Once a pull request is opened, every new commit triggers a workflow run against the code on the non-main branch; when the pull request is merged, the workflow runs once more against the main branch.
Before you leave
Thanks for making it here. In this post, I walked through one of the major upgrades to improve the manageability and reusability of the code, and introduced a solution for automating stack building and deployment. If you are interested in the details, feel free to check out my repo!
By the way, the way I organized the resources and artifacts into stacks can also be a good starting point for microservices. If you are interested in microservices, don't forget to subscribe and follow my next articles!