機械学習実験を加速させる dbt による特徴量管理の実践

こんにちは、AI テクノロジーグループ データサイエンティストの髙橋です。業務では企画/分析/機械学習モデル作成/プロダクション向けの実装/効果検証を一貫して行っています。この記事は Enigmo Advent Calendar 2025 の 19 日目の記事です。

本記事では、 dbt を利用した機械学習モデルの特徴量管理について紹介します。この特徴量管理を活用することで、機械学習を利用したプロジェクトで多くの実験を効率的に実施でき、利益増加というビジネス成果に繋げることができました。

www.getdbt.com

※文中に記載するディレクトリやファイル名、 SQL コード、コマンドなどは全てイメージです。

特徴量管理の目的・成果

dbt を利用した特徴量管理を導入した目的は、ある機械学習プロジェクトにて効率的に数多くの特徴量を試す必要があったためです。まず、そのプロジェクトと得られた成果について説明します。
弊社が運営している CtoC ECサービス BUYMA では、MA (Marketing Automation) ツールを通じてクーポンなどのインセンティブをメールで配布しています。これまで、様々な会員セグメントを定義し、各セグメントに対してのインセンティブ配布ルールの運用を行っており、そのルールのチューニングで改善を図っていました。このアプローチではルールをもとに一律配布しているため、機械学習による最適化余地があると考えました。具体的には、インセンティブがなくても購入する会員にもコストをかけてしまったり、インセンティブがあれば購入する会員に配布できていなかったりなどの最適化の余地があるのではないかと考えました。そこで、機械学習を活用してデータに基づくより効果的なインセンティブ配布を実現することを目指しました。

BUYMA はリリースから 20年を超えるサービスであり、様々な MA シナリオ(会員セグメントと配布ルールの組み合わせを以降 MA シナリオと呼びます)が運用されています。出来るだけ多くの MA シナリオに対して機械学習による最適化を適用したく、そのためには様々な特徴量を効率良く試せるようにすべきと考えました。

そこで、今回紹介する dbt を利用した特徴量管理を導入しました。その結果、約1年間でおよそ8個の MA シナリオに機械学習による配布最適化を試すことができ、シナリオによってばらつきはあるものの約20%の利益増加が実現できました。

数多くの特徴量を試す上での課題

まず前提として、特徴量は BigQuery で作成する方針としました。理由は、既に BUYMA のデータは BigQuery に保存されていたことと、Python 実行環境(ノートブックなど)への特徴量作成のもとになる行動データのダウンロードに非常に時間がかかったためです。時間がかかる理由は、BUYMA は会員数・商品数が非常に多く、それに伴いユーザーの行動ログのデータ量も非常に多いためです。具体的には、会員数1185万人、商品数590万品であり*1、利用する行動ログのレコード数は数千万件になることもあります。

この前提のもと、BigQuery で数多くの特徴量を効率的に試すには以下の課題がありました。

  • 特徴量の数が多くなると SQL が肥大化して可読性が低下する。
    • 例えば、弊社で過去別の機能で作成した特徴量 SQL ファイルは2000行を超えており読むのが大変でした。
  • 特徴量作成のロジックが複雑になると可読性が低下する
    • 例えば、過去 n 日間の閲覧数という特徴量を複数 n に対して記述すると、SQL が長くなり可読性が低下します。
  • 別の実験で特徴量を再利用しづらい
    • 再利用する場合は、その特徴量部分を毎回コピペする必要があるためです。

dbt による課題解決

そこで、 dbt を利用して特徴量管理を行うことで、これら課題の解決を図りました

まず、 dbt について簡単に説明します。ただし、dbt は多くの機能があるため今回の課題解決に関連する機能に絞って説明します。全体を詳しく知りたい方は公式ドキュメントを参照ください。 今回役立ったのは以下の機能です。

  • SQL ファイルの分割
  • for などのロジックの記述

SQL ファイルの分割について、dbt を利用することで CTE (Common Table Expression) を別のファイルに分けることができます。例として、以下のような CTE を使った SQL を考えます。

-- main.sql
WITH users AS (
    SELECT
        id AS user_id,
        first_name,
        last_name
    FROM `project.dataset.users`
)

SELECT
    *
FROM
    users

dbt を利用することでこの SQL を2つのファイルに分けることができます。

-- user.sql
SELECT
    id AS user_id,
    first_name,
    last_name
FROM `project.dataset.users`

-- main.sql
SELECT
    *
FROM
    {{ ref("user") }}

ここで、 {{ (ref("user")) }} は user.sql を参照することを意味します。 dbt run コマンドを実行すると、 user.sqlmain.sql のテーブルビューが作成されます。この機能を利用することで、特徴量作成などの SQL を分割して可読性を向上させることが出来ます。具体的には、以下のように SQL ファイルを分割しました。

models
├── datasets
│   └── dataset.sql
└── features
│   ├── features_user_attributes.sql
│   └── features_user_action_log.sql
└── labels
    ├── label_type_1.sql
    └── label_type_2.sql

ここで、 models ディレクトリは dbt でデータ取得のための SQL を配置するディレクトリです。*2

各 datasets ファイルの中身は以下のようにしました。

SELECT
    *
FROM
    {{ var("user_ids_table_name") }}
LEFT JOIN
    {{ ref("features_user_attributes") }}
USING(user_id)
LEFT JOIN
    {{ ref("features_user_action_log") }}
USING(user_id)
LEFT JOIN
    {{ ref("label_type_1") }}
USING(user_id)

ここで、 var は dbt で利用できる変数です。*3 様々な会員セグメントについて実験するために、セグメントごとの会員 ID を別テーブルにあらかじめ保存しておき、変数として切り替えられるようにしました。 features ディレクトリ配下のファイルは意味がある粒度で特徴量を分けて再利用しやすくしました。また、機械学習の目的変数である label も複数パターン試せるようにしました。 こうしたことで、実験が進むごとに特徴量が増加しても、SQL が肥大化して読みにくくなることを防げました。具体的には、1つの SQL ファイルあたり長くとも約100行におさまるようになりました。また、 dataset.sql を見ればどのような特徴量が利用されているかが一目で分かるようになりました。

for などのロジックの記述について、dbt では Jinjaというテンプレートエンジン の記法で for などのロジックを記述することができます。これを利用して、例えば過去 1、3、7日間の閲覧、お気に入り回数の集計は以下のように記述できます。

{%- set agg_actions = ["view", "like"] -%}
{%- set last_n_days = [1, 3, 7] -%}

SELECT
    user_id,
    {% for agg_action in agg_actions %}
        {% for last_n_day in last_n_days %}
            COUNTIF(
                day_from_base_date <= {{ last_n_day }}
                AND action = "{{ agg_action }}"
            ) AS cnt_action_{{ agg_action }}_last_{{ last_n_day }}_days,
        {% endfor %}
    {% endfor %}
FROM
    `project.dataset.user_action_log`
