はじめに
AWS CDKを使用し、待機状態や並列タスクの実行、エラーハンドリングなどの機能を含むステートマシンを作成する方法を本記事で解説します。本記事では TypeScript を使用しています。
環境
本記事では以下のバージョンの AWS CDK を使用しています。
$ cdk version
2.80.0 (build bbdb16a)
ステートマシンの作成
ステートマシンの定義には以下の機能を含めます。
- Waitステートを使用して実行時に入力された日時まで待機する
- Parallelステートを使用して複数のタスクを並列実行する
- 並列実行されるタスクのうち1つでも失敗すると他のタスクも停止しますが、これを防ぐためにエラーハンドリングを追加し、失敗した場合でもステートマシンが正常終了するようにします
- しかし、並列タスクが失敗した場合にステートマシンが成功状態にならないように、エラーをキャッチした際に特定の文字列を出力し、それを基に後続のステートで分岐を行います
CDK のコード
以下に CDK のコードを示します。
import * as cdk from 'aws-cdk-lib';
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
import * as stepfunctions_tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
export class StepfunctionsDemoStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const success = this.defineSuccessState('success');
const failure = this.defineFailureState('Fail');
const waitTask = this.defineWaitState('wait');
const childStateMachine1 = this.defineChildStateMachine('childStateMachine1', success);
const childStateMachine2 = this.defineChildStateMachine('childStateMachine2', failure);
const task1 = this.defineChildTask('childTask1', childStateMachine1);
const task2 = this.defineChildTask('childTask2', childStateMachine2);
const catchErrorState1 = this.defineCatchErrorState('catchErrorState1', 'task1');
const catchErrorState2 = this.defineCatchErrorState('catchErrorState2', 'task2');
const parallel = this.defineParallelState('All jobs', [task1, catchErrorState1], [task2, catchErrorState2]);
const getResult = this.defineGetResultState('getResults');
const parentSuccess = this.defineSuccessState('parentSuccess');
const parentFailure = this.defineFailureState('parentFailure');
const checkResult = this.defineChoiceState('checkResult', parentFailure, parentSuccess);
this.defineParentStateMachine('parentStateMachine', waitTask, parallel, getResult, checkResult);
}
private defineSuccessState(id: string) {
return new stepfunctions.Succeed(this, id);
}
private defineFailureState(id: string) {
return new stepfunctions.Fail(this, id, {
error: 'WorkflowFailure',
cause: "Something went wrong",
});
}
private defineWaitState(id: string) {
return new stepfunctions.Wait(this, id, {
time: stepfunctions.WaitTime.timestampPath('$.waitSeconds'),
});
}
private defineChildStateMachine(id: string, definition: stepfunctions.IChainable) {
return new stepfunctions.StateMachine(this, id, {
stateMachineName: id,
definition: definition,
});
}
private defineChildTask(id: string, stateMachine: stepfunctions.IStateMachine) {
return new stepfunctions_tasks.StepFunctionsStartExecution(this, id, {
stateMachine: stateMachine,
integrationPattern: stepfunctions.IntegrationPattern.RUN_JOB,
input: stepfunctions.TaskInput.fromObject({
token: stepfunctions.JsonPath.taskToken,
foo: 'bar',
}),
});
}
private defineCatchErrorState(id: string, error: string) {
return new stepfunctions.Pass(this, id, {
result: stepfunctions.Result.fromObject({ error: error }),
});
}
private defineParallelState(id: string, task1: [stepfunctions.TaskStateBase, stepfunctions.IChainable], task2: [stepfunctions.TaskStateBase, stepfunctions.IChainable]) {
return new stepfunctions.Parallel(this, id)
.branch(task1[0].addCatch(task1[1], { errors: ['States.ALL'] }))
.branch(task2[0].addCatch(task2[1], { errors: ['States.ALL'] }));
}
private defineGetResultState(id: string) {
return new stepfunctions.Pass(this, id, {
parameters: { 'result.$': 'States.JsonToString($)' }
});
}
private defineChoiceState(id: string, failureState: stepfunctions.IChainable, successState: stepfunctions.IChainable) {
return new stepfunctions.Choice(this, id)
.when(stepfunctions.Condition.stringMatches('$.result', '*error*'), failureState)
.otherwise(successState);
}
private defineParentStateMachine(id: string, waitState: stepfunctions.INextable, parallelState: stepfunctions.IChainable, getResultState: stepfunctions.IChainable, choiceState: stepfunctions.IChainable) {
return new stepfunctions.StateMachine(this, id, {
stateMachineName: id,
definition: waitState.next(parallelState).next(getResultState).next(choiceState)
});
}
}
作成されたステートマシンの定義
CDK によって実際に作成されたステートマシンの定義はこちらです。
{
"StartAt": "wait",
"States": {
"wait": {
"Type": "Wait",
"TimestampPath": "$.waitSeconds",
"Next": "All jobs"
},
"All jobs": {
"Type": "Parallel",
"Next": "getResults",
"Branches": [
{
"StartAt": "childTask1",
"States": {
"childTask1": {
"End": true,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "catchErrorState1"
}
],
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"Input": {
"token.$": "$$.Task.Token",
"foo": "bar"
},
"StateMachineArn": "arn:aws:states:ap-northeast-1:012345678901:stateMachine:childStateMachine1"
}
},
"catchErrorState1": {
"Type": "Pass",
"Result": {
"error": "task1"
},
"End": true
}
}
},
{
"StartAt": "childTask2",
"States": {
"childTask2": {
"End": true,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "catchErrorState2"
}
],
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"Input": {
"token.$": "$$.Task.Token",
"foo": "bar"
},
"StateMachineArn": "arn:aws:states:ap-northeast-1:012345678901:stateMachine:childStateMachine2"
}
},
"catchErrorState2": {
"Type": "Pass",
"Result": {
"error": "task2"
},
"End": true
}
}
}
]
},
"getResults": {
"Type": "Pass",
"Parameters": {
"result.$": "States.JsonToString($)"
},
"Next": "checkResult"
},
"checkResult": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.result",
"StringMatches": "*error*",
"Next": "parentFailure"
}
],
"Default": "parentSuccess"
},
"parentSuccess": {
"Type": "Succeed"
},
"parentFailure": {
"Type": "Fail",
"Error": "WorkflowFailure",
"Cause": "Something went wrong"
}
}
}
おわりに
本記事では、AWS CDKを使用して、複雑なステートマシンを簡単に作成する方法を説明しました。特定時刻までの待機、各タスクの並列実行、エラーハンドリング、タスクの出力に基づく条件分岐など、実際の業務で頻繁に必要とされる機能を網羅しています。
この記事がどなたかの参考になれば幸いです。
コメント