コンテンツへスキップ

QualiArtsengineer blog

IDOLY PRIDEのバッチ基盤

IDOLY PRIDEのバッチ基盤

5 min read

はじめに

株式会社QualiArtsでバックエンドエンジニアをしている島田です。 2021年6月リリースの「IDOLY PRIDE」(以降、アイプラ)の開発に携わり、基盤開発やインゲーム実装を担当しました。

世の中の多くのシステムと同様に、ゲームにおいてもバッチ処理は欠かせない存在です。ゲーム内イベントにおける対戦相手のマッチング処理やランキングの集計処理、エラーやデータの不整合に対するリカバリ処理、ユーザー分析のためのデータ加工やログ出力等、多くの場面でバッチを利用しています。 本記事では、アイプラのバッチ基盤をどのように構築しているかについてお話しします。

バッチ処理の全体像

アイプラでは、コンピューティングにGKE、バックエンドのプログラミング言語にGoを採用しています。 バッチ処理においてはKubernetesのCronJobリソースを利用して、Goで作成したCLIプログラムを実行するような方式をとっています。以下にイメージ図を示します。

CronJobとJobについて馴染みがない方向けに簡単に説明させていただくと、 CronJobはcrontabの一行に相当し、Cron形式で記述された指定のスケジュールの基づき、定期的にジョブを作成します。Jobは1つ以上のPodを作成し、指定数のPodが正常に終了するまで、Podの実行を行います。

イメージ図を見ていただければわかる通り、そこまで複雑なことはしておらずシンプルな構成です。

実装のポイント

続いてバッチ実装におけるポイントを解説します。

モノレポでの実装

アイプラのバックエンドリソースについてはモノレポを採用しています。そのため、ゲームAPIと同じ処理をそのまま流用することができて便利です。バッチ処理の実装にはspf13/cobraを利用しており、各バッチ処理を1つのCLIコマンドとして実装しています。

cmd
├── admin # 運用管理ツール(Webサーバー)
│   └── main.go
├── api   # ゲームAPI(gRPCサーバー)
│   └── main.go
├── batch # バッチ処理(CLI)
│   └── main.go
├── debug # デバッグツール(Webサーバー)
│   └── main.go
...

バッチの実行通知

バッチの実行完了やエラーをSlackに通知しています。 処理の進捗や実行結果についてはバッチの種類に応じて異なるため、個別の処理内で対応しています。一方で、エラー発生時の通知はどのバッチでも共通で行いたいため、基盤部分にて対応しています。以下は実装イメージです。

func Execute(ctx context.Context, config *Config) (err error) {
	slackNotifier := slack.NewNotifier(config.SlackToken)
	systemNoticeService := systemnotice.New(slackNotifier)
	
	defer func() {
		p := recover()
		if p == nil && err == nil {
			return
		}

		var message string
		switch {
		case p != nil:
			message = fmt.Sprintf("```%v```\n", p)
		case err != nil:
			message = fmt.Sprintf("```%v```\n", err)
		}

		// バッチの異常終了をSlackに通知
		systemNoticeService.NotifyBatchStatus(ctx, systemnotice.BatchNoticeTypeError, config.BatchName, message)

		// Podを異常終了にするためExitCodeを指定
		os.Exit(1)
	}()

	cmd, err := newRootCmd(ctx, config)
	if err != nil {
		return err
	}

	return cmd.ExecuteContext(ctx)
}

トランザクションの分割

アイプラではデータベースとしてGoogle Cloud Spanner(以降、Spanner)を採用しており、1つのトランザクションで更新できるミューテーション数には上限があります。バッチ処理においては大量データの更新を行うケースが多いため、データの更新が必要となるバッチ処理では、いくつかの単位にデータを分割してトランザクション処理を実行しています。

// 100レコード単位で分割してトランザクション処理を実行する
for _, splitedUserEvents := range userEvents.Split(100) {
	err := TxManager.Transaction(ctx, func(ctx context.Context, tx database.Tx) error {
		// データの更新処理
		return nil
	})
	...
}

上記の例では直列に処理を記述していますが、可能な場合はGoroutineで並行に処理を実行しています。 アイプラでのSpanner利用については、過去ブログ記事をご参照ください。

ログ出力

GKEのデフォルトのロギングエージェントでは1秒あたり100KBの制限があり、バッチ処理で一気にログの出力をしようとすると、ログの欠損が発生する可能性もあります。 そのため、バッチ処理内においては直接CloudLoggingに対してログを送信するロガーを実装して利用しています。 現在では高スループットのログエージェントも提供されているので、ユースケースによってはこちらを利用しても良いかもしれません。

バッチ処理に応じてリソースをスケール

アイプラではKubernetesのカスタムリソースを利用して時間帯に応じて各種リソースをスケールイン・アウトさせる運用を行っています。これを利用して、重めのバッチ処理を走らせるタイミングでは、Spannerのノード数等の必要リソースを事前に増やしておくようにしています。こちらも過去ブログ記事にて記述していますのでご参照ください。

運用管理ツールからのバッチ処理実行

リカバリやテスト時等、CronJobに設定したスケジュール以外でバッチ処理を行いたい場合もあります。kubectl create job xxx --from=cronjob/xxxコマンドで、CronJobからJobを手動で作成することも可能ですが、非エンジニアからするとハードルが高くなってしまいます。そこで、別途開発している運用管理ツールにおいて、GoのKubernetesクライアントを利用して、kubectl create jobに相当するCronJobからJobを生成する機能を実装しています。これにより、任意のタイミングで簡単にバッチ処理を実行可能にしています。

func CreateJobFromCronJob(ctx context.Context, cronJobName string, args []string, envMap map[string]string) error {
	cronJobList, err := r.client.GetCronJob(ctx, batchNamespace, cronJobName)
	if err != nil {
		return err
	}
	cronJob := cronJobList.Items[0]

	jobSpec := cronJob.Spec.JobTemplate.Spec
	for i := 0; i < len(jobSpec.Template.Spec.Containers); i++ {
		c := &jobSpec.Template.Spec.Containers[i]
		// 引数追加
		c.Args = append(c.Args, args...)
		// 環境変数を更新
		updatedEnvNames := map[string]struct{}{}
		for j := range c.Env {
			e := &c.Env[j]
			if value, ok := envMap[e.Name]; ok {
				e.Value = value
				updatedEnvNames[e.Name] = struct{}{}
			}
		}
		// 環境変数を追加
		for name, value := range envMap {
			if _, ok := updatedEnvNames[name]; ok {
				continue
			}
			c.Env = append(c.Env, corev1.EnvVar{Name: name, Value: value})
		}
	}

	if err := r.client.CreateJob(ctx, batchNamespace, &v1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-manual%d", cronJob.Name, time.Now().Unix()),
			Namespace: batchNamespace,
		},
		Spec: jobSpec,
	}); err != nil {
		return err
	}

	return nil
}

おわりに

今回はアイプラのバッチ基盤をどのように構築しているかについて、いくつかのポイントを紹介させていただきました。 Kubernetesをすでに利用している環境下であれば、CronJobを利用することで、バッチ実装のコストを低く抑えられると思います。 本記事が皆さまの開発に少しでもお役に立てば幸いです。

2013年にサイバーエージェントに新卒入社。バックエンドエンジニアや開発ディレクターとして複数のプロジェクトに携わる。