GROUP BY
    user_id

ここで、簡単のために user_action_log テーブルに特徴量作成の基準日から何日前のログかを示す day_from_base_date カラムが存在すると仮定しています。これを通常の SQL で記述すると以下のようになります。

SELECT
    user_id,
    COUNTIF(
        day_from_base_date <= 1
        AND action = "view"
    ) AS cnt_action_view_last_1_days,
    COUNTIF(
        day_from_base_date <= 3
        AND action = "view"
    ) AS cnt_action_view_last_3_days,
    COUNTIF(
        day_from_base_date <= 7
        AND action = "view"
    ) AS cnt_action_view_last_7_days,
    COUNTIF(
        day_from_base_date <= 1
        AND action = "like"
    ) AS cnt_action_like_last_1_days,
    COUNTIF(
        day_from_base_date <= 3
        AND action = "like"
    ) AS cnt_action_like_last_3_days,
    COUNTIF(
        day_from_base_date <= 7
        AND action = "like"
    ) AS cnt_action_like_last_7_days
FROM
    `project.dataset.user_action_log`
GROUP BY
    user_id

比較してみると、 dbt を利用することで SQL が短くなり、かつ変数を定義できるためどの行動を過去何日分集計するかが一目で分かるようになりました。これにより特徴量が複雑になっても可読性が低下することを防げました。また、集計する日数や行動が増えたとしても、変数のリストに要素を追加するだけで対応できるようになりました。

dbt による特徴量管理時の工夫

より多くの特徴量を素早く試せるように行った工夫があるため、それらも紹介します。ここでは2つ紹介します。

1つ目はデータセット管理表を用意し、実験で利用するデータセットごとに ID を採番し、 dataset_001dataset_002 のようにファイルを作成していく方針としたことです。

データセット管理表のイメージ:

データセットID データセット説明
1 セグメント1に対して特徴量 A を利用したデータセット
2 セグメント2に対して特徴量 A, B を利用したデータセット

作成したファイルのイメージ:

models
└── datasets
    ├── dataset_001.sql
    └── dataset_002.sql

こうすることで、新しいデータセットを簡単に追加できるようにし、かつ過去のデータセットも参照しやすくしました。実際に2025年12月時点ではデータセット ID は 100 を超えていますが、問題なく運用出来ています。

2つ目は dbt で作成したテーブルのビューから Python 実行環境でデータを取得する際は、以下のような SQL で一度 GCS にエクスポートしてダウンロードするようにしたことです。これは、Python で BigQuery SDK を利用してデータ取得するとレコード数が多い場合非常に時間がかかるためです。

-- analyses/export_dataset.sql
{%-
    set bucket_folder = "datasets/" + var("dataset_id")
-%}
{%-
    set table_name = 
        target.database
        + "."
        + target.schema
        + "."
        + "dataset_"
        + var("dataset_id")
-%}

-- BigQuery の export data 構文において _table_suffix を含んでいるとエラーが発生するため
-- CREATE TEMP TABLE 構文を利用。
-- https://stackoverflow.com/a/70033601
CREATE TEMP TABLE temp_dataset AS (
    SELECT
        *
    FROM
        {{ table_name }}
);

EXPORT DATA
OPTIONS (
    uri = "gs://{{ var('bucket_name') }}/{{ bucket_folder }}/*.gz",
    format = "Parquet",
    overwrite = true,
    compression = "GZIP"
)
AS (
    SELECT
        *
    FROM
        temp_dataset
);

ここで、GCS バケット名やフォルダ、エクスポートするデータセット ID を dbt 変数としており、これによりデータセットによってエクスポート先を変更できるようにしました。また、この SQL はテーブルビューを作成する必要がないため、 analyses ディレクトリに配置して、以下のコマンドでコンパイルして実行するようにしました。

dbt compile \
    --select analyses/export_dataset.sql \
    --vars '{bucket_name: "your_bucket_name", bucket_folder: "your_bucket_folder", dataset_id: "001"}' && \
bq query < target/compiled/your_dbt_project_name/analyses/export_dataset.sql

ここで、 dbt compile コマンドは Jinja 記法などを解決して実行可能な SQLコンパイルするコマンドであり、コンパイルされたファイルは target/compiled 配下に保存されます。また、 dbt の analyses ディレクトリとは models ディレクトリとは異なり一時的な分析用 SQL などを配置するのに適したものになります。*4

まとめ

本記事では、dbt を利用した特徴量管理について紹介しました。SQL の肥大化や特徴量の再利用しづらさという課題を、 SQL ファイルの分割や for などのロジック記述により解決しました。また、データセット管理の方法や Python 実行環境でのデータ取得の高速化というより効率的に多くの特徴量を試す方法も紹介しました。これにより、複数の MA シナリオに対して機械学習を利用したインセンティブ配布最適化を試すことができ、利益増加という成果に繋げることが出来ました。

本記事が特徴量管理の参考になれば幸いです。

明日の記事は BUYMA TRAVEL のエンジニアの赤間さんです。お楽しみに!


株式会社エニグモ すべての求人一覧 hrmos.co

*1:2025年10月末時点の数値です。https://enigmo.co.jp/ir/

*2:dbt models について詳細は dbt 公式ドキュメントを参照ください。

*3:dbt の変数について詳細は dbt 公式ドキュメントを参照ください。

*4:dbt analyses について詳細は dbt 公式ドキュメントを参照ください。

AWS Savings Plans検討をGemini Gemsで自動化する~~プロンプト作成のコツと組織管理の課題~~

こんにちは、インフラエンジニアの森田です。

この記事はEnigmo Advent Calendar 2025の 14日目の記事です。
今回は、業務効率化のためにGoogle Geminiのカスタム指示(Gems)を作成し、
実際の業務で使ってみた使用感や気づきについて紹介します。

どのような業務に活用したか

私は直近でAWSのコスト削減に取り組んでいます。
特にSavings Plansなどを購入する際、複数アカウントのオンデマンドコストと推奨コミット額を見比べ、
その購入計画が適切かを整理する必要がありました。
これを人力で行うのは工数もかかり、ミスのリスクもあるため「辛い」作業でした。

そこで、Savings Plans推奨事項のCSVファイルと、
Cost ExplorerからCLIで取得したJSONファイルを読み込ませることで、
コミット額の適切性検証とコストメリットの整理を行ってくれるGemsを作成しました。

結果として、非常に良好な感触を得られました。
やはり、複雑な数値の突き合わせや計算は計算機(AI)に任せるのが一番です。
本記事では、実際にツールを作ってみて「気をつけると良い点」と、
組織で運用する上で「課題だと感じた点」を共有します。

カスタム指示(Gems)作成のポイント

コスト削減アシスタントGemsを作成する過程で、特に重要だと感じたポイントは以下の3点です。

1. 具体的な使い方の説明(ガイド)を含める

