Workflows + Cloud Scheduler で定期処理をサーバーレス構築(Cloud Composer との比較もあります)

こんにちは、AIテクノロジーグループの太田です。
普段は商品のカタログデータ基盤を開発・運用するチームで業務に携わっております。エニグモではそういったデータやAI関連の技術基盤としてGCPを利用しており、そこで利用したWorkflowsについて紹介したいと思います。

この記事はEnigmo Advent Calendar 2025の17日目の記事です。

1. はじめに:なぜこの構成に至ったか

  • 背景
    毎日追加・更新される商品データを Vector Search のインデックスに反映させる必要があった。
    Cloud Composer (Airflow) の利用実績はあったものの、より安価な Workflows に興味があった。
  • 課題
    「差分抽出 → 画像処理(Embedding) → インデックス更新」という一連のフローを毎日決まった時間に実行したい。
    各処理ステップ(特に画像処理とインデックス更新)は時間がかかるため、タイムアウトやリトライ制御を考慮する必要がある。
    当然コストは抑えたい。
  • 結論
    Cloud Composer を使わず、Workflows + Cloud Scheduler を採用することで、管理コストと金銭的コストを最小限に抑えたアーキテクチャを構築した。

ポイントは、重たい処理(画像処理・インデックス更新)は Dataflow に任せ、Workflows はあくまで「順序制御」に徹する構成にしたことです。

2. Goolge Cloud 構成:全体アーキテクチャ概要

処理の流れは次のとおりです。

  • Cloud Scheduler 毎日定時に Workflows をトリガー。
  • Workflows: 全体の指揮者。以下のステップを順次実行。
    • BigQuery: 前日データとの差分をSQLで抽出。
    • Workflows: 画像の Embedding 計算とGCSへの保存を実行する Dataflow を実行する。
    • Workflows: Vector Search のインデックス更新を実行する Dataflow を実行する。

ポイントは、長時間実行かつ単発で実行する機会がある Dataflow の実行を別の Workflows に委ね、メインの Workflows から別の Workflows を呼び出すようにしたことです。

Dataflowの実装については、本記事の趣旨から外れるので省略いたします。

3. 技術選定:なぜ Cloud Composer ではなく Workflows なのか

このセクションで、他の選択肢と比較し、なぜ今回の構成に至ったかを解説します。

比較項目 Workflows Cloud Composer
特性 サーバーレスで軽量、直線的なフローに最適 複雑な依存関係に強いが、常時稼働が必要
コスト 安価(実行回数課金)
1000ステップ0.01ドル1
Google Cloud 外へのアクセスを要する場合は1000ステップ0.025ドル
高い(小規模でも月額数万円〜)2
実際に月額約8万円かかっています
採用/不採用の決め手 今回の処理が「直線的」であり、複雑なDAGが不要だったため 日次バッチ一つに対してはオーバースペック

運用実績があったからといって、「とりあえず Airflow」とせずに、ワークフローの複雑さに応じてツールを選定できた点が良かったです。

実際に使ってみて、単純な A -> B -> C というフローなら Workflows の方が圧倒的に運用・コストメリットが大きいことが実感できました。

4. 実装サンプル:Workflows から Dataflow を起動する

ここでは、実際に Workflows を使って Dataflow (Flex Template) を起動するための定義(YAML)を紹介します。

【コード解説のポイント】

Dataflow の Flex Template を利用することで、Docker イメージ化したジョブをパラメータ付きで呼び出せます。
Workflows 側でジョブの完了を待機する(ポーリングする)ようにしました。
googleapi 3 で Dataflow 以外の各種 Google Cloud のプロダクトへアクセスすることができるので、参照してみてください。

【サンプルコード(YAML)】

実際に作成した Dataflow を起動する Workflows の定義ファイル(main.yaml)の一部を掲載します。

  • main.yaml の抜粋イメージ
