AWS CDK を使用してタスクを並列実行するステートマシンを構築する

AWS

はじめに

AWS CDKを使用し、待機状態や並列タスクの実行、エラーハンドリングなどの機能を含むステートマシンを作成する方法を本記事で解説します。本記事では TypeScript を使用しています。

環境

本記事では以下のバージョンの AWS CDK を使用しています。

$ cdk version
2.80.0 (build bbdb16a)

ステートマシンの作成

ステートマシンの定義には以下の機能を含めます。

  • Waitステートを使用して実行時に入力された日時まで待機する
  • Parallelステートを使用して複数のタスクを並列実行する
  • 並列実行されるタスクのうち1つでも失敗すると他のタスクも停止しますが、これを防ぐためにエラーハンドリングを追加し、失敗した場合でもステートマシンが正常終了するようにします
  • しかし、並列タスクが失敗した場合にステートマシンが成功状態にならないように、エラーをキャッチした際に特定の文字列を出力し、それを基に後続のステートで分岐を行います

CDK のコード

以下に CDK のコードを示します。

import * as cdk from 'aws-cdk-lib';
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
import * as stepfunctions_tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';

export class StepfunctionsDemoStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const success = this.defineSuccessState('success');
    const failure = this.defineFailureState('Fail');
    const waitTask = this.defineWaitState('wait');

    const childStateMachine1 = this.defineChildStateMachine('childStateMachine1', success);
    const childStateMachine2 = this.defineChildStateMachine('childStateMachine2', failure);

    const task1 = this.defineChildTask('childTask1', childStateMachine1);
    const task2 = this.defineChildTask('childTask2', childStateMachine2);

    const catchErrorState1 = this.defineCatchErrorState('catchErrorState1', 'task1');
    const catchErrorState2 = this.defineCatchErrorState('catchErrorState2', 'task2');

    const parallel = this.defineParallelState('All jobs', [task1, catchErrorState1], [task2, catchErrorState2]);

    const getResult = this.defineGetResultState('getResults');
    const parentSuccess = this.defineSuccessState('parentSuccess');
    const parentFailure = this.defineFailureState('parentFailure');
    const checkResult = this.defineChoiceState('checkResult', parentFailure, parentSuccess);

    this.defineParentStateMachine('parentStateMachine', waitTask, parallel, getResult, checkResult);
  }

  private defineSuccessState(id: string) {
    return new stepfunctions.Succeed(this, id);
  }

  private defineFailureState(id: string) {
    return new stepfunctions.Fail(this, id, {
      error: 'WorkflowFailure',
      cause: "Something went wrong",
    });
  }

  private defineWaitState(id: string) {
    return new stepfunctions.Wait(this, id, {
      time: stepfunctions.WaitTime.timestampPath('$.waitSeconds'),
    });
  }

  private defineChildStateMachine(id: string, definition: stepfunctions.IChainable) {
    return new stepfunctions.StateMachine(this, id, {
      stateMachineName: id,
      definition: definition,
    });
  }

  private defineChildTask(id: string, stateMachine: stepfunctions.IStateMachine) {
    return new stepfunctions_tasks.StepFunctionsStartExecution(this, id, {
      stateMachine: stateMachine,
      integrationPattern: stepfunctions.IntegrationPattern.RUN_JOB,
      input: stepfunctions.TaskInput.fromObject({
        token: stepfunctions.JsonPath.taskToken,
        foo: 'bar',
      }),
    });
  }

  private defineCatchErrorState(id: string, error: string) {
    return new stepfunctions.Pass(this, id, {
      result: stepfunctions.Result.fromObject({ error: error }),
    });
  }

  private defineParallelState(id: string, task1: [stepfunctions.TaskStateBase, stepfunctions.IChainable], task2: [stepfunctions.TaskStateBase, stepfunctions.IChainable]) {
    return new stepfunctions.Parallel(this, id)
      .branch(task1[0].addCatch(task1[1], { errors: ['States.ALL'] }))
      .branch(task2[0].addCatch(task2[1], { errors: ['States.ALL'] }));
  }

  private defineGetResultState(id: string) {
    return new stepfunctions.Pass(this, id, {
      parameters: { 'result.$': 'States.JsonToString($)' }
    });
  }

  private defineChoiceState(id: string, failureState: stepfunctions.IChainable, successState: stepfunctions.IChainable) {
    return new stepfunctions.Choice(this, id)
      .when(stepfunctions.Condition.stringMatches('$.result', '*error*'), failureState)
      .otherwise(successState);
  }

  private defineParentStateMachine(id: string, waitState: stepfunctions.INextable, parallelState: stepfunctions.IChainable, getResultState: stepfunctions.IChainable, choiceState: stepfunctions.IChainable) {
    return new stepfunctions.StateMachine(this, id, {
      stateMachineName: id,
      definition: waitState.next(parallelState).next(getResultState).next(choiceState)
    });
  }
}

作成されたステートマシンの定義

CDK によって実際に作成されたステートマシンの定義はこちらです。

{
  "StartAt": "wait",
  "States": {
    "wait": {
      "Type": "Wait",
      "TimestampPath": "$.waitSeconds",
      "Next": "All jobs"
    },
    "All jobs": {
      "Type": "Parallel",
      "Next": "getResults",
      "Branches": [
        {
          "StartAt": "childTask1",
          "States": {
            "childTask1": {
              "End": true,
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "catchErrorState1"
                }
              ],
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "Input": {
                  "token.$": "$$.Task.Token",
                  "foo": "bar"
                },
                "StateMachineArn": "arn:aws:states:ap-northeast-1:012345678901:stateMachine:childStateMachine1"
              }
            },
            "catchErrorState1": {
              "Type": "Pass",
              "Result": {
                "error": "task1"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "childTask2",
          "States": {
            "childTask2": {
              "End": true,
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "catchErrorState2"
                }
              ],
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "Input": {
                  "token.$": "$$.Task.Token",
                  "foo": "bar"
                },
                "StateMachineArn": "arn:aws:states:ap-northeast-1:012345678901:stateMachine:childStateMachine2"
              }
            },
            "catchErrorState2": {
              "Type": "Pass",
              "Result": {
                "error": "task2"
              },
              "End": true
            }
          }
        }
      ]
    },
    "getResults": {
      "Type": "Pass",
      "Parameters": {
        "result.$": "States.JsonToString($)"
      },
      "Next": "checkResult"
    },
    "checkResult": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.result",
          "StringMatches": "*error*",
          "Next": "parentFailure"
        }
      ],
      "Default": "parentSuccess"
    },
    "parentSuccess": {
      "Type": "Succeed"
    },
    "parentFailure": {
      "Type": "Fail",
      "Error": "WorkflowFailure",
      "Cause": "Something went wrong"
    }
  }
}

おわりに

本記事では、AWS CDKを使用して、複雑なステートマシンを簡単に作成する方法を説明しました。特定時刻までの待機、各タスクの並列実行、エラーハンドリング、タスクの出力に基づく条件分岐など、実際の業務で頻繁に必要とされる機能を網羅しています。
この記事がどなたかの参考になれば幸いです。

参考

コメント