スクリプトと異なり、対話形式で進むため、
初見のユーザーでも迷わないよう「利用手順」を指示に含めておくと親切です。
今回はデータを読み込ませて分析するツールなので、
以下のようにデータの取得手順を案内させるようにしました。

## 0. ユーザーサポート / 使い方ガイド

ユーザーから「使い方を教えて」「何が必要?」と問われた場合、または挨拶のみでデータが未提供の場合は、以下の3ステップのデータ取得手順を案内してください。

### 手順1: 推奨事項CSVのダウンロード (AWS Console)
<取得手順を記載>

### 手順2: 直近のオンデマンド料金取得 (AWS CLI)
<取得手順を記載>

### 手順3: データの取得
<取得手順を記載>

2. 入出力のイメージを厳密に定義する

曖昧な指示だと、実行のたびにAIの解釈が変わり、
出力フォーマットがブレて使いづらくなります。
AIに勝手な解釈をさせないよう、入力データの処理ルールと出力形式を
以下のように固定することをお勧めします。

入力ルールの例:

## 2. 入力データの処理ルール

以下のデータがテキストまたはファイルとして与えられます。

1. **推奨事項 (CSV):** `savings-plans-recommendations.csv`
    * ここから「アカウントID」「推奨コミットメント額($/h)」「推定削減率」を抽出します。

2. **実績コスト (JSON):** `ec2_ondemand_daily_filtered.json` (Cost Explorer出力)
    * **安全性判定:** `推奨コミットメント額($/h) × 24h``日次実績コスト(過去30日間の最小値)` を下回っているか確認してください。実績を下回っていれば「安全(使い切れる)」、上回っていれば「注意(使い切れないリスクあり)」と判定します。
(略)

出力ルールの例:

必ず以下の **【出力1】** 〜 **【出力3】** の形式で出力してください。

-----

### 【出力1】
<出力1の構造を指示する>
(略)

3. 複雑なファイルは「キャプチャ画像」で読ませる

複雑なレイアウトのExcelやPDFファイルは、
テキスト抽出時に構造が崩れ、正しく解析できない場合があります。
そのような場合、対象箇所のキャプチャ画像を撮って画像を読ませる方が、
精度が高くなるケースがありました。
テキストでの読み込みで精度が出ない場合は、「画像を読ませる」という選択肢を
頭の片隅に置いておくと良いでしょう。

管理・運用上の懸念点

個人のツールとしては優秀なGemsですが、これを「会社の資産」として管理しようとした際、
いくつか課題も見えてきました。

変更履歴が見えない

作り込んだカスタム指示は長文になりがちですが、現状のGemsには変更履歴(Diff)を見る機能がありません。
「誰が・いつ・なぜ変更したか」が追えないため、チーム開発には不向きです。
GCPのVertex AI AgentsであればTerraform等で管理可能ですが、
Gemini(Gems)単体では難しいため、現状は「プロンプトの内容をテキストファイルとしてGitで管理し、変更時はGitを通してから手動でGemsを更新する」という運用が現実解になりそうです。
スマートではありませんが、資産管理としては必要です。

作成者のアカウント削除でGemsも消える

Gemsの実体は、作成者の「マイドライブ/Gemini Gems/」配下に保存されるファイルとして扱われるようです。
そのため、作成者が退職等でアカウント削除されると、
マイドライブ内のデータと共にGemsも消失してしまいます。
これを回避するために共有ドライブへの集約を試みましたが、
共有ドライブ上のGemsファイルを開こうとするとエラーが発生しました(下図参照)。
現状では、誰か個人のマイドライブに配置されている必要がありそうです。

共有ドライブ上のGemsを開いた際のエラー

モデル更新による挙動の変化(AIドリフト)

これはLLMを利用する全般的なリスクですが、バックエンドのモデルが更新された際、
以前と同じプロンプトでも挙動が変わる可能性があります。
ChatGPTのCustom GPTsのようにモデルバージョンを固定する機能は、現状のGemsには見当たりません。
影響を最小限にするためには、前述の通り「入出力を厳密に定義」してAIの解釈の幅を狭めておくことが重要です。
また、モデル更新のアナウンスがあった際は、簡単な動作確認フローを設けるのが良いでしょう。

まとめ

スプレッドシートでオンデマンドコストとコミットコストを整理して購入計画を立てていたときは2,3日かかっていたところ、
業務特化型のGemsを作成することで、正味1時間あれば整理が完了するようになり大幅に効率化することができました。

一方で、チームや組織で永続的に管理・運用していくには、
バージョン管理やオーナー権限の面でまだ工夫が必要だと感じています。

今後、Google Workspaceの機能アップデートにより、
これらの管理機能が強化されることを期待しつつ、
まずはGit管理などの運用ルールでカバーしながら活用していきたいと思います。

「AIでさがす」サービスのリニューアル - BUYMA内記事コンテンツをベースにした商品提案エージェントの実現

こんにちは、AIテクノロジーグループのエンジニアの吉田です。

本記事はEnigmo Advent Calendar 2025の 18日目の記事です。

普段は検索システム全般、機械学習システムのMLOps、AI関連の機能開発を担当しております。

この記事では「AIでさがす」サービスのリニューアルについて紹介します。

「AIでさがす」サービスとは

「AIでさがす」サービスは、BUYMAのWebサイトおよびアプリで提供している、AIを活用した商品提案サービスです。

実際の機能は以下からご利用頂けます。(BUYMAアカウントでのログインが必要となります。)

「AIでさがす」サービス

ユーザーが文章で質問すると、AIが質問内容を理解し、おすすめの商品を提案します。例えば「春のデートにぴったりなワンピースを教えて」といった質問に対して、AIが回答文とともに具体的な商品を紹介します。

従来のキーワード検索では見つけにくかった商品や、ユーザー自身が気づいていなかった新しい商品との出会いを提供することで、BUYMAでのショッピング体験をより豊かにすることを目指しています。

※商品画像はモザイク加工しております。

リニューアルの背景

旧システムは、ChatGPT APIを活用した商品提案サービスでしたが、主な課題が3点ありました。

  • BUYMAの知識不足
    • ChatGPT が一般的な知識で回答を生成するため、BUYMAならではのトレンドや商品特性を反映できない。
  • 根拠の不明確さ
  • 検索キーワード生成の精度
    • 形態素解析ツールの MeCab を併用していましたが、文脈や意味を理解した検索キーワード生成ができない。

また、リリースから2年が経過し、本格的にバージョンアップが必要なタイミングでもありました。

※旧システムの詳細はこちらの記事で紹介しております。

ちょうどチームメンバーが社内ドキュメントのAI検索システムを開発しており、この仕組みをBUYMAの多数の記事コンテンツに適用すれば、よりBUYMAらしい商品提案が可能になると考えました。

そこで、今回のリニューアルでは、BUYMA内の記事コンテンツ群をベースに会話するエージェントを作成しました。これにより、BUYMAならではの知識を持ったAIが、よりBUYMAでおすすめしたい商品を提案できるようになりました。

