AWSのLambda、SNS、SQS、S3でサーバレスのファンアウトを実装しよう!

eyecatch-aws-fan-out
目次

サーバーレスとは?

概要

サーバーレスとは開発者がインフラストラクチャの管理を意識することなくコードを実行できる仕組みです。この仕組みでは、サーバーのプロビジョニング、スケーリング、管理をクラウドプロバイダーが全て自動で行います。開発者はアプリケーションのロジックに集中でき、必要なリソースが必要なときに自動的に提供されます。

AWSのサーバーレスアーキテクチャは、柔軟性とスケーラビリティが求められるアプリケーションに適しており、特にAWSにおけるサーバーレスと言った場合はAWS Lambdaを中心としたサービス群を指すことが多いです。関連するサービスとしてはSQS、SNS、S3、API Gateway、Event Bridge、Step Functionsなどがあります。詳細についてはこちらの公式ドキュメントをご参照ください。

ファンアウト

今回構築するアーキテクチャはSNSとSQSを連携させたファンアウト(fan-out)と呼ばれる構成になります。ちなみにファンアウトは四方八方に散るという意味があります。

SQSは複数のコンシューマにメッセージを送ることができないため、SNSを組み合わせることでSNSから複数のSQSにメッセージを配信し分散処理を実行することができます。例えばあるオンライン注文の際に同時にいくつもの注文を処理する場合に利用することが想定されます。

AWS, Amazon の一般的なSNSシナリオ

また下記の資料ではSQSとSNS+SQSの組み合わせが比較されており、わかりやすいと思いましたので参考として引用します。

AWS, AWS Black Belt Online Seminar Amazon Simple Queue Service P41

今回のアーキテクチャ構成

今回構築するアーキテクチャ構成を下図に示します。

まずS3バケットに画像ファイルをアップロードし、そのアップロードイベントが発火します。そして上記で解説したファンアウトが動作し、SNSから2つのSQSへと処理が流れていきます。各SQSに対応したLambdaで画像の処理が行われ、最終的にS3バケットに処理済み画像が保存する流れとなります。

サーバーレスアプリ構築

リソース作成

SNS

  • マネジメントコンソールからSNSを検索しクリック
  • 左メニューのトピックから「トピックの作成」をクリック
    • タイプ:「スタンダード」 ※FIFOを選択すると後続の手順で失敗する
  • 「トピックの作成」をクリック
  • 「アクセスポリシー – オプション」で下記のアクセスポリシーを設定
    • SNS_ARNにはトピックのARNを、SNS_OWNERはトピックの所有者を記述
    • このポリシーを設定することでS3からSNSにメッセージを発行
{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish"
      ],
      "Resource": "SNS_ARN",
      "Condition": {
        "StringEquals": {
          "AWS:SourceAccount": "SNS_OWNER"
        }
      }
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "SNS:Publish",
      "Resource": "SNS_ARN",
      "Condition": {
        "StringEquals": {
          "AWS:SourceAccount": "SNS_OWNER"
        }
      }
    }
  ]
}

SQS

SQSではファンアウトのためにキューを2つ作成

  • マネジメントコンソールからSQSを検索しクリック
  • 「キューを作成」を選択
    • タイプ:「標準」
  • キューの詳細ページで「SNSサブスクリプション」をクリック
  • 「Amazon SNSトピックにサブスクライブ」を選択し、上記で作成したSNSを選択

S3

  • マネジメントコンソールからS3を検索しクリック
  • 【バケットの作成**********::】
  • 作成したバケットの「プロパティ」から「イベント通知」にて「イベント通知を作成」を選択
    • 一般的な設定
    • イベントタイプ
      • 「全てのオブジェクト作成イベント」を選択
    • 送信先
      • 送信先:「SNSトピック」
      • SNSトピックを特定:ドロップダウンから上記で作成したSNSを指定

Lambda

ここまでの過程でSQSが2つ作成されているので、それぞれに対応するよう2つのLambdaを作成

  • マネジメントコンソールからLambdaを検索しクリック
  • 「関数の作成」をクリック
  • 以下を設定
    • 一から作成
    • ランタイムは「python3.12」
    • 「関数の作成」をクリック
  • 「トリガーを追加」をクリック
    • 「トリガーの設定」でSQSを選択
    • SQSキューで上記で作成したSQSを選択
    • 「追加」をクリック
  • 「設定」タブ⇒「アクセス権限」をクリック
    • ロール名のリンクをクリックしIAMの画面に遷移
    • 対象のロールに下記のロールを追加
      • AmazonS3FullAccess
      • AmazonSQSFullAccess
      • AWSLambda_FullAccess
  • 「コード」タブをクリック
    • 下記のコードを記述
    • コードの内容はアップロードファイルをコピーし、_copy1を付与する処理
    • 2つ目のLambdaを作成する場合は33、35行目の_copy1を_copy2に変更
    • 「Deploy」をクリック
import boto3
import json
import urllib.parse

s3 = boto3.client('s3')

def lambda_handler(event, context):
    # SQSからのイベントメッセージを解析
    for record in event['Records']:
        sns_message = json.loads(record['body'])
        s3_event = json.loads(sns_message['Message'])

        # S3イベントからバケット名とファイル名を取得
        source_bucket = s3_event['Records'][0]['s3']['bucket']['name']
        source_key = urllib.parse.unquote_plus(s3_event['Records'][0]['s3']['object']['key'])

        # コピー先のファイル名を生成
        destination_key = generate_copy_key(source_key)

        try:
            # ファイルをコピー
            copy_source = {'Bucket': source_bucket, 'Key': source_key}
            s3.copy_object(CopySource=copy_source, Bucket=source_bucket, Key=destination_key)
            print(f"File copied from {source_key} to {destination_key}")
        except Exception as e:
            print(f"Error copying file: {str(e)}")
            raise e

def generate_copy_key(source_key):
    # ファイル名と拡張子に分ける
    if '.' in source_key:
        base_name, extension = source_key.rsplit('.', 1)
        destination_key = f"{base_name}_copy1.{extension}"
    else:
        destination_key = f"{source_key}_copy1"
    
    return destination_key

実行結果確認

S3へのファイルアップロード

  • 動作確認用に任意のファイルを用意し、上記で作成したS3バケットにアップロードする

S3とCloudWatchの確認

  • S3の期待動作
    • アップロードしたファイルと同階層に2つのコピーファイルが作成されていること
    • コピーファイル名に_copyXが付与されていること
  • CloudWatchの期待動作
    • Lambdaの画面で「モニタリング」⇒「CloudWatchログを表示」をクリック
    • ログにErrorが出力されていないこと

まとめ

本記事ではLambda、SNS、SQS、S3を使ったサーバレスのファンアウト処理を実装しました。AWSのサービスだけで並列処理を実装したい場合に利用できる構成ですので機会があったらぜひ試してみてくださいね!

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

Hack Luck Labの管理人hakula(ハクラ)です。2012年にSIerに新卒入社し、SE、新規事業、情シスを担当。その後、ITコンサルを経て、現在はバックエンドエンジニア。過去にはC#、SQL Server、JavaScriptで開発を行い、現在はPython、Rest Framework、Postgresql、Linux、AWSなどを使用しています。ノーコードツールやDX関連も興味あり。「技術は価値を生むために使う」ことが信条で、顧客や組織への貢献を重視しています。

目次