How I Built A Cascading Data Pipeline Based on AWS (Part 2)

Photo by Mehmet Ali Peker on Unsplash

Previously, I shared my experience developing a data pipeline with AWS CloudFormation. That approach is not optimal, though, because it leaves three issues unresolved:

  1. The deployment has to be performed manually, which increases the chance of errors;
  2. All resources are created in one single stack, without proper boundaries and layers; as the development cycle goes on, the stack grows heavier and managing it becomes a nightmare;
  3. Many resources are meant to be retained and reused in other projects.

In short, we are going to increase the manageability and reusability of this project, in an agile manner.

Solution

AWS lets users implement two kinds of CloudFormation structural patterns: cross-stack references and nested stacking. A cross-stack reference is a design style in which stacks are developed separately, and usually independently, while resources across the stacks can still be related through references. Nested stacking means a CloudFormation stack composed of other stacks, achieved with the [AWS::CloudFormation::Stack](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-stack.html) resource.
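For a quick picture of the two patterns, here is a minimal sketch; the network.yml child template, the Vpc resource, and the shared-vpc-id export are hypothetical and not part of this project:

# Nested stacking: the parent template embeds a child template as a resource
Resources:
  NetworkStack:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./network.yml        # hypothetical child template

# Cross-stack reference: stack A exports a value ...
Outputs:
  VpcId:
    Value: !Ref Vpc                     # Vpc is a resource defined elsewhere in stack A
    Export:
      Name: shared-vpc-id

# ... and stack B, deployed separately, imports it by the export name
SomeVpcProperty:
  Fn::ImportValue: shared-vpc-id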

A nested stack in real life: a nest full of nests/eggs (Photo by Giorgi Iremadze on Unsplash)

Because one of the goals we aim to achieve is better Project Management, the project is going to be broken down into layers, and nested stacking is what enables that. However, given the intrinsic interrelationships between the artifacts of the existing stack, we also need a dash of cross-stack referencing.

Implementation

We created 3 Lambda functions, 3 DynamoDB tables, 1 IAM role with its attached policies, several SQS queues, and several CloudWatch alarms. Because of the complexity of the functions themselves, in this version each function is defined in its own template, together with the services used only by that function, such as its alarms and dead-letter queue. Apart from those, the IAM resources become another nested stack, and so do the DynamoDB tables, in order to keep them reusable. The SQS queues that deliver messages between the Lambda functions form a separate stack as well. All these nested stacks live in a new directory called /templates.

Cascading-ETL-pipeline
├── LICENSE
├── README.md
├── branches
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── sales
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── salespersons
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── templates
│   ├── BranchCollector.yml
│   ├── SaleCollector.yml
│   ├── SalespersonCollector.yml
│   ├── iam.yml
│   ├── queues.yml
│   └── tables.yml
├── template.yml
└── utils.py

The coding part is quite light this time compared to the previous one, since we mostly just move the original code from one file to another. For example, to configure the DynamoDB table stack, all I did was copy the snippet and paste it into a new file:

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  Environment:
    Type: String
Resources:
  BranchDynamoDBTable: 
    Type: AWS::DynamoDB::Table
    DeletionPolicy: Delete
    Properties:
      BillingMode: PAY_PER_REQUEST 
      AttributeDefinitions: 
        - 
          AttributeName: "branch_id"
          AttributeType: "S"
      KeySchema: 
        - 
          AttributeName: "branch_id"
          KeyType: "HASH"
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      TableName: !Sub branch-${Environment}

  SalespersonDynamoDBTable: 
    Type: AWS::DynamoDB::Table
    DeletionPolicy: Delete
    Properties:
      BillingMode: PAY_PER_REQUEST 
      AttributeDefinitions: 
        - 
          AttributeName: "employee_id"
          AttributeType: "S"
        - 
          AttributeName: "branch_id"
          AttributeType: "S"
      KeySchema: 
        - 
          AttributeName: "employee_id"
          KeyType: "HASH"
        - 
          AttributeName: "branch_id"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      TableName: !Sub salesperson-${Environment}

  SaleDynamoDBTable:
    Type: AWS::DynamoDB::Table
    DeletionPolicy: Delete
    Properties:
      BillingMode: PAY_PER_REQUEST 
      AttributeDefinitions: 
        - 
          AttributeName: "sale_id"
          AttributeType: "S"
        - 
          AttributeName: "employee_id"
          AttributeType: "S"
      KeySchema: 
        - 
          AttributeName: "sale_id"
          KeyType: "HASH"
        - 
          AttributeName: "employee_id"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      TableName: !Sub sale-${Environment}
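Note that the tables template above doesn't export anything, because the Lambda stacks later rebuild the table names with !Sub. If you would rather have other stacks (or other projects) import the table names, an Outputs section along these lines could be appended; the export names below are my own suggestion, not part of the repo:

Outputs:
  BranchTable:
    Description: Name of the branch table, reusable by other stacks
    Value: !Ref BranchDynamoDBTable      # !Ref on a DynamoDB table returns its name
    Export:
      Name: !Sub ${Environment}-BranchTable
  SalespersonTable:
    Description: Name of the salesperson table, reusable by other stacks
    Value: !Ref SalespersonDynamoDBTable
    Export:
      Name: !Sub ${Environment}-SalespersonTable
  SaleTable:
    Description: Name of the sale table, reusable by other stacks
    Value: !Ref SaleDynamoDBTable
    Export:
      Name: !Sub ${Environment}-SaleTable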

Cross-nested-stack References

It's worth noting that some modifications are still needed. To make sure the resources defined within one stack can be referenced by other stacks, the exporting stacks need a new section called Outputs to export the values to be referenced. In our case, the IAM role is going to be used globally, so right after its definition we output its ARN so that it becomes visible to the other stacks.

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  Environment:
    Type: String
Resources:
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${Environment}-lambda-role'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
            - events.amazonaws.com
          Action:
          - sts:AssumeRole
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/AWSLambdaExecute
      - arn:aws:iam::aws:policy/AmazonSQSFullAccess
      - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
      Path: '/'
  LambdaPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: !Sub '${AWS::StackName}-${Environment}-lambda-policy'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Sid: EventBusAccess
          Effect: Allow
          Action:
          - events:PutEvents
          Resource: '*'
        - Sid: LambdaInvokeAccess
          Effect: Allow
          Action:
          - lambda:InvokeFunction
          Resource: "*"
        - Sid: LogAccess
          Effect: Allow
          Action:
          - logs:CreateLogGroup
          - logs:CreateLogStream
          - logs:PutLogEvents
          Resource: arn:aws:logs:*:*:*
      Roles:
      - !Ref LambdaRole

Outputs:
  Role:
    Description: The role to be used across the stacks
    Value: !GetAtt LambdaRole.Arn
    Export:
      Name: !Sub ${Environment}-Role

The messenger queues are exported in the same way:

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  Environment:
    Type: String
Resources:
  EmployeeQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub employee-queue-${Environment}
      VisibilityTimeout: 900
      RedrivePolicy:
        deadLetterTargetArn: 
          Fn::GetAtt: EmployeeWorkloadDeadLetterQueue.Arn
        maxReceiveCount: 10

  EmployeeWorkloadDeadLetterQueue: 
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub employee-workload-dead-letter-queue-${Environment}
      MessageRetentionPeriod: 1209600

  SaleQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub sale-queue-${Environment}
      VisibilityTimeout: 900
      RedrivePolicy:
        deadLetterTargetArn: 
          Fn::GetAtt: SaleWorkloadDeadLetterQueue.Arn
        maxReceiveCount: 10

  SaleWorkloadDeadLetterQueue: 
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub sale-workload-dead-letter-queue-${Environment}
      MessageRetentionPeriod: 1209600

Outputs:
  EmployeeQueue:
    Description: The SQS queue that delivers the payloads from branch collector to salesperson collector
    Value: !Ref EmployeeQueue
    Export:
      Name: !Sub ${Environment}-EmployeeQueue
  SaleQueue:
    Description: The SQS queue that delivers the payloads from salesperson collector to sale collector
    Value: !Ref SaleQueue
    Export:
      Name: !Sub ${Environment}-SaleQueue

In the meantime, the stacks that import resources from others also need a minor change. AWS supports this through the intrinsic function [Fn::ImportValue](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference-importvalue.html). Take "Branch Collector" as an example: it uses the IAM role and a messenger queue that aren't created in its own stack. So wherever those resources appear, I replace the reference with a Fn::ImportValue lookup.

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  Environment:
    Type: String
Resources:
  BranchCollector:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub branch-collector-${Environment}
      Handler: lambda_function.lambda_handler
      Runtime: python3.8
      CodeUri: ./../branches/
      Description: updating branch info in our DynamoDB table
      MemorySize: 128
      Timeout: 900
      Role: 
        Fn::ImportValue:
          !Sub ${Environment}-Role
      Environment:
        Variables:
          LOGGING_LEVEL: INFO
          APP_ENV: !Ref Environment
          SQS: 
            Fn::ImportValue:
              !Sub ${Environment}-EmployeeQueue
          DB: !Sub branches-${Environment}
      DeadLetterQueue:
        Type: SQS
        TargetArn: 
          Fn::GetAtt: BranchFunctionDeadLetterQueue.Arn
      Events:
        StartScheduledEvent:
          Type: Schedule
          Properties:
            Schedule: rate(1 hour)

  # dead letter queue
  BranchFunctionDeadLetterQueue: 
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub branch-function-dead-letter-queue-${Environment}
      MessageRetentionPeriod: 1209600

  # alarms
  BranchErrorAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref BranchCollector
      EvaluationPeriods: 1
      MetricName: Errors
      Namespace: AWS/Lambda
      Period: 300
      Statistic: Sum
      Threshold: 1
      AlarmActions: 
        - arn:aws:sns:us-east-1:{id}:{alarm-name}
  BranchDurationAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref BranchCollector
      EvaluationPeriods: 1
      MetricName: Duration
      Namespace: AWS/Lambda
      Period: 60
      Statistic: Maximum
      Threshold: 750000
      AlarmActions:
        - arn:aws:sns:us-east-1:{id}:{alarm-name}
  BranchThrottleAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      ComparisonOperator: GreaterThanOrEqualToThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref BranchCollector
      EvaluationPeriods: 1
      MetricName: Throttles
      Namespace: AWS/Lambda
      Period: 300
      Statistic: Sum
      Threshold: 1
      AlarmActions:
        - arn:aws:sns:us-east-1:{id}:{alarm-name}

Root Stack

Now that all nested stacks are defined, there should be a root stack to integrate the entire infrastructure. If you view a nested-stack infrastructure as a hierarchy, the root stack is the parent that all the nested stacks belong to (though nested stacks can be parents of other stacks too). In our case, it's as easy as replacing the snippets that define individual resources in the predefined template.yml with references to the nested-stack CloudFormation templates.

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  Environment:
    Type: String
Resources:
  # =========================================================================================
  # IAM ROLES, POLICIES, PERMISSIONS
  # =========================================================================================
  IAM:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/iam.yml
      Parameters: 
        Environment: !Ref Environment
  # =========================================================================================
  # AWS LAMBDA FUNCTIONS
  # ========================================================================================= 
  BranchCollector:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/BranchCollector.yml
      Parameters: 
        Environment: !Ref Environment
    DependsOn: 
      - IAM
      - Queues

  SalespersonCollector:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/SalespersonCollector.yml
      Parameters: 
        Environment: !Ref Environment
    DependsOn: 
      - IAM
      - Queues

  SaleCollector:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/SaleCollector.yml
      Parameters: 
        Environment: !Ref Environment
    DependsOn: 
      - IAM
      - Queues

  # =========================================================================================
  # AWS DynamoDB TABLES
  # ========================================================================================= 
  Tables:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/tables.yml
      Parameters: 
        Environment: !Ref Environment

  # =========================================================================================
  # AWS SQS QUEUES
  # ========================================================================================= 
  Queues:
    Type: AWS::CloudFormation::Stack
    Properties:
      TemplateURL: ./templates/queues.yml
      Parameters: 
        Environment: !Ref Environment

From here, it's safe to say that the CloudFormation upgrade is complete! (The relative TemplateURL paths work because sam package uploads the nested templates to S3 and rewrites the URLs for us.)

But wait, does that mean we have to rebuild and redeploy everything by hand? Sadly, yes, and that's exactly what the next section addresses.

Auto Deployment

Since I always store everything I code in a GitHub repo, I am going to leverage GitHub Actions. GitHub Actions is a GitHub feature that automates workflows stored in a repo, seamlessly supporting building, testing, and deployment. There are pre-defined workflows, but owing to the uniqueness and complexity of our mission, we need to customize one that meets our needs.

Generally speaking, the workflow should mimic what we do manually in the cloud environment during the software build & deployment stage. Namely, it includes the following steps:

  1. Assign values to the parameters, e.g. environment
  2. Set up the AWS configuration, e.g. the default AWS credentials and service region
  3. Install the SAM CLI
  4. Grant the AWS identity the permissions for the services/resources that will be used
  5. Create an S3 bucket as the CloudFormation package storage location
  6. Copy the common code base (utils.py) into every function directory
  7. Build & deploy the Serverless Application Model (SAM) application

Wrapping them all up in a workflow file, it looks like this:

name: A workflow that automates the data pipeline deployment
on: 
  workflow_dispatch:
  push:
    branches:
      - main
    paths-ignore: 
      - '.gitignore'
      - '*.png'
      - 'README.md'
  pull_request:
    paths-ignore:
      - '.gitignore'
      - '*.png'
      - 'README.md'

jobs:
  deploy:
    container: 
      image: lambci/lambda:build-python3.8
    runs-on: ubuntu-latest
    env:
      BUCKET_NAME: your-bucket-name
    steps:
      - name: Set Environment
        id: setenv
        run: |
          echo "Running on branch ${{ github.ref }}"
          if [ "${{ github.ref }}" = "refs/heads/main" ]; then
            echo "::set-output name=env_name::prod"
          else
             echo "::set-output name=env_name::dev"
          fi
      - name: Set Repo
        id: setrepo
        run: |
          echo "::set-output name=repo_name::${{ github.event.repository.name }}"
      - name: Set Branch
        id: setbranch
        run: |
          echo "::set-output name=branch_name::${{ github.head_ref}}"
      - name: Checkout
        uses: actions/checkout@v2
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{secrets.AWS_ACCESS_KEY_ID}}
          aws-secret-access-key: ${{secrets.AWS_SECRET_ACCESS_KEY}}
          aws-region: us-east-1
          # role-to-assume: arn:aws:iam::807324965916:role/cdk-hnb659fds-deploy-role-807324965916-us-east-1
          role-duration-seconds: 900
      - name: Install sam cli
        run: 'pip3 install aws-sam-cli'
      - name: Complete policies
        run: |
          # attach-user-policy accepts one policy ARN per call, so loop over the list
          for policy_arn in \
            arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess \
            arn:aws:iam::aws:policy/CloudWatchEventsFullAccess \
            arn:aws:iam::aws:policy/AWSLambda_FullAccess \
            arn:aws:iam::aws:policy/IAMFullAccess \
            arn:aws:iam::aws:policy/AWSCloudFormationFullAccess \
            arn:aws:iam::aws:policy/AmazonSQSFullAccess \
            arn:aws:iam::aws:policy/AmazonS3FullAccess; do
            aws iam attach-user-policy --user-name Memphis --policy-arn "$policy_arn"
          done
      - name: Create S3 Bucket
        run: |
          if ! aws s3api head-bucket --bucket "${{env.BUCKET_NAME}}" 2>/dev/null; then
            aws s3api create-bucket --bucket "${{env.BUCKET_NAME}}"
          else
            echo "Bucket ${{env.BUCKET_NAME}} already exists"
          fi

      - name: Copy utils.py
        run: 'for d in */; do cp utils.py "$d"; done'
      - name: build
        run: sam build && sam package --s3-bucket ${{env.BUCKET_NAME}} --s3-prefix "${{steps.setrepo.outputs.repo_name}}/${{steps.setbranch.outputs.branch_name}}/${{steps.setenv.outputs.env_name}}" --output-template-file packaged.yaml --region us-east-1 || { echo 'my_command failed' ; exit 1; }
      - name: deploy
        run: sam deploy --template-file packaged.yaml --s3-bucket ${{env.BUCKET_NAME}} --s3-prefix "${{steps.setrepo.outputs.repo_name}}/${{steps.setbranch.outputs.branch_name}}/${{steps.setenv.outputs.env_name}}" --stack-name "${{steps.setrepo.outputs.repo_name}}-${{steps.setenv.outputs.env_name}}-stack" --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND --region us-east-1 --no-fail-on-empty-changeset --parameter-overrides Environment=${{steps.setenv.outputs.env_name}} || { echo 'my_command failed' ; exit 1; }

Here I define one job called "deploy". It runs in a container based on the image lambci/lambda:build-python3.8 on an Ubuntu runner, both declared inside the job. An environment variable called BUCKET_NAME holds the name of the S3 bucket we are going to create.

Step 1 creates a new output parameter, env_name, which stores the name of the working environment. It depends on the branch that triggered the workflow: if the workflow is running on the main branch, it's prod; otherwise, it's dev.

Step 2 records the repository name in another output value. This is going to be part of the prefix of the location of the template file in the S3 bucket.

Step 3 is similar to step 2, getting the name of the branch to be used later as another part of the prefix.

Step 4 simply uses actions/checkout, an action that checks out the current repo into the runner's workspace so the workflow can access it.

Step 5 also uses a ready-made action. With the help of aws-actions/configure-aws-credentials@v1, we can effortlessly configure the AWS working environment by providing only the AWS access key ID, the AWS secret access key, and the region. Note that it's recommended to keep the sensitive credentials (the AWS access key ID and secret access key) in GitHub Secrets.

Steps 6~11 are executed on the bash command line. In order, the rest of the workflow installs the SAM CLI; attaches all necessary policy ARNs to the IAM user; creates an S3 bucket, named by the environment variable, if it doesn't already exist; copies the shared utils.py into each function directory; and, last but not least, builds, packages, and deploys the CloudFormation stack with SAM commands.

With the workflow in place, developers are spared from constantly rerunning some, if not all, of these steps by hand. Instead, as long as a pull request is open, any new commit triggers a new workflow run with the code on the non-main branch; when the pull request is merged, the workflow runs again with the resources on the main branch.

Before you leave

Thanks for making it here. In this post, I walked through one of the major upgrades to improve the manageability and reusability of the code, and introduced a solution for automating stack building and deployment. If you are interested in the details, feel free to check out my repo!

By the way, the way I organized the resources and artifacts into stacks can also be a good starting point for microservices. If you are interested in microservices, don't forget to subscribe and follow my next articles!

Tags: AWS Data Engineering github-actions Nested Stack Project Management