システム変更前後の比較

旧システムと新システムの違いは以下の通りです。

旧システムでは、ChatGPT が一般的な知識で回答を生成し、MeCab による単純な形態素解析で検索キーワードを生成していました。そのため、BUYMAならではの文脈を理解した商品提案が難しい状況でした。

新システムでは、BUYMA内記事コンテンツを参照した Vertex AI Search が回答文を生成し、Gemini が文脈を理解した検索キーワードを生成します。その結果、よりBUYMAらしい商品提案が可能になりました。

それぞれの処理フローは以下の通りです。

旧システム処理フロー

  1. BUYMA基幹システムから「AIでさがす」APIにリクエスト
  2. 「AIでさがす」APIがユーザーの質問を ChatGPT APIに送信
  3. ChatGPT が回答文とおすすめアイテムリストを生成
  4. アイテム名を MeCab形態素解析)で解析し、検索キーワードを生成
  5. 検索APIで商品情報を取得し、ユーザーに表示

新システム処理フロー

  1. BUYMA基幹システムから「AIでさがす」APIにリクエスト
  2. 「AIでさがす」APIがユーザーの質問を Vertex AI Search に送信
  3. Vertex AI Search (事前にBUYMA内記事コンテンツをインポート済み)が回答文を生成
  4. 質問文と回答文を Gemini に送信し、検索キーワードを生成
  5. 検索APIで商品情報を取得し、ユーザーに表示

アーキテクチャー特徴

Vertex AI Search を利用して、インポートしたBUYMA内記事コンテンツをベースに会話を行うエージェントを構築しました。

  • BUYMA内記事コンテンツのインポート
    • 約4000件の記事をデータストアにインポート
  • プロンプト設計
    • 「ファッションECサイトBUYMAのショッピングアドバイザー」として定義し、ユーザーの質問に対して最適な商品を提案する形で回答を生成

2. Gemini

Gemini を活用する事により、会話内容から商品検索キーワードを生成する機能を作成しました。

  • プロンプト設計
    • ECサイトの検索キーワードを生成する専門家」として定義し、会話の文脈を理解して検索キーワードを生成
  • MeCab との違い
    • MeCab は単語の分解のみだが、Gemini は文脈を理解してブランド名・カテゴリ名・モデル名を組み合わせた検索キーワードを生成

実装時の課題・解決策・工夫した点

  • Vertex AI Search の幻覚への対応

    • 初回質問時に Vertex AI Search が過去から質問が続いているような幻覚を見る場合がありました。当初は初回と2回目以降の会話を共通のプロンプトで行っており、「ユーザーの過去の質問履歴」の項目に入っている文言の有無から初回なのか、2回目以降の会話なのかを判断する指示を出していました。ところが、「過去」という文言に引きずられてなのか、初回なのに過去の質問をAI側が捏造して、その続きとして回答する場合が稀にありました。
    • プロンプトテンプレートを初回用と2回目以降用の2種類に分け、初回用のプロンプトからは「ユーザーの過去の質問履歴」の文言自体を削除する事によって対応しました。
  • 敵対的クエリへの対応

    • 敵対的クエリ(不適切な質問)の場合、Vertex AI Search のAPIからのレスポンスフォーマットが通常とは異なるものになり、要約が生成できないにもかかわらず、無理やり商品紹介を行ってしまいました。
    • 敵対的クエリーのフォーマットを検知した場合は、要約失敗として扱い、商品紹介を行わないように修正しました。
    • この場合以外でも稀に異なるフォーマットのレスポンスになる場合があり、サービス継続に支障が出ないように都度改善を行いました。
  • Gemini のライブラリ移行

    • もともと使用していたライブラリがサポート終了を迎えるため、社内では実績がない新しいライブラリに移行する必要がありました。移行後、従来使用していた Gemini モデルが初期設定では使用できず、次世代のモデルを試したところレスポンスタイムが大幅に遅くなってしまいました。新しいライブラリという事もあり、AIツールではなかなか解決できず、最終的にはGoogleサポートに問い合わせして解決に至りました。

得られた学びとノウハウ

  • AIツールの活用と限界

    • 「AIでさがす」のバックエンドAPIリポジトリは、ほぼ全部作り直したのですが、AIツールを活用する事によって、工数を節約する事ができました。Terraform 関連のリソース修正、テストケース作成やMOCK用のフロントエンド実装等においてもAIツールにより大幅な工数削減ができました。
    • 一方で、Gemini のライブラリ移行など、ドキュメントの記載やインターネット上での知見が少ない領域ではAIツールでは解決できず、結果的に公式サポートへの問い合わせが必要でした。
  • AIの不確定な挙動への対応

    • Vertex AI Search の幻覚や部分的な失敗など、AIサービス特有の不確実性に対して、初回用と2回目以降用でテンプレートを分けるなど、細かな調整が重要でした。また、プロンプトだけではどうする事もできない場合があり、そのような場合は後処理でルールベースのロジックを追加する必要がありました。

効果測定

リニューアル後、以下のような指標が上昇しました。

  • 1スレッドあたりの質問数の平均
    • 会話の継続性が向上し、ユーザーが複数回質問を続けるようになりました。
  • 1ユーザー1日あたりの質問数
    • 利用頻度が向上し、ユーザーがより積極的に機能を活用するようになりました。
  • 検索URLに遷移された回数
    • 商品検索への誘導効果が向上し、実際の商品閲覧につながるケースが増加しました。

これらの結果から、BUYMA内記事コンテンツを根拠とした回答の提供と、文脈を理解した検索キーワード生成により、ユーザーの満足度と利用価値が向上したと考えられます。

直近の対応/今後の展開・課題

  • 金額絞り込み機能の追加(今月対応)

    • ユーザーからの要望が多い金額絞り込み機能の対応をしました。価格帯に関する質問に対して適切な商品提案ができていない課題があったため、Gemini で検索API用の金額フィルタークエリを生成することで対応しました。
  • コンテンツの拡充

    • 現在はBUYMA内記事コンテンツのみを Vertex AI Search にインポートしていますが、今後はYouTubeでの発信内容も追加する予定です。記事以外のコンテンツも活用することで、より幅広い情報をユーザーに提供できるようになります。
  • 継続的なメンテナンス

    • AIのライブラリやモデルは随時更新されていくため、継続的なメンテナンスが課題となります。特に Gemini や Vertex AI Search などのサービスは進化が早く、新しいモデルへの対応や非推奨ライブラリ/バージョンから移行など、定期的な見直しが必要です。

まとめ

本記事では、「AIでさがす」サービスのリニューアルについて紹介しました。

旧システムでは ChatGPT と MeCab を使用していましたが、BUYMA特有の知識不足や根拠の不明確さなどの課題がありました。リニューアルでは Vertex AI Search と Gemini を採用し、BUYMA内記事コンテンツを根拠とした回答生成と文脈を理解した検索キーワード生成を実現しました。

