Amazon EventBridge Pipes

出典:AWS Blackbelt

Amazon EventBridge とは

AWSサービス、カスタムイベント、SaaSからのイベント連携のためのマネージドサービス
ノーコード/ローコードでのイベント連携を実現

  • イベントソース
  • イベントバス
  • ルール
  • ターゲット

JSON形式で届いたイベントをルールによってフィルターして次に受け渡すことができる
ターゲットはAWSだとStep FunctionsやLambda関数など、その他のサービスにも連携可能

EventBridge Pipesとは

sourceからのデータをフィルターしてターゲットに受け渡すことは同じだが、enrichmentフェーズの追加されている
これによりAPIをHookしてイベントの内部情報を拡張することが可能

イベントバスとパイプの違い

パイプにはイベントバスがなく、イベントバスを使わずに送受信が可能
イベントバスとの使い分けとしては
イベントバス:多数のイベントソースと多数の送信先(Many publishers and many counsumers)
パイプ:一つのイベントソースと一つの送信先(Single publisher to single consumer)
Input Transformによって簡単なデータ操作であればテンプレートを使ってデータの書き換えなども可能

サービス間連携の課題

これまでは連携するためだけのコード(Integration (glue) code)が必要で下記の考慮が必要だった

  • サービスごとに異なる仕様や実装
  • エラーハンドリング
  • 配信順序の維持
  • 認証、認可、流量制御
  • パフォーマンステスト
  • デプロイ
  • 運用監視

上記を EventBridge Pipes で解決できる

EventBridge Pipes を構成する要素

Source
Amazon SQS キュー
Amazon Kinesis Data Streams
Amazon DynamoDB Streams
Amazon MSK トピック
Amazon MQ
セルフマネージドApache Kafka ストリーム
Filtering(optional)
Enrichment(optional)
Amazon Eventridge(API Destinations)
AWS Step Functions(同期)
Amazon API Gateway
AWS Lamnbda(同期)
Input Transformer(optional)
Target
Amazon Eventridge(イベントバス、API Destinations)
AWS Batch ジョブキュー
Amazon Redshift Data API
Amazon Simple Notification Service(SNS)※標準トピックのみ
Amazon Elastic Container Service(ECS)タスク
Amazon SageMaker Pipeleines
Amazon Kinesis Data Streams
Amazon Kinesis Data Firehose
AWS Step Functions(標準:同期、Express:同期 or 非同期)
Amazon API Gateway
Amazon SQS
AWS Lamnbda(同期 or 非同期)
Amazon CloudWatch ロググループ
Amazon Inspector 評価テンプレート

EventBridge Pipes  の主な機能

イベントのフィルタリング パターンマッチングによるフィルターを追加して、ターゲットに渡す必要なないイベントを除外できる。除外されたイベントは課金対象外、フィルターではJOSNバスが利用可能
バッチ処理 イベントソースから受診したイベントを、ターゲット/エンリッチメントに対してバッチで送信できる。バッチの利用可否や最大バッチサイズは後続サービスの仕様に依存する。
順序保証 イベントソースから受診したイベントの順序を維持したまま動機でターゲットを呼び出す。順序保証がないソースからのイベントの場合は、非同期でターゲットを呼び出す
同時実行による高いスループット バックログに応じて自動でスケールアップ/ダウンを行う。サービスごとに最大同時実行数は異なる(例:Amazon SQS は 1,250)
イベントデータの拡張 AWS Lambda,AWS Step Funcitons,API Gateway,API Destinationsを呼び出して追加のデータを取得できる。各サービスは同期的に呼び出し、最大レスポンスサイズは6MBに制限される

ユースケース

  1. SQS→Step Functions
    キュー内のメッセージに対して、複雑な一連の処理(ワークフロー)を実行
  2. Kinesis Data Steamsの分離
    多重化されたデータストリームをドメインに応じて複数のデータストリームに分割
  3. Kafka→EventBridge API Destinations
    セルフマネージドKafkaクラスターとAWSサービスを接続し、コードを書かずにHTTP(s) APIを実行
  4. DynamoDB→EventBridge
    DynamoDBレコードの変更をノーコードで他AWSサービスに連携
    ※EventBridge Pipesからイベントバスを通すことで複数に連携可能

バッチの設定について

  • ソースからの取得およびターゲットの呼び出しでバッチをサポート
  • 複数のイベントがJSONレコードの配列として渡される
  • バッチの利用可否や最大バッチサイズは利用する各サービスに依存
  • エンリッチメントについてはLambdaおよびStep Functionsでサポート
  • エンリッチメントおよびターゲットの呼び出しは、各サービスのバッチAPIにマッピングされる(※LambdaやStep Functionsの場合は1人区ストの入力にJSON配列全体が渡される)
  • エンリッチメントはレスポンスとしてJSONの配列を返すようにする
  • ターゲットがサポートするより大きいバッチサイズを指定することはできない
  • バッチ内の一部のレコードの処理に失敗した場合の挙動
    • SQS、Kinesis、DynamoDBなどのストリームがソースの場合、失敗したレコードに対して自動的にリトライを行う
    • エンリッチメントでの失敗の場合、部分的なリトライは行われない
    • ターゲットがLambdaまたはStep Functionsの場合、batchItemFailuresにリトライを行うイベントIDを含めてレスポンスを返すことで、部分的なリトライの実行が可能

ターゲットごとの最大バッチサイズ

ターゲットごとに最大バッチサイズが異なる
LambdaおよびStep Functionsについては、バッチに含まれるペイロードの合計サイズが制限を超過しないか注意する必要がある
Lambda:6MB,Step Functions:262144 Bytes

スループットと同時実行について

  • STARTED状態のパイプはソースからのイベントを継続的にポーリングし、利用可能なバックログとバッチの設定に応じて自動的にスケーリングを行う
  • 単一のパイプは、デフォルトで最大同時実行数までスケールするが、一部上限緩和の申請は可能
  • パイプの実行時間は最大5分まで(※エンリッチメント泳ぎターゲットを含む)

実行ロールの設定

  • パイプごとに実行IAMロールを指定する必要がある(コンソールから作れば付与される)
  • ソースへのアクセス許可を付与
  • 必要に応じてエンリッチメントおよびターゲットの呼び出し許可を付与

ログの設定

  • 各実行ステップでログレコードを生成
  • ログレベルを選択可能(OFF、ERROR、INFO、TRACE)
  • ログの配信先としてCloudWatch,Kinesis Data Firehose,S3をサポート
  • CloudWatchはログレコードの最大サイズ256KBでフィールドごとに自動で切り捨て
  • 実行データをログに含めるか選択可能(機密データがログに含まれる可能性がある点に注意)

料金

  • パイプが処理するリクエスト数に応じて課金
  • 100万リクエストごとに$0.50
  • フィルタリングで除外されたイベントは課金対象外
  • ペイロードサイズ64KBを上限として1リクエストとして扱う(例:256KBなら4リクエスト)

クォータ

上限緩和申請は可能

  • アカウント全体で作成できるパイプの上限:1,000
  • アカウント全体でのパイプの最大同時実行数:1,000(USリージョンは3,000)