Amazon EventBridge Pipes
Amazon EventBridge とは
AWSサービス、カスタムイベント、SaaSからのイベント連携のためのマネージドサービス
ノーコード/ローコードでのイベント連携を実現
- イベントソース
- イベントバス
- ルール
- ターゲット
JSON形式で届いたイベントをルールによってフィルターして次に受け渡すことができる
ターゲットはAWSだとStep FunctionsやLambda関数など、その他のサービスにも連携可能
EventBridge Pipesとは
sourceからのデータをフィルターしてターゲットに受け渡すことは同じだが、enrichmentフェーズの追加されている
これによりAPIをHookしてイベントの内部情報を拡張することが可能
イベントバスとパイプの違い
パイプにはイベントバスがなく、イベントバスを使わずに送受信が可能
イベントバスとの使い分けとしては
イベントバス:多数のイベントソースと多数の送信先(Many publishers and many counsumers)
パイプ:一つのイベントソースと一つの送信先(Single publisher to single consumer)
Input Transformによって簡単なデータ操作であればテンプレートを使ってデータの書き換えなども可能
サービス間連携の課題
これまでは連携するためだけのコード(Integration (glue) code)が必要で下記の考慮が必要だった
- サービスごとに異なる仕様や実装
- エラーハンドリング
- 配信順序の維持
- 認証、認可、流量制御
- パフォーマンステスト
- デプロイ
- 運用監視
上記を EventBridge Pipes で解決できる
EventBridge Pipes を構成する要素
Source |
---|
Amazon SQS キュー |
Amazon Kinesis Data Streams |
Amazon DynamoDB Streams |
Amazon MSK トピック |
Amazon MQ |
セルフマネージドApache Kafka ストリーム |
Filtering(optional) |
---|
Enrichment(optional) |
---|
Amazon Eventridge(API Destinations) |
AWS Step Functions(同期) |
Amazon API Gateway |
AWS Lamnbda(同期) |
Input Transformer(optional) |
---|
Target |
---|
Amazon Eventridge(イベントバス、API Destinations) |
AWS Batch ジョブキュー |
Amazon Redshift Data API |
Amazon Simple Notification Service(SNS)※標準トピックのみ |
Amazon Elastic Container Service(ECS)タスク |
Amazon SageMaker Pipeleines |
Amazon Kinesis Data Streams |
Amazon Kinesis Data Firehose |
AWS Step Functions(標準:同期、Express:同期 or 非同期) |
Amazon API Gateway |
Amazon SQS |
AWS Lamnbda(同期 or 非同期) |
Amazon CloudWatch ロググループ |
Amazon Inspector 評価テンプレート |
EventBridge Pipes の主な機能
イベントのフィルタリング | パターンマッチングによるフィルターを追加して、ターゲットに渡す必要なないイベントを除外できる。除外されたイベントは課金対象外、フィルターではJOSNバスが利用可能 |
バッチ処理 | イベントソースから受診したイベントを、ターゲット/エンリッチメントに対してバッチで送信できる。バッチの利用可否や最大バッチサイズは後続サービスの仕様に依存する。 |
順序保証 | イベントソースから受診したイベントの順序を維持したまま動機でターゲットを呼び出す。順序保証がないソースからのイベントの場合は、非同期でターゲットを呼び出す |
同時実行による高いスループット | バックログに応じて自動でスケールアップ/ダウンを行う。サービスごとに最大同時実行数は異なる(例:Amazon SQS は 1,250) |
イベントデータの拡張 | AWS Lambda,AWS Step Funcitons,API Gateway,API Destinationsを呼び出して追加のデータを取得できる。各サービスは同期的に呼び出し、最大レスポンスサイズは6MBに制限される |
ユースケース
- SQS→Step Functions
キュー内のメッセージに対して、複雑な一連の処理(ワークフロー)を実行 - Kinesis Data Steamsの分離
多重化されたデータストリームをドメインに応じて複数のデータストリームに分割 - Kafka→EventBridge API Destinations
セルフマネージドKafkaクラスターとAWSサービスを接続し、コードを書かずにHTTP(s) APIを実行 - DynamoDB→EventBridge
DynamoDBレコードの変更をノーコードで他AWSサービスに連携
※EventBridge Pipesからイベントバスを通すことで複数に連携可能
バッチの設定について
- ソースからの取得およびターゲットの呼び出しでバッチをサポート
- 複数のイベントがJSONレコードの配列として渡される
- バッチの利用可否や最大バッチサイズは利用する各サービスに依存
- エンリッチメントについてはLambdaおよびStep Functionsでサポート
- エンリッチメントおよびターゲットの呼び出しは、各サービスのバッチAPIにマッピングされる(※LambdaやStep Functionsの場合は1人区ストの入力にJSON配列全体が渡される)
- エンリッチメントはレスポンスとしてJSONの配列を返すようにする
- ターゲットがサポートするより大きいバッチサイズを指定することはできない
- バッチ内の一部のレコードの処理に失敗した場合の挙動
- SQS、Kinesis、DynamoDBなどのストリームがソースの場合、失敗したレコードに対して自動的にリトライを行う
- エンリッチメントでの失敗の場合、部分的なリトライは行われない
- ターゲットがLambdaまたはStep Functionsの場合、batchItemFailuresにリトライを行うイベントIDを含めてレスポンスを返すことで、部分的なリトライの実行が可能
ターゲットごとの最大バッチサイズ
ターゲットごとに最大バッチサイズが異なる
LambdaおよびStep Functionsについては、バッチに含まれるペイロードの合計サイズが制限を超過しないか注意する必要がある
Lambda:6MB,Step Functions:262144 Bytes
スループットと同時実行について
- STARTED状態のパイプはソースからのイベントを継続的にポーリングし、利用可能なバックログとバッチの設定に応じて自動的にスケーリングを行う
- 単一のパイプは、デフォルトで最大同時実行数までスケールするが、一部上限緩和の申請は可能
- パイプの実行時間は最大5分まで(※エンリッチメント泳ぎターゲットを含む)
実行ロールの設定
- パイプごとに実行IAMロールを指定する必要がある(コンソールから作れば付与される)
- ソースへのアクセス許可を付与
- 必要に応じてエンリッチメントおよびターゲットの呼び出し許可を付与
ログの設定
- 各実行ステップでログレコードを生成
- ログレベルを選択可能(OFF、ERROR、INFO、TRACE)
- ログの配信先としてCloudWatch,Kinesis Data Firehose,S3をサポート
- CloudWatchはログレコードの最大サイズ256KBでフィールドごとに自動で切り捨て
- 実行データをログに含めるか選択可能(機密データがログに含まれる可能性がある点に注意)
料金
- パイプが処理するリクエスト数に応じて課金
- 100万リクエストごとに$0.50
- フィルタリングで除外されたイベントは課金対象外
- ペイロードサイズ64KBを上限として1リクエストとして扱う(例:256KBなら4リクエスト)
クォータ
上限緩和申請は可能
- アカウント全体で作成できるパイプの上限:1,000
- アカウント全体でのパイプの最大同時実行数:1,000(USリージョンは3,000)