実装時には敵対的クエリへの対応やAIサービス特有の不確実性への対処など様々な課題に直面しましたが、ロジックでの細かい制御やAIツールの活用により解決できました。リニューアル後は会話継続性や利用頻度、商品検索への誘導効果が明らかに向上しています。

明日の記事は同じAIテクノロジーグループの髙橋さんです。お楽しみに。


株式会社エニグモ すべての求人一覧 hrmos.co

Workflows + Cloud Scheduler で定期処理をサーバーレス構築(Cloud Composer との比較もあります)

こんにちは、AIテクノロジーグループの太田です。
普段は商品のカタログデータ基盤を開発・運用するチームで業務に携わっております。エニグモではそういったデータやAI関連の技術基盤としてGCPを利用しており、そこで利用したWorkflowsについて紹介したいと思います。

この記事はEnigmo Advent Calendar 2025の17日目の記事です。

1. はじめに:なぜこの構成に至ったか

  • 背景
    毎日追加・更新される商品データを Vector Search のインデックスに反映させる必要があった。
    Cloud Composer (Airflow) の利用実績はあったものの、より安価な Workflows に興味があった。
  • 課題
    「差分抽出 → 画像処理(Embedding) → インデックス更新」という一連のフローを毎日決まった時間に実行したい。
    各処理ステップ(特に画像処理とインデックス更新)は時間がかかるため、タイムアウトやリトライ制御を考慮する必要がある。
    当然コストは抑えたい。
  • 結論
    Cloud Composer を使わず、Workflows + Cloud Scheduler を採用することで、管理コストと金銭的コストを最小限に抑えたアーキテクチャを構築した。

ポイントは、重たい処理(画像処理・インデックス更新)は Dataflow に任せ、Workflows はあくまで「順序制御」に徹する構成にしたことです。

2. Goolge Cloud 構成:全体アーキテクチャ概要

処理の流れは次のとおりです。

  • Cloud Scheduler 毎日定時に Workflows をトリガー。
  • Workflows: 全体の指揮者。以下のステップを順次実行。
    • BigQuery: 前日データとの差分をSQLで抽出。
    • Workflows: 画像の Embedding 計算とGCSへの保存を実行する Dataflow を実行する。
    • Workflows: Vector Search のインデックス更新を実行する Dataflow を実行する。

ポイントは、長時間実行かつ単発で実行する機会がある Dataflow の実行を別の Workflows に委ね、メインの Workflows から別の Workflows を呼び出すようにしたことです。

Dataflowの実装については、本記事の趣旨から外れるので省略いたします。

3. 技術選定:なぜ Cloud Composer ではなく Workflows なのか

このセクションで、他の選択肢と比較し、なぜ今回の構成に至ったかを解説します。

比較項目 Workflows Cloud Composer
特性 サーバーレスで軽量、直線的なフローに最適 複雑な依存関係に強いが、常時稼働が必要
コスト 安価(実行回数課金)
1000ステップ0.01ドル1
Google Cloud 外へのアクセスを要する場合は1000ステップ0.025ドル
高い(小規模でも月額数万円〜)2
実際に月額約8万円かかっています
採用/不採用の決め手 今回の処理が「直線的」であり、複雑なDAGが不要だったため 日次バッチ一つに対してはオーバースペック

運用実績があったからといって、「とりあえず Airflow」とせずに、ワークフローの複雑さに応じてツールを選定できた点が良かったです。

実際に使ってみて、単純な A -> B -> C というフローなら Workflows の方が圧倒的に運用・コストメリットが大きいことが実感できました。

4. 実装サンプル:Workflows から Dataflow を起動する

ここでは、実際に Workflows を使って Dataflow (Flex Template) を起動するための定義(YAML)を紹介します。

【コード解説のポイント】

Dataflow の Flex Template を利用することで、Docker イメージ化したジョブをパラメータ付きで呼び出せます。
Workflows 側でジョブの完了を待機する(ポーリングする)ようにしました。
googleapi 3 で Dataflow 以外の各種 Google Cloud のプロダクトへアクセスすることができるので、参照してみてください。

【サンプルコード(YAML)】

実際に作成した Dataflow を起動する Workflows の定義ファイル(main.yaml)の一部を掲載します。

  • main.yaml の抜粋イメージ
steps:
  - init:
      assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - location: "asia-northeast1"
        - job_name: "dataflow-launcher"
        # デプロイするときに --set-env-vars で設定した環境変数をここで読み込む
        - sdk_container_image: ${sys.get_env("sdk_container_image")}
        - gcs_bucket: ${sys.get_env("gcs_bucket")}
        # YYYYMMDD 形式の日付
        - ymd: ${text.replace_all(text.split(time.format(sys.now()), "T")[0], "-", "")}
  
  # 画像処理をするので時間がかかる Dataflow を実行するステップ
  - dataflow_start_crop_embedding:
      call: googleapis.dataflow.v1b3.projects.locations.flexTemplates.launch
      args:
        projectId: ${project_id}
        location: ${location}
        body:
          launchParameter:
            jobName: ${job_name + ymd}
            containerSpecGcsPath: ${gcs_bucket + "/templates/your-template-spec.json"}
            parameters:
              sdk_container_image: ${sdk_container_image}
            environment:
              stagingLocation: ${gcs_bucket + "/templates/staging"}
              tempLocation: ${gcs_bucket + "/templates/temp"}
              serviceAccountEmail: ${Dataflow 実行権限を持つ Service Account}
      result: dataflow_result
      next: initialize_polling

  # 以下、Dataflow を完了まで監視するステップ
  # ループした回数だけステップ数が増えてコストも増えていくのでループ数には注意してください。

  - initialize_polling:
      assign:
        - counter: 0
        # "max_retries * 60秒 (wait_60_secondsで定義) = 1時間"なので、最大1時間ポーリングを行う。
        - max_retries: 60
      next: poll_job_status

  - poll_job_status:
      call: googleapis.dataflow.v1b3.projects.locations.jobs.get
      args:
        projectId: ${project_id}
        location: ${location}
        # run_dataflow_job ステップの return から参照できる。
        jobId: ${dataflow_result.job.id}
      result: job_status
      next: check_job_status

  - check_job_status:
      switch:
        - condition: ${job_status.currentState == "JOB_STATE_DONE"}
          next: job_succeeded
        - condition: ${job_status.currentState == "JOB_STATE_FAILED" or job_status.currentState == "JOB_STATE_CANCELLED"}
          raise: ${"Dataflowジョブ " + dataflow_result.job.id + " が失敗しました。ステータス " + job_status.currentState}
        - condition: ${counter >= max_retries}
          raise: ${"Dataflowジョブ " + dataflow_result.job.id + " が1時間以内に完了しませんでした。タイムアウト。"}
        - condition: ${true}
          next: increment_counter

  - increment_counter:
      assign:
        - counter: ${counter + 1}
      next: wait_60_seconds

  - wait_60_seconds:
      call: sys.sleep
      args:
        seconds: 60
      next: poll_job_status

  - job_succeeded:
      return: "SUCCESS"