steps:
  - init:
      assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - location: "asia-northeast1"
        - job_name: "dataflow-launcher"
        # デプロイするときに --set-env-vars で設定した環境変数をここで読み込む
        - sdk_container_image: ${sys.get_env("sdk_container_image")}
        - gcs_bucket: ${sys.get_env("gcs_bucket")}
        # YYYYMMDD 形式の日付
        - ymd: ${text.replace_all(text.split(time.format(sys.now()), "T")[0], "-", "")}
  
  # 画像処理をするので時間がかかる Dataflow を実行するステップ
  - dataflow_start_crop_embedding:
      call: googleapis.dataflow.v1b3.projects.locations.flexTemplates.launch
      args:
        projectId: ${project_id}
        location: ${location}
        body:
          launchParameter:
            jobName: ${job_name + ymd}
            containerSpecGcsPath: ${gcs_bucket + "/templates/your-template-spec.json"}
            parameters:
              sdk_container_image: ${sdk_container_image}
            environment:
              stagingLocation: ${gcs_bucket + "/templates/staging"}
              tempLocation: ${gcs_bucket + "/templates/temp"}
              serviceAccountEmail: ${Dataflow 実行権限を持つ Service Account}
      result: dataflow_result
      next: initialize_polling

  # 以下、Dataflow を完了まで監視するステップ
  # ループした回数だけステップ数が増えてコストも増えていくのでループ数には注意してください。

  - initialize_polling:
      assign:
        - counter: 0
        # "max_retries * 60秒 (wait_60_secondsで定義) = 1時間"なので、最大1時間ポーリングを行う。
        - max_retries: 60
      next: poll_job_status

  - poll_job_status:
      call: googleapis.dataflow.v1b3.projects.locations.jobs.get
      args:
        projectId: ${project_id}
        location: ${location}
        # run_dataflow_job ステップの return から参照できる。
        jobId: ${dataflow_result.job.id}
      result: job_status
      next: check_job_status

  - check_job_status:
      switch:
        - condition: ${job_status.currentState == "JOB_STATE_DONE"}
          next: job_succeeded
        - condition: ${job_status.currentState == "JOB_STATE_FAILED" or job_status.currentState == "JOB_STATE_CANCELLED"}
          raise: ${"Dataflowジョブ " + dataflow_result.job.id + " が失敗しました。ステータス " + job_status.currentState}
        - condition: ${counter >= max_retries}
          raise: ${"Dataflowジョブ " + dataflow_result.job.id + " が1時間以内に完了しませんでした。タイムアウト。"}
        - condition: ${true}
          next: increment_counter

  - increment_counter:
      assign:
        - counter: ${counter + 1}
      next: wait_60_seconds

  - wait_60_seconds:
      call: sys.sleep
      args:
        seconds: 60
      next: poll_job_status

  - job_succeeded:
      return: "SUCCESS"

【サンプルコードをデプロイ】

作成した Workflows の定義ファイルをデプロイ4します。

gcloud workflows deploy sample_workflow \
        --source=main.yaml \
        --location="asia-northeast1" \
        --project=${PROJECT_ID} \
        --service-account=${SERVICE_ACCOUNT_EMAIL} \
        --set-env-vars sdk_container_image=${Artifact Registry にpushしたdockerイメージ} \
        --set-env-vars gcs_bucket="gs://YOUR_BUCKET" \

【サンプルコードを実行した結果】

ループしているので実際に実行されたステップ数は137でした。

コストは 137 * 0.01 / 1000 = 0.00137 ドルになります。

5. Workflows を採用する上で許容した「不便な点」

コストと手軽さは魅力的ですが、導入に際しては以下のデメリットも考慮する必要がありました。

  1. 開発体験のクセ(YAML地獄)

    • 課題
      Python で記述できる Airflow と異なり、Workflows は YAML (または JSON) でロジックを記述する必要があります。条件分岐やループ処理、変数の扱いが直感的ではなく、構文エラーに悩まされることが多いです(一般的に "YAML engineering" と揶揄される部分)。
    • 対応
      今回は「直列的なフロー」に留めることで複雑な記述を回避しました。複雑なロジックが必要な場合は、無理に Workflows 内に書かず、Cloud Functions や Dataflow に逃がす設計が重要です。
  2. ローカルテスト・デバッグの難易度

    • 課題
      Cloud Composer (Airflow) はローカル環境を構築可能なので DAG のテストが可能ですが、Workflows はクラウド上のリソースと密結合しているため、ローカルでの完全な再現・テストが困難です。「修正してデプロイして実行」のサイクルになりがちです。
    • 対応
      ステップごとの単体テストは諦め、結合テスト中心で進める割り切りが必要でした。 また、別の Workflows に分割することで、ステップごとに運用できるように対応しました。
  3. ステップ間のデータ受け渡し制限(メモリサイズ)

    • 課題
      Workflows は大規模なデータをステップ間で直接受け渡すこと(ペイロードサイズ制限)には向いていません。
    • 対応
      今回の設計では、画像データそのものや大量のリストは Workflows 上を通過させず、必ず GCS のパスや BigQuery のテーブル名といった「参照情報」のみを受け渡すように徹底しました。
  4. ベンダーロックイン

    • 課題
      Airflow はOSS標準ですが、Workflows は Google Cloud 固有のサービスです。将来的に他のクラウドへ移行する場合、ポータビリティがありません。
    • 対応
      今回は GCP 完結のシステムであり、フルマネージドの恩恵(管理レス)を最優先しました。

6. まとめ

Workflows + Cloud Scheduler の組み合わせにより、日次の Vector Search インデックス更新を完全自動化できました。

コスト面以外では、インフラ管理コスト(Cloud Composer の環境維持など)を削減し、本質的なロジック開発に集中できることが Workflows の大きなメリットに感じました。

デメリットでYAML記法に言及しましたが、逆に言えば、どなたでも気軽に試してみることができるとも言えます。コストも軽いのでこれを機に是非一度お試しください。

読者の皆様がこれで良い体験を得ることができましたら私としても大変嬉しく思います。

明日18日目はAIテクノロジーグループの吉田さんです。