【サンプルコードをデプロイ】

作成した Workflows の定義ファイルをデプロイ4します。

gcloud workflows deploy sample_workflow \
        --source=main.yaml \
        --location="asia-northeast1" \
        --project=${PROJECT_ID} \
        --service-account=${SERVICE_ACCOUNT_EMAIL} \
        --set-env-vars sdk_container_image=${Artifact Registry にpushしたdockerイメージ} \
        --set-env-vars gcs_bucket="gs://YOUR_BUCKET" \

【サンプルコードを実行した結果】

ループしているので実際に実行されたステップ数は137でした。

コストは 137 * 0.01 / 1000 = 0.00137 ドルになります。

5. Workflows を採用する上で許容した「不便な点」

コストと手軽さは魅力的ですが、導入に際しては以下のデメリットも考慮する必要がありました。

  1. 開発体験のクセ(YAML地獄)

    • 課題
      Python で記述できる Airflow と異なり、Workflows は YAML (または JSON) でロジックを記述する必要があります。条件分岐やループ処理、変数の扱いが直感的ではなく、構文エラーに悩まされることが多いです(一般的に "YAML engineering" と揶揄される部分)。
    • 対応
      今回は「直列的なフロー」に留めることで複雑な記述を回避しました。複雑なロジックが必要な場合は、無理に Workflows 内に書かず、Cloud Functions や Dataflow に逃がす設計が重要です。
  2. ローカルテスト・デバッグの難易度

    • 課題
      Cloud Composer (Airflow) はローカル環境を構築可能なので DAG のテストが可能ですが、Workflows はクラウド上のリソースと密結合しているため、ローカルでの完全な再現・テストが困難です。「修正してデプロイして実行」のサイクルになりがちです。
    • 対応
      ステップごとの単体テストは諦め、結合テスト中心で進める割り切りが必要でした。 また、別の Workflows に分割することで、ステップごとに運用できるように対応しました。
  3. ステップ間のデータ受け渡し制限(メモリサイズ)

    • 課題
      Workflows は大規模なデータをステップ間で直接受け渡すこと(ペイロードサイズ制限)には向いていません。
    • 対応
      今回の設計では、画像データそのものや大量のリストは Workflows 上を通過させず、必ず GCS のパスや BigQuery のテーブル名といった「参照情報」のみを受け渡すように徹底しました。
  4. ベンダーロックイン

    • 課題
      Airflow はOSS標準ですが、Workflows は Google Cloud 固有のサービスです。将来的に他のクラウドへ移行する場合、ポータビリティがありません。
    • 対応
      今回は GCP 完結のシステムであり、フルマネージドの恩恵(管理レス)を最優先しました。

6. まとめ

Workflows + Cloud Scheduler の組み合わせにより、日次の Vector Search インデックス更新を完全自動化できました。

コスト面以外では、インフラ管理コスト(Cloud Composer の環境維持など)を削減し、本質的なロジック開発に集中できることが Workflows の大きなメリットに感じました。

デメリットでYAML記法に言及しましたが、逆に言えば、どなたでも気軽に試してみることができるとも言えます。コストも軽いのでこれを機に是非一度お試しください。

読者の皆様がこれで良い体験を得ることができましたら私としても大変嬉しく思います。

明日18日目はAIテクノロジーグループの吉田さんです。

PHPerがRubyistになろうとしてつまづいたところ⑥プロセス内キャッシュ

WEBアプリケーションエンジニアの小松です!

プロセス内キャッシュの挙動に馴染みがなかったので、どういう挙動なのか。

ネットワーク越しのキャッシュとの使い分け。

他言語との比較でRails特有の仕様なのかどうか。

という疑問が湧いたので調査し、それを記事にしました。

この記事は[Enigmo Advent Calendar 2025]の16日目の記事です。  

 


ローカルキャッシュとは何か

ローカルキャッシュとは、Ruby プロセス内のメモリに値を保持し、同じプロセス内であれば何度呼び出されても再計算や再読み込みを行わない仕組みのことを指す。

Ruby では次の構文がある。

@config ||= YAML.load_file("config/settings.yml")

この構文は最初の一回だけ YAML.load_file が実行され、以降はメモリに保持された @config が返される。

Rails プロセスが動いている限り、この値は保持され続ける。


今回直面した疑問と調査内容

実際に自分が直面した疑問は次のようなものだった。

  • Rails サーバーが複数ある場合、各プロセスごとにキャッシュされるということは、そもそも「キャッシュ」と言えるのか

  • Memcached や Redis など外部キャッシュと比べて本当に速いのか

  • 毎回インスタンス変数に保存するだけで高速化されるように見えるが、仕組みとして本当に正しいのか

  • そもそもこれは Rails の仕様なのか、Ruby の仕様なのか

  • 他言語ではどう実現しているのか

これらを順番に整理していった。


「ディスク IO を避けたいだけなら」プロセス内キャッシュが最も速い

Rails.cache(Memcached/Redis)のキャッシュも高速だが、必ずネットワーク越しの通信が発生する。

クラウド環境であれば数百マイクロ秒〜数ミリ秒のオーバーヘッドが加わる。

一方、プロセス内キャッシュは Ruby プロセスが持つメモリに直接アクセスするだけで、ネットワークもディスクも介さない。

最短経路でデータにアクセスできるという点では最速になる。

ただし、これは「ローカルに存在する静的データ」に限った話である。

更新頻度が高いデータには適さない。


実際に採用したコード

今回検討していたコードは次のような YAML 読み込み処理だった。

def contents(condition)
  yaml = YAML.load_file('config/item_cate_desc.yml')
  # 以下ロジック...
end

これでは毎回ファイルを読み込み、ディスク IO が発生するため遅い。

そこで、次のように改善した。

def item_cate_yaml
  @item_cate_yaml ||= YAML.load_file('config/item_cate_desc.yml')
end

この1行によって、「最初の一回だけ読み込む」処理に変わる。

後はメモリに保持され続けるため、各リクエストで読み込む必要がない。

 


Rails 特有の挙動

Rails のコントローラでインスタンス変数を使っても、それはリクエストごとに新しく生成されるオブジェクトに所属するため、キャッシュとしては機能しない。

キャッシュとして効くのは、プロセスが生きている限り保持され続ける「クラスインスタンス変数」や「クラス変数」の方である。

PHP のようにリクエスト終了時にプロセスが破棄される言語とは異なり、Ruby(特に Rails のアプリサーバー)はプロセス常駐型のため、同じクラスインスタンス変数へ複数リクエストがアクセスする構造になっている。

この違いが理解しづらく、PHPer には馴染みがなく疑ってすらいたので、

railsアプリではクラスインスタンス変数の注意する #Ruby - Qiita

などの記事を参考にしてファクトチェックもしました。

 


Rails サーバーが複数台ある場合の挙動

ここについても疑問を持ったが、調べた結論は次のとおり。

  • Rails プロセス内で一度だけ読み込まれ、それぞれが独立してデータを保持する

  • よってプロセスを跨いだ共有キャッシュではない

  • ただし配置ファイル(YAML)が全サーバーで共通であれば問題はない

  • プロセス間の同期は不要で、むしろ高速

「複数サーバーだからキャッシュが効かない」という誤解があるが、ローカルキャッシュは各プロセス単位で成立するため問題ない。


キャッシュとしての位置づけの違い

データの性質に応じてどのキャッシュを選ぶべきか整理すると次のようになる。

種類 特徴 向いているケース
プロセス内キャッシュ 最高速。プロセスごと独立。データ変更には弱い 設定ファイル、マスターデータ
Rails.cache(Memcached/Redis) 共有キャッシュ。通信が必要 変更頻度がありサーバー間で共通化したいデータ
DB キャッシュ 一貫性は高いが IO コストあり モデルデータ

今回のような静的な YAML データであれば、間違いなくプロセス内キャッシュが適している。


この仕組みは Ruby 特有なのか

Ruby||= を使ったプロセス内キャッシュは極めて自然で扱いやすい。

もちろん他の言語でも似たことはできるが、次のように比較すると Ruby の簡潔さが際立つ。

Java

static 変数+ダブルチェックロックなど同期処理が必要で、明らかにコードが冗長。

Go

sync.Once を使う必要がある。

パッケージスコープの変数は設計上の制約も多い。

PHP

そもそも 1 リクエスト 1 プロセスのため、プロセス内キャッシュという概念が成立しない。

APCu など外部拡張に頼る必要がある。

Node.js

モジュールキャッシュにより Ruby に近い感覚で扱えるが、副作用の管理が必要で Ruby の手軽さとはやや異なる。

Ruby はプロセス常駐型で、かつクラスインスタンス変数が自然にキャッシュとして機能するため、他の言語と比較して特に扱いやすい。

 


まとめ

今回の検討で分かったのは、次のような点である。

  • Ruby@var ||= ... によるローカルキャッシュは、非常に手軽に使える最速のキャッシュ方式

  • 複数サーバーでも問題なく、各プロセスが独立してキャッシュを保持する

  • Memcached や Redis より速いのは、ネットワーク通信が一切ないため

  • データの性質に応じてキャッシュ方式は使い分けるべき

  • 他言語でも実現は可能だが、Ruby ほど自然で簡潔な形にはならない

静的な設定データを高速化したい場面では、最初に検討すべき手法と言える。

 

明日17日目はAIテクノロジーグループの太田さんです。

 

エニグモでは問い合わせをどう捌いているのか?ヘルプデスク業務の裏側をお伝えします

こんにちは、コーポレートエンジニア(コーポレートITチーム)の藤田です。 この記事は Enigmo Advent Calendar 2025 の15日目の記事です。

コーポレートIT(以下CO-IT)の業務において、地味ながらも非常に重要な「ヘルプデスク業務」についてお伝えします。 「どのようなツールを使って、どのようなフローで対応し、どうやってナレッジを残しているのか」 普段あまり表に出ることのない、運用の裏側をご紹介しようと思います。

自己紹介

本題の前に少し自己紹介をさせていただきます。

私は今年3月にエニグモに入社いたしました。エニグモへ入社する前は、鉄骨製作会社の情報システム部門で働いており、いわゆる「一人情シス」として働いていました。それ以前はシステムエンジニアとして開発業務に携わっていた経験があります。

なぜこの記事を書くのか

過去のAdvent Calendarでは、入社エントリーやチームビルディングに関する記事はありましたが、具体的な「CO-ITの業務内容」にフォーカスした記事はありませんでした。

そこで今回は、社内外の方に「エニグモのCO-ITって具体的にどんな業務をしているの?」を知っていただくため、主業務の一つであるヘルプデスクについて深掘りしてみたいと思います。

(他のCO-ITメンバーが作成した記事もぜひ目を通してみてください!)

エニグモのヘルプデスク構成要素

まず、問い合わせ対応に使用しているツールを紹介します。 

  • Slackワークフロー: 問い合わせ受付からナレッジ化までのデータ入力インターフェース。
  • Googleスプレッドシート: ログの集約・一時保管・ID管理
  • Zapier: ツール間の連携・自動化
  • Asana: タスク・進捗管理

ヘルプデスク対応のフロー

実際の問い合わせから完了までの流れは、以下のように自動化されています。 極力、人の手による「転記作業」をなくすように設計しています。

【User】問い合わせ入力

  ユーザーがSlackワークフローから問い合わせ内容を入力します。

【System】自動起票・通知

  問い合わせ内容が自動的にGoogleスプレッドシートへ転記されます。

  同時にCO-ITの「問い合わせ対応チャンネル」に通知が飛びます。

【CO-IT】担当者アサイ

  通知に対し、CO-ITメンバーが「担当します」ボタンをクリック。

  Asanaのタスクに担当者名が自動入力されます。

【CO-IT】対応・解決

  実際の調査・対応を行います。

【System】クロージング・ナレッジ化

  対応完了後、Slackワークフローに対応内容を入力。

  Asanaが「完了」ステータスに更新され、対応内容が記録されます。

  最後に「問い合わせナレッジチャンネル」へ内容が自動投稿されます。

運用における3つのこだわり

入り口の設計:「優しさ」と「セキュリティ」の両立

問い合わせの入り口は、内容の性質に合わせて「オープン」と「プライベート」の2つのワークフローを用意しています。

オープン問い合わせ

 用途:PCトラブルや仕様確認など、他のユーザーと共有しても有益な内容。

プライベート問い合わせ

 用途:人事関与など、秘匿性の高い内容。

これにより、ユーザーは適切な窓口を選択することで、セキュリティと利便性を両立させています。

出口の設計:「個人の記憶」ではなく「組織の記録」へ

ヘルプデスク業務において最も避けたい事態は、「過去に同様の問い合わせがあったはずなのに、どう解決したか分からない」という状況です。特に、退職したメンバーしか詳細を知らない案件などがブラックボックス化してしまうと、組織としての対応力は大きく低下してしまいます。それを解決するために、以下の2点の取り組みを行なっています。

あらゆる対応をナレッジ化する

突発的な相談や、日々のコミュニケーションの中で偶発的に発生した問い合わせに関しても、漏れなく記録・管理できる仕組みを整えています。「入り口」は柔軟に受け入れつつも、最終的にナレッジとして蓄積することで、情報の散逸を防いでいます。

自動的な情報の共有と蓄積

対応フローの最後に「問い合わせナレッジチャンネルへの自動投稿」を組み込んでいます。これにより、対応した担当者が不在でも、Slack上でキーワード検索をするだけで過去の類似事例や経緯を即座に引き出すことが可能になります。

「誰か一人が知っている」ではなく「組織全員がいつでも引き出せる」状態を作ること。これがエニグモのヘルプデスクです。

改善の設計:対応して終わりではなく「減らす」までが業務

ただ問い合わせを捌くだけでは、業務は改善しません。 私たちは月に一度、チーム内で「問い合わせに関する会議」を実施しています。

ここでは、その月の問い合わせ件数の推移を確認するだけでなく、頻発した問い合わせ内容について深掘りを行います。「なぜその問い合わせが発生したのか」「根本解決のためにどのような対策が必要か」「今後は対応方針をどう変えるべきか」を議論し、再発防止や業務フローの改善につなげています。

今後の展望

次なるステップとして「AI活用」を見据えています。

具体的には、これまでに蓄積されたナレッジデータを学習データとして活用し、生成AIによる「一次回答の自動化」や、担当者への「類似回答のレコメンド」機能の実装などに挑戦していきたいと考えています。 問い合わせ対応のスピードと質をさらに向上させ、ユーザーにとっても解決までの時間を短縮できるような環境を目指します。

おわりに

ヘルプデスク業務は、一般的に「雑用」や「誰でもできる仕事」と捉えられがちかもしれません。 しかし、エニグモではこの業務に非常に力を入れています。なぜなら、私たちは「来た問い合わせをただ捌くこと」がゴールだとは考えていないからです。

問い合わせの内容を分析し、「どのように問い合わせそのものを減らせるか」「ユーザーがストレスなく業務を行える環境を作れるか」を追求し続けること。これこそが、エニグモにおけるヘルプデスク業務のあり方だと考えています。

明日の記事の担当は アプリケーション開発グループ の 小松さんです。お楽しみに。

GCSToSFTPOperatorでハマった

こんにちは!WEBアプリケーションエンジニアの小松です!

今まで主に EC サイトの WEB エンジニアとして仕事をしてきて、Airflow を触るようになったのはエニグモに入社してからでした。

BUYMA では、広告媒体向けのフィード生成や外部パートナーとのデータ連携、在庫データの収集など、毎日大量に発生するバッチ処理を Airflow に任せています。

人手では絶対に回せない規模なので、Airflow は影の立役者のような存在です。

そんな Airflow を動かしている基盤が Google Cloud Composer なのですが、会社全体でオンプレサーバーからクラウドへ移行していく流れの中で、Composer も新しいバージョンへ移し替えることになりました。

「まあ普通に移行できるだろう」と思っていたら、まさかの沼にハマってしまい……
同じ罠に落ちる人が一人でも減りますように、という気持ちでこの記事を書いています。

この記事は[Enigmo Advent Calendar 2025]の13日目の記事です。  


結論(先に言います)

Airflow の GCSToSFTPOperator が突然 SFTP 認証できなくなった原因は…

Composer が入れている Paramiko のバージョンが古く、RSA 署名アルゴリズムがサーバーに拒否されたから。

つまり、

  • コマンドからは接続できる

  • Python の Paramiko スクリプトでも接続できる

  • でも Airflow からだけ認証エラーになる

という、最悪に分かりづらい症状が発生していました。


何が起きていたのか(時系列で紹介)

① Composer 移行後、SFTP アップロードだけエラー

Airflow 2 → Composer の新環境に移行した際、GCSToSFTPOperator だけが謎の認証失敗。

ログにはこれだけ:

 
Bad authentication type; allowed types: ['publickey']

鍵は設定済みのはずなのに、Airflow だけ失敗。謎が深まる。


② コマンド実行では成功する

Docker コンテナに入り、

 
sftp -i pri.key sftp.host.com

→ 成功。

設定ミスではないと確信。


Python(paramiko)でも成功する

「Airflow がダメなら paramiko 生で試すか」と思いテストコードを書くと…

→ 普通に成功。

つまり、Airflow 経由でのみ認証が弾かれている。


④ 「Airflow からだけ接続できない」という地獄に突入

Airflow → SFTPHook → SSHHook → Paramiko

このどこかが悪いのは確実だが、Extra の書式を変えても、パラメータを変えても改善しない。

Airflow のログは詳細な理由を出してくれない。

完全に暗闇の中を歩く状態。


⑤ 試行錯誤の果てに見えてきた「署名アルゴリズム問題」

Docker の sftp でのみ発生していたエラー:

 
sign_and_send_pubkey: no mutual signature supported

ここでようやく糸口が見えた。

  • サーバー:rsa-sha2-256/512 を要求

  • 古い Paramiko:ssh-rsaSHA1)署名を使ってしまう

サーバーが拒否

という構図。


⑥ Airflow(Paramiko)は署名アルゴリズムを指定できない

OpenSSH のように

 
-PubkeyAcceptedAlgorithms=ssh-rsa

といった強制は Paramiko では不可能。

つまり:

  • Airflow から署名アルゴリズムを変更するすべがない

  • 古い Paramiko を使っている限り絶対に成功しない

という仕様の問題。


⑦ Composer の paramiko を確認すると…古い!

Composer 内で

 
pip freeze | grep paramiko

すると……

2.7 系(古い)
rsa-sha2 に未対応

原因が完全に確定。


⑧ Composer をアップグレード → 一発成功

Composer の Airflow イメージをアップデートし、

  • Paramiko が 2.9+(rsa-sha2 デフォルト対応) に更新

その瞬間、

→ GCSToSFTPOperator が何事もなく成功。

設定は一文字も変えていません。

完全にバージョン差の問題でした。


技術的まとめ:今回の本質

問題の本質はこれ:

GCSToSFTPOperator → SFTPHook → Paramiko が SHA1 署名(ssh-rsa)しか使えず、外部サーバーが RSA-SHA2 を要求していたため認証が失敗した。


回避策(原理上)

方法 可能? 説明
Paramiko を 2.9+ にアップグレード 今回の完全解決策
key_file 形式で渡す Composer では鍵配置がやや面倒
RSA 鍵を SHA2 対応形式へ変換 問題は鍵ではなくクライアント側
SFTP サーバーに設定変更を依頼 外部企業のため不可能

最後に:今回の教訓

  • 「Airflow だけ接続できない」→ Paramiko のバージョンをまず疑うべし

  • Airflow のログは認証まわりが不親切で根本原因が見えづらい

  • Composer は内部ライブラリが固定なので、移行時に“バージョン差事故”が起きやすい

  • 結局のところ、問題の 9 割は Airflow が使っているライブラリのバージョン差

数日単位で調査し、無数のテストを書き、ようやく原因に辿り着いたので、この記事が誰かの時間を 30 分でも節約できたら嬉しいです。

 

明日12/14の記事はインフラエンジニア森田さんです。お楽しみに。