「AIでさがす」サービスのリニューアル - BUYMA内記事コンテンツをベースにした商品提案エージェントの実現

こんにちは、AIテクノロジーグループのエンジニアの吉田です。

本記事はEnigmo Advent Calendar 2025の 18日目の記事です。

普段は検索システム全般、機械学習システムのMLOps、AI関連の機能開発を担当しております。

この記事では「AIでさがす」サービスのリニューアルについて紹介します。

「AIでさがす」サービスとは

「AIでさがす」サービスは、BUYMAのWebサイトおよびアプリで提供している、AIを活用した商品提案サービスです。

実際の機能は以下からご利用頂けます。(BUYMAアカウントでのログインが必要となります。)

「AIでさがす」サービス

ユーザーが文章で質問すると、AIが質問内容を理解し、おすすめの商品を提案します。例えば「春のデートにぴったりなワンピースを教えて」といった質問に対して、AIが回答文とともに具体的な商品を紹介します。

従来のキーワード検索では見つけにくかった商品や、ユーザー自身が気づいていなかった新しい商品との出会いを提供することで、BUYMAでのショッピング体験をより豊かにすることを目指しています。

※商品画像はモザイク加工しております。

リニューアルの背景

旧システムは、ChatGPT APIを活用した商品提案サービスでしたが、主な課題が3点ありました。

  • BUYMAの知識不足
    • ChatGPT が一般的な知識で回答を生成するため、BUYMAならではのトレンドや商品特性を反映できない。
  • 根拠の不明確さ
  • 検索キーワード生成の精度
    • 形態素解析ツールの MeCab を併用していましたが、文脈や意味を理解した検索キーワード生成ができない。

また、リリースから2年が経過し、本格的にバージョンアップが必要なタイミングでもありました。

※旧システムの詳細はこちらの記事で紹介しております。

ちょうどチームメンバーが社内ドキュメントのAI検索システムを開発しており、この仕組みをBUYMAの多数の記事コンテンツに適用すれば、よりBUYMAらしい商品提案が可能になると考えました。

そこで、今回のリニューアルでは、BUYMA内の記事コンテンツ群をベースに会話するエージェントを作成しました。これにより、BUYMAならではの知識を持ったAIが、よりBUYMAでおすすめしたい商品を提案できるようになりました。

システム変更前後の比較

旧システムと新システムの違いは以下の通りです。

旧システムでは、ChatGPT が一般的な知識で回答を生成し、MeCab による単純な形態素解析で検索キーワードを生成していました。そのため、BUYMAならではの文脈を理解した商品提案が難しい状況でした。

新システムでは、BUYMA内記事コンテンツを参照した Vertex AI Search が回答文を生成し、Gemini が文脈を理解した検索キーワードを生成します。その結果、よりBUYMAらしい商品提案が可能になりました。

それぞれの処理フローは以下の通りです。

旧システム処理フロー

  1. BUYMA基幹システムから「AIでさがす」APIにリクエスト
  2. 「AIでさがす」APIがユーザーの質問を ChatGPT APIに送信
  3. ChatGPT が回答文とおすすめアイテムリストを生成
  4. アイテム名を MeCab形態素解析)で解析し、検索キーワードを生成
  5. 検索APIで商品情報を取得し、ユーザーに表示

新システム処理フロー

  1. BUYMA基幹システムから「AIでさがす」APIにリクエスト
  2. 「AIでさがす」APIがユーザーの質問を Vertex AI Search に送信
  3. Vertex AI Search (事前にBUYMA内記事コンテンツをインポート済み)が回答文を生成
  4. 質問文と回答文を Gemini に送信し、検索キーワードを生成
  5. 検索APIで商品情報を取得し、ユーザーに表示

アーキテクチャー特徴

Vertex AI Search を利用して、インポートしたBUYMA内記事コンテンツをベースに会話を行うエージェントを構築しました。

  • BUYMA内記事コンテンツのインポート
    • 約4000件の記事をデータストアにインポート
  • プロンプト設計
    • 「ファッションECサイトBUYMAのショッピングアドバイザー」として定義し、ユーザーの質問に対して最適な商品を提案する形で回答を生成

2. Gemini

Gemini を活用する事により、会話内容から商品検索キーワードを生成する機能を作成しました。

  • プロンプト設計
    • ECサイトの検索キーワードを生成する専門家」として定義し、会話の文脈を理解して検索キーワードを生成
  • MeCab との違い
    • MeCab は単語の分解のみだが、Gemini は文脈を理解してブランド名・カテゴリ名・モデル名を組み合わせた検索キーワードを生成

実装時の課題・解決策・工夫した点

  • Vertex AI Search の幻覚への対応

    • 初回質問時に Vertex AI Search が過去から質問が続いているような幻覚を見る場合がありました。当初は初回と2回目以降の会話を共通のプロンプトで行っており、「ユーザーの過去の質問履歴」の項目に入っている文言の有無から初回なのか、2回目以降の会話なのかを判断する指示を出していました。ところが、「過去」という文言に引きずられてなのか、初回なのに過去の質問をAI側が捏造して、その続きとして回答する場合が稀にありました。
    • プロンプトテンプレートを初回用と2回目以降用の2種類に分け、初回用のプロンプトからは「ユーザーの過去の質問履歴」の文言自体を削除する事によって対応しました。
  • 敵対的クエリへの対応

    • 敵対的クエリ(不適切な質問)の場合、Vertex AI Search のAPIからのレスポンスフォーマットが通常とは異なるものになり、要約が生成できないにもかかわらず、無理やり商品紹介を行ってしまいました。
    • 敵対的クエリーのフォーマットを検知した場合は、要約失敗として扱い、商品紹介を行わないように修正しました。
    • この場合以外でも稀に異なるフォーマットのレスポンスになる場合があり、サービス継続に支障が出ないように都度改善を行いました。
  • Gemini のライブラリ移行

    • もともと使用していたライブラリがサポート終了を迎えるため、社内では実績がない新しいライブラリに移行する必要がありました。移行後、従来使用していた Gemini モデルが初期設定では使用できず、次世代のモデルを試したところレスポンスタイムが大幅に遅くなってしまいました。新しいライブラリという事もあり、AIツールではなかなか解決できず、最終的にはGoogleサポートに問い合わせして解決に至りました。

得られた学びとノウハウ

  • AIツールの活用と限界

    • 「AIでさがす」のバックエンドAPIリポジトリは、ほぼ全部作り直したのですが、AIツールを活用する事によって、工数を節約する事ができました。Terraform 関連のリソース修正、テストケース作成やMOCK用のフロントエンド実装等においてもAIツールにより大幅な工数削減ができました。
    • 一方で、Gemini のライブラリ移行など、ドキュメントの記載やインターネット上での知見が少ない領域ではAIツールでは解決できず、結果的に公式サポートへの問い合わせが必要でした。
  • AIの不確定な挙動への対応

    • Vertex AI Search の幻覚や部分的な失敗など、AIサービス特有の不確実性に対して、初回用と2回目以降用でテンプレートを分けるなど、細かな調整が重要でした。また、プロンプトだけではどうする事もできない場合があり、そのような場合は後処理でルールベースのロジックを追加する必要がありました。

効果測定

リニューアル後、以下のような指標が上昇しました。

  • 1スレッドあたりの質問数の平均
    • 会話の継続性が向上し、ユーザーが複数回質問を続けるようになりました。
  • 1ユーザー1日あたりの質問数
    • 利用頻度が向上し、ユーザーがより積極的に機能を活用するようになりました。
  • 検索URLに遷移された回数
    • 商品検索への誘導効果が向上し、実際の商品閲覧につながるケースが増加しました。

これらの結果から、BUYMA内記事コンテンツを根拠とした回答の提供と、文脈を理解した検索キーワード生成により、ユーザーの満足度と利用価値が向上したと考えられます。

直近の対応/今後の展開・課題

  • 金額絞り込み機能の追加(今月対応)

    • ユーザーからの要望が多い金額絞り込み機能の対応をしました。価格帯に関する質問に対して適切な商品提案ができていない課題があったため、Gemini で検索API用の金額フィルタークエリを生成することで対応しました。
  • コンテンツの拡充

    • 現在はBUYMA内記事コンテンツのみを Vertex AI Search にインポートしていますが、今後はYouTubeでの発信内容も追加する予定です。記事以外のコンテンツも活用することで、より幅広い情報をユーザーに提供できるようになります。
  • 継続的なメンテナンス

    • AIのライブラリやモデルは随時更新されていくため、継続的なメンテナンスが課題となります。特に Gemini や Vertex AI Search などのサービスは進化が早く、新しいモデルへの対応や非推奨ライブラリ/バージョンから移行など、定期的な見直しが必要です。

まとめ

本記事では、「AIでさがす」サービスのリニューアルについて紹介しました。

旧システムでは ChatGPT と MeCab を使用していましたが、BUYMA特有の知識不足や根拠の不明確さなどの課題がありました。リニューアルでは Vertex AI Search と Gemini を採用し、BUYMA内記事コンテンツを根拠とした回答生成と文脈を理解した検索キーワード生成を実現しました。

実装時には敵対的クエリへの対応やAIサービス特有の不確実性への対処など様々な課題に直面しましたが、ロジックでの細かい制御やAIツールの活用により解決できました。リニューアル後は会話継続性や利用頻度、商品検索への誘導効果が明らかに向上しています。

明日の記事は同じAIテクノロジーグループの髙橋さんです。お楽しみに。


株式会社エニグモ すべての求人一覧 hrmos.co

Workflows + Cloud Scheduler で定期処理をサーバーレス構築(Cloud Composer との比較もあります)

こんにちは、AIテクノロジーグループの太田です。
普段は商品のカタログデータ基盤を開発・運用するチームで業務に携わっております。エニグモではそういったデータやAI関連の技術基盤としてGCPを利用しており、そこで利用したWorkflowsについて紹介したいと思います。

この記事はEnigmo Advent Calendar 2025の17日目の記事です。

1. はじめに:なぜこの構成に至ったか

  • 背景
    毎日追加・更新される商品データを Vector Search のインデックスに反映させる必要があった。
    Cloud Composer (Airflow) の利用実績はあったものの、より安価な Workflows に興味があった。
  • 課題
    「差分抽出 → 画像処理(Embedding) → インデックス更新」という一連のフローを毎日決まった時間に実行したい。
    各処理ステップ(特に画像処理とインデックス更新)は時間がかかるため、タイムアウトやリトライ制御を考慮する必要がある。
    当然コストは抑えたい。
  • 結論
    Cloud Composer を使わず、Workflows + Cloud Scheduler を採用することで、管理コストと金銭的コストを最小限に抑えたアーキテクチャを構築した。

ポイントは、重たい処理(画像処理・インデックス更新)は Dataflow に任せ、Workflows はあくまで「順序制御」に徹する構成にしたことです。

2. Goolge Cloud 構成:全体アーキテクチャ概要

処理の流れは次のとおりです。

  • Cloud Scheduler 毎日定時に Workflows をトリガー。
  • Workflows: 全体の指揮者。以下のステップを順次実行。
    • BigQuery: 前日データとの差分をSQLで抽出。
    • Workflows: 画像の Embedding 計算とGCSへの保存を実行する Dataflow を実行する。
    • Workflows: Vector Search のインデックス更新を実行する Dataflow を実行する。

ポイントは、長時間実行かつ単発で実行する機会がある Dataflow の実行を別の Workflows に委ね、メインの Workflows から別の Workflows を呼び出すようにしたことです。

Dataflowの実装については、本記事の趣旨から外れるので省略いたします。

3. 技術選定:なぜ Cloud Composer ではなく Workflows なのか

このセクションで、他の選択肢と比較し、なぜ今回の構成に至ったかを解説します。

比較項目 Workflows Cloud Composer
特性 サーバーレスで軽量、直線的なフローに最適 複雑な依存関係に強いが、常時稼働が必要
コスト 安価(実行回数課金)
1000ステップ0.01ドル1
Google Cloud 外へのアクセスを要する場合は1000ステップ0.025ドル
高い(小規模でも月額数万円〜)2
実際に月額約8万円かかっています
採用/不採用の決め手 今回の処理が「直線的」であり、複雑なDAGが不要だったため 日次バッチ一つに対してはオーバースペック

運用実績があったからといって、「とりあえず Airflow」とせずに、ワークフローの複雑さに応じてツールを選定できた点が良かったです。

実際に使ってみて、単純な A -> B -> C というフローなら Workflows の方が圧倒的に運用・コストメリットが大きいことが実感できました。

4. 実装サンプル:Workflows から Dataflow を起動する

ここでは、実際に Workflows を使って Dataflow (Flex Template) を起動するための定義(YAML)を紹介します。

【コード解説のポイント】

Dataflow の Flex Template を利用することで、Docker イメージ化したジョブをパラメータ付きで呼び出せます。
Workflows 側でジョブの完了を待機する(ポーリングする)ようにしました。
googleapi 3 で Dataflow 以外の各種 Google Cloud のプロダクトへアクセスすることができるので、参照してみてください。

【サンプルコード(YAML)】

実際に作成した Dataflow を起動する Workflows の定義ファイル(main.yaml)の一部を掲載します。

  • main.yaml の抜粋イメージ
steps:
  - init:
      assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - location: "asia-northeast1"
        - job_name: "dataflow-launcher"
        # デプロイするときに --set-env-vars で設定した環境変数をここで読み込む
        - sdk_container_image: ${sys.get_env("sdk_container_image")}
        - gcs_bucket: ${sys.get_env("gcs_bucket")}
        # YYYYMMDD 形式の日付
        - ymd: ${text.replace_all(text.split(time.format(sys.now()), "T")[0], "-", "")}
  
  # 画像処理をするので時間がかかる Dataflow を実行するステップ
  - dataflow_start_crop_embedding:
      call: googleapis.dataflow.v1b3.projects.locations.flexTemplates.launch
      args:
        projectId: ${project_id}
        location: ${location}
        body:
          launchParameter:
            jobName: ${job_name + ymd}
            containerSpecGcsPath: ${gcs_bucket + "/templates/your-template-spec.json"}
            parameters:
              sdk_container_image: ${sdk_container_image}
            environment:
              stagingLocation: ${gcs_bucket + "/templates/staging"}
              tempLocation: ${gcs_bucket + "/templates/temp"}
              serviceAccountEmail: ${Dataflow 実行権限を持つ Service Account}
      result: dataflow_result
      next: initialize_polling

  # 以下、Dataflow を完了まで監視するステップ
  # ループした回数だけステップ数が増えてコストも増えていくのでループ数には注意してください。

  - initialize_polling:
      assign:
        - counter: 0
        # "max_retries * 60秒 (wait_60_secondsで定義) = 1時間"なので、最大1時間ポーリングを行う。
        - max_retries: 60
      next: poll_job_status

  - poll_job_status:
      call: googleapis.dataflow.v1b3.projects.locations.jobs.get
      args:
        projectId: ${project_id}
        location: ${location}
        # run_dataflow_job ステップの return から参照できる。
        jobId: ${dataflow_result.job.id}
      result: job_status
      next: check_job_status

  - check_job_status:
      switch:
        - condition: ${job_status.currentState == "JOB_STATE_DONE"}
          next: job_succeeded
        - condition: ${job_status.currentState == "JOB_STATE_FAILED" or job_status.currentState == "JOB_STATE_CANCELLED"}
          raise: ${"Dataflowジョブ " + dataflow_result.job.id + " が失敗しました。ステータス " + job_status.currentState}
        - condition: ${counter >= max_retries}
          raise: ${"Dataflowジョブ " + dataflow_result.job.id + " が1時間以内に完了しませんでした。タイムアウト。"}
        - condition: ${true}
          next: increment_counter

  - increment_counter:
      assign:
        - counter: ${counter + 1}
      next: wait_60_seconds

  - wait_60_seconds:
      call: sys.sleep
      args:
        seconds: 60
      next: poll_job_status

  - job_succeeded:
      return: "SUCCESS"

【サンプルコードをデプロイ】

作成した Workflows の定義ファイルをデプロイ4します。

gcloud workflows deploy sample_workflow \
        --source=main.yaml \
        --location="asia-northeast1" \
        --project=${PROJECT_ID} \
        --service-account=${SERVICE_ACCOUNT_EMAIL} \
        --set-env-vars sdk_container_image=${Artifact Registry にpushしたdockerイメージ} \
        --set-env-vars gcs_bucket="gs://YOUR_BUCKET" \

【サンプルコードを実行した結果】

ループしているので実際に実行されたステップ数は137でした。

コストは 137 * 0.01 / 1000 = 0.00137 ドルになります。

5. Workflows を採用する上で許容した「不便な点」

コストと手軽さは魅力的ですが、導入に際しては以下のデメリットも考慮する必要がありました。

  1. 開発体験のクセ(YAML地獄)

    • 課題
      Python で記述できる Airflow と異なり、Workflows は YAML (または JSON) でロジックを記述する必要があります。条件分岐やループ処理、変数の扱いが直感的ではなく、構文エラーに悩まされることが多いです(一般的に "YAML engineering" と揶揄される部分)。
    • 対応
      今回は「直列的なフロー」に留めることで複雑な記述を回避しました。複雑なロジックが必要な場合は、無理に Workflows 内に書かず、Cloud Functions や Dataflow に逃がす設計が重要です。
  2. ローカルテスト・デバッグの難易度

    • 課題
      Cloud Composer (Airflow) はローカル環境を構築可能なので DAG のテストが可能ですが、Workflows はクラウド上のリソースと密結合しているため、ローカルでの完全な再現・テストが困難です。「修正してデプロイして実行」のサイクルになりがちです。
    • 対応
      ステップごとの単体テストは諦め、結合テスト中心で進める割り切りが必要でした。 また、別の Workflows に分割することで、ステップごとに運用できるように対応しました。
  3. ステップ間のデータ受け渡し制限(メモリサイズ)

    • 課題
      Workflows は大規模なデータをステップ間で直接受け渡すこと(ペイロードサイズ制限)には向いていません。
    • 対応
      今回の設計では、画像データそのものや大量のリストは Workflows 上を通過させず、必ず GCS のパスや BigQuery のテーブル名といった「参照情報」のみを受け渡すように徹底しました。
  4. ベンダーロックイン

    • 課題
      Airflow はOSS標準ですが、Workflows は Google Cloud 固有のサービスです。将来的に他のクラウドへ移行する場合、ポータビリティがありません。
    • 対応
      今回は GCP 完結のシステムであり、フルマネージドの恩恵(管理レス)を最優先しました。

6. まとめ

Workflows + Cloud Scheduler の組み合わせにより、日次の Vector Search インデックス更新を完全自動化できました。

コスト面以外では、インフラ管理コスト(Cloud Composer の環境維持など)を削減し、本質的なロジック開発に集中できることが Workflows の大きなメリットに感じました。

デメリットでYAML記法に言及しましたが、逆に言えば、どなたでも気軽に試してみることができるとも言えます。コストも軽いのでこれを機に是非一度お試しください。

読者の皆様がこれで良い体験を得ることができましたら私としても大変嬉しく思います。

明日18日目はAIテクノロジーグループの吉田さんです。

PHPerがRubyistになろうとしてつまづいたところ⑥プロセス内キャッシュ

WEBアプリケーションエンジニアの小松です!

プロセス内キャッシュの挙動に馴染みがなかったので、どういう挙動なのか。

ネットワーク越しのキャッシュとの使い分け。

他言語との比較でRails特有の仕様なのかどうか。

という疑問が湧いたので調査し、それを記事にしました。

この記事は[Enigmo Advent Calendar 2025]の16日目の記事です。  

 


ローカルキャッシュとは何か

ローカルキャッシュとは、Ruby プロセス内のメモリに値を保持し、同じプロセス内であれば何度呼び出されても再計算や再読み込みを行わない仕組みのことを指す。

Ruby では次の構文がある。

@config ||= YAML.load_file("config/settings.yml")

この構文は最初の一回だけ YAML.load_file が実行され、以降はメモリに保持された @config が返される。

Rails プロセスが動いている限り、この値は保持され続ける。


今回直面した疑問と調査内容

実際に自分が直面した疑問は次のようなものだった。

  • Rails サーバーが複数ある場合、各プロセスごとにキャッシュされるということは、そもそも「キャッシュ」と言えるのか

  • Memcached や Redis など外部キャッシュと比べて本当に速いのか

  • 毎回インスタンス変数に保存するだけで高速化されるように見えるが、仕組みとして本当に正しいのか

  • そもそもこれは Rails の仕様なのか、Ruby の仕様なのか

  • 他言語ではどう実現しているのか

これらを順番に整理していった。


「ディスク IO を避けたいだけなら」プロセス内キャッシュが最も速い

Rails.cache(Memcached/Redis)のキャッシュも高速だが、必ずネットワーク越しの通信が発生する。

クラウド環境であれば数百マイクロ秒〜数ミリ秒のオーバーヘッドが加わる。

一方、プロセス内キャッシュは Ruby プロセスが持つメモリに直接アクセスするだけで、ネットワークもディスクも介さない。

最短経路でデータにアクセスできるという点では最速になる。

ただし、これは「ローカルに存在する静的データ」に限った話である。

更新頻度が高いデータには適さない。


実際に採用したコード

今回検討していたコードは次のような YAML 読み込み処理だった。

def contents(condition)
  yaml = YAML.load_file('config/item_cate_desc.yml')
  # 以下ロジック...
end

これでは毎回ファイルを読み込み、ディスク IO が発生するため遅い。

そこで、次のように改善した。

def item_cate_yaml
  @item_cate_yaml ||= YAML.load_file('config/item_cate_desc.yml')
end

この1行によって、「最初の一回だけ読み込む」処理に変わる。

後はメモリに保持され続けるため、各リクエストで読み込む必要がない。

 


Rails 特有の挙動

Rails のコントローラでインスタンス変数を使っても、それはリクエストごとに新しく生成されるオブジェクトに所属するため、キャッシュとしては機能しない。

キャッシュとして効くのは、プロセスが生きている限り保持され続ける「クラスインスタンス変数」や「クラス変数」の方である。

PHP のようにリクエスト終了時にプロセスが破棄される言語とは異なり、Ruby(特に Rails のアプリサーバー)はプロセス常駐型のため、同じクラスインスタンス変数へ複数リクエストがアクセスする構造になっている。

この違いが理解しづらく、PHPer には馴染みがなく疑ってすらいたので、

railsアプリではクラスインスタンス変数の注意する #Ruby - Qiita

などの記事を参考にしてファクトチェックもしました。

 


Rails サーバーが複数台ある場合の挙動

ここについても疑問を持ったが、調べた結論は次のとおり。

  • Rails プロセス内で一度だけ読み込まれ、それぞれが独立してデータを保持する

  • よってプロセスを跨いだ共有キャッシュではない

  • ただし配置ファイル(YAML)が全サーバーで共通であれば問題はない

  • プロセス間の同期は不要で、むしろ高速

「複数サーバーだからキャッシュが効かない」という誤解があるが、ローカルキャッシュは各プロセス単位で成立するため問題ない。


キャッシュとしての位置づけの違い

データの性質に応じてどのキャッシュを選ぶべきか整理すると次のようになる。

種類 特徴 向いているケース
プロセス内キャッシュ 最高速。プロセスごと独立。データ変更には弱い 設定ファイル、マスターデータ
Rails.cache(Memcached/Redis) 共有キャッシュ。通信が必要 変更頻度がありサーバー間で共通化したいデータ
DB キャッシュ 一貫性は高いが IO コストあり モデルデータ

今回のような静的な YAML データであれば、間違いなくプロセス内キャッシュが適している。


この仕組みは Ruby 特有なのか

Ruby||= を使ったプロセス内キャッシュは極めて自然で扱いやすい。

もちろん他の言語でも似たことはできるが、次のように比較すると Ruby の簡潔さが際立つ。

Java

static 変数+ダブルチェックロックなど同期処理が必要で、明らかにコードが冗長。

Go

sync.Once を使う必要がある。

パッケージスコープの変数は設計上の制約も多い。

PHP

そもそも 1 リクエスト 1 プロセスのため、プロセス内キャッシュという概念が成立しない。

APCu など外部拡張に頼る必要がある。

Node.js

モジュールキャッシュにより Ruby に近い感覚で扱えるが、副作用の管理が必要で Ruby の手軽さとはやや異なる。

Ruby はプロセス常駐型で、かつクラスインスタンス変数が自然にキャッシュとして機能するため、他の言語と比較して特に扱いやすい。

 


まとめ

今回の検討で分かったのは、次のような点である。

  • Ruby@var ||= ... によるローカルキャッシュは、非常に手軽に使える最速のキャッシュ方式

  • 複数サーバーでも問題なく、各プロセスが独立してキャッシュを保持する

  • Memcached や Redis より速いのは、ネットワーク通信が一切ないため

  • データの性質に応じてキャッシュ方式は使い分けるべき

  • 他言語でも実現は可能だが、Ruby ほど自然で簡潔な形にはならない

静的な設定データを高速化したい場面では、最初に検討すべき手法と言える。

 

明日17日目はAIテクノロジーグループの太田さんです。

 

エニグモでは問い合わせをどう捌いているのか?ヘルプデスク業務の裏側をお伝えします

こんにちは、コーポレートエンジニア(コーポレートITチーム)の藤田です。 この記事は Enigmo Advent Calendar 2025 の15日目の記事です。

コーポレートIT(以下CO-IT)の業務において、地味ながらも非常に重要な「ヘルプデスク業務」についてお伝えします。 「どのようなツールを使って、どのようなフローで対応し、どうやってナレッジを残しているのか」 普段あまり表に出ることのない、運用の裏側をご紹介しようと思います。

自己紹介

本題の前に少し自己紹介をさせていただきます。

私は今年3月にエニグモに入社いたしました。エニグモへ入社する前は、鉄骨製作会社の情報システム部門で働いており、いわゆる「一人情シス」として働いていました。それ以前はシステムエンジニアとして開発業務に携わっていた経験があります。

なぜこの記事を書くのか

過去のAdvent Calendarでは、入社エントリーやチームビルディングに関する記事はありましたが、具体的な「CO-ITの業務内容」にフォーカスした記事はありませんでした。

そこで今回は、社内外の方に「エニグモのCO-ITって具体的にどんな業務をしているの?」を知っていただくため、主業務の一つであるヘルプデスクについて深掘りしてみたいと思います。

(他のCO-ITメンバーが作成した記事もぜひ目を通してみてください!)

エニグモのヘルプデスク構成要素

まず、問い合わせ対応に使用しているツールを紹介します。 

  • Slackワークフロー: 問い合わせ受付からナレッジ化までのデータ入力インターフェース。
  • Googleスプレッドシート: ログの集約・一時保管・ID管理
  • Zapier: ツール間の連携・自動化
  • Asana: タスク・進捗管理

ヘルプデスク対応のフロー

実際の問い合わせから完了までの流れは、以下のように自動化されています。 極力、人の手による「転記作業」をなくすように設計しています。

【User】問い合わせ入力

  ユーザーがSlackワークフローから問い合わせ内容を入力します。

【System】自動起票・通知

  問い合わせ内容が自動的にGoogleスプレッドシートへ転記されます。

  同時にCO-ITの「問い合わせ対応チャンネル」に通知が飛びます。

【CO-IT】担当者アサイ

  通知に対し、CO-ITメンバーが「担当します」ボタンをクリック。

  Asanaのタスクに担当者名が自動入力されます。

【CO-IT】対応・解決

  実際の調査・対応を行います。

【System】クロージング・ナレッジ化

  対応完了後、Slackワークフローに対応内容を入力。

  Asanaが「完了」ステータスに更新され、対応内容が記録されます。

  最後に「問い合わせナレッジチャンネル」へ内容が自動投稿されます。

運用における3つのこだわり

入り口の設計:「優しさ」と「セキュリティ」の両立

問い合わせの入り口は、内容の性質に合わせて「オープン」と「プライベート」の2つのワークフローを用意しています。

オープン問い合わせ

 用途:PCトラブルや仕様確認など、他のユーザーと共有しても有益な内容。

プライベート問い合わせ

 用途:人事関与など、秘匿性の高い内容。

これにより、ユーザーは適切な窓口を選択することで、セキュリティと利便性を両立させています。

出口の設計:「個人の記憶」ではなく「組織の記録」へ

ヘルプデスク業務において最も避けたい事態は、「過去に同様の問い合わせがあったはずなのに、どう解決したか分からない」という状況です。特に、退職したメンバーしか詳細を知らない案件などがブラックボックス化してしまうと、組織としての対応力は大きく低下してしまいます。それを解決するために、以下の2点の取り組みを行なっています。

あらゆる対応をナレッジ化する

突発的な相談や、日々のコミュニケーションの中で偶発的に発生した問い合わせに関しても、漏れなく記録・管理できる仕組みを整えています。「入り口」は柔軟に受け入れつつも、最終的にナレッジとして蓄積することで、情報の散逸を防いでいます。

自動的な情報の共有と蓄積

対応フローの最後に「問い合わせナレッジチャンネルへの自動投稿」を組み込んでいます。これにより、対応した担当者が不在でも、Slack上でキーワード検索をするだけで過去の類似事例や経緯を即座に引き出すことが可能になります。

「誰か一人が知っている」ではなく「組織全員がいつでも引き出せる」状態を作ること。これがエニグモのヘルプデスクです。

改善の設計:対応して終わりではなく「減らす」までが業務

ただ問い合わせを捌くだけでは、業務は改善しません。 私たちは月に一度、チーム内で「問い合わせに関する会議」を実施しています。

ここでは、その月の問い合わせ件数の推移を確認するだけでなく、頻発した問い合わせ内容について深掘りを行います。「なぜその問い合わせが発生したのか」「根本解決のためにどのような対策が必要か」「今後は対応方針をどう変えるべきか」を議論し、再発防止や業務フローの改善につなげています。

今後の展望

次なるステップとして「AI活用」を見据えています。

具体的には、これまでに蓄積されたナレッジデータを学習データとして活用し、生成AIによる「一次回答の自動化」や、担当者への「類似回答のレコメンド」機能の実装などに挑戦していきたいと考えています。 問い合わせ対応のスピードと質をさらに向上させ、ユーザーにとっても解決までの時間を短縮できるような環境を目指します。

おわりに

ヘルプデスク業務は、一般的に「雑用」や「誰でもできる仕事」と捉えられがちかもしれません。 しかし、エニグモではこの業務に非常に力を入れています。なぜなら、私たちは「来た問い合わせをただ捌くこと」がゴールだとは考えていないからです。

問い合わせの内容を分析し、「どのように問い合わせそのものを減らせるか」「ユーザーがストレスなく業務を行える環境を作れるか」を追求し続けること。これこそが、エニグモにおけるヘルプデスク業務のあり方だと考えています。

明日の記事の担当は アプリケーション開発グループ の 小松さんです。お楽しみに。

GCSToSFTPOperatorでハマった

こんにちは!WEBアプリケーションエンジニアの小松です!

今まで主に EC サイトの WEB エンジニアとして仕事をしてきて、Airflow を触るようになったのはエニグモに入社してからでした。

BUYMA では、広告媒体向けのフィード生成や外部パートナーとのデータ連携、在庫データの収集など、毎日大量に発生するバッチ処理を Airflow に任せています。

人手では絶対に回せない規模なので、Airflow は影の立役者のような存在です。

そんな Airflow を動かしている基盤が Google Cloud Composer なのですが、会社全体でオンプレサーバーからクラウドへ移行していく流れの中で、Composer も新しいバージョンへ移し替えることになりました。

「まあ普通に移行できるだろう」と思っていたら、まさかの沼にハマってしまい……
同じ罠に落ちる人が一人でも減りますように、という気持ちでこの記事を書いています。

この記事は[Enigmo Advent Calendar 2025]の13日目の記事です。  


結論(先に言います)

Airflow の GCSToSFTPOperator が突然 SFTP 認証できなくなった原因は…

Composer が入れている Paramiko のバージョンが古く、RSA 署名アルゴリズムがサーバーに拒否されたから。

つまり、

  • コマンドからは接続できる

  • Python の Paramiko スクリプトでも接続できる

  • でも Airflow からだけ認証エラーになる

という、最悪に分かりづらい症状が発生していました。


何が起きていたのか(時系列で紹介)

① Composer 移行後、SFTP アップロードだけエラー

Airflow 2 → Composer の新環境に移行した際、GCSToSFTPOperator だけが謎の認証失敗。

ログにはこれだけ:

 
Bad authentication type; allowed types: ['publickey']

鍵は設定済みのはずなのに、Airflow だけ失敗。謎が深まる。


② コマンド実行では成功する

Docker コンテナに入り、

 
sftp -i pri.key sftp.host.com

→ 成功。

設定ミスではないと確信。


Python(paramiko)でも成功する

「Airflow がダメなら paramiko 生で試すか」と思いテストコードを書くと…

→ 普通に成功。

つまり、Airflow 経由でのみ認証が弾かれている。


④ 「Airflow からだけ接続できない」という地獄に突入

Airflow → SFTPHook → SSHHook → Paramiko

このどこかが悪いのは確実だが、Extra の書式を変えても、パラメータを変えても改善しない。

Airflow のログは詳細な理由を出してくれない。

完全に暗闇の中を歩く状態。


⑤ 試行錯誤の果てに見えてきた「署名アルゴリズム問題」

Docker の sftp でのみ発生していたエラー:

 
sign_and_send_pubkey: no mutual signature supported

ここでようやく糸口が見えた。

  • サーバー:rsa-sha2-256/512 を要求

  • 古い Paramiko:ssh-rsaSHA1)署名を使ってしまう

サーバーが拒否

という構図。


⑥ Airflow(Paramiko)は署名アルゴリズムを指定できない

OpenSSH のように

 
-PubkeyAcceptedAlgorithms=ssh-rsa

といった強制は Paramiko では不可能。

つまり:

  • Airflow から署名アルゴリズムを変更するすべがない

  • 古い Paramiko を使っている限り絶対に成功しない

という仕様の問題。


⑦ Composer の paramiko を確認すると…古い!

Composer 内で

 
pip freeze | grep paramiko

すると……

2.7 系(古い)
rsa-sha2 に未対応

原因が完全に確定。


⑧ Composer をアップグレード → 一発成功

Composer の Airflow イメージをアップデートし、

  • Paramiko が 2.9+(rsa-sha2 デフォルト対応) に更新

その瞬間、

→ GCSToSFTPOperator が何事もなく成功。

設定は一文字も変えていません。

完全にバージョン差の問題でした。


技術的まとめ:今回の本質

問題の本質はこれ:

GCSToSFTPOperator → SFTPHook → Paramiko が SHA1 署名(ssh-rsa)しか使えず、外部サーバーが RSA-SHA2 を要求していたため認証が失敗した。


回避策(原理上)

方法 可能? 説明
Paramiko を 2.9+ にアップグレード 今回の完全解決策
key_file 形式で渡す Composer では鍵配置がやや面倒
RSA 鍵を SHA2 対応形式へ変換 問題は鍵ではなくクライアント側
SFTP サーバーに設定変更を依頼 外部企業のため不可能

最後に:今回の教訓

  • 「Airflow だけ接続できない」→ Paramiko のバージョンをまず疑うべし

  • Airflow のログは認証まわりが不親切で根本原因が見えづらい

  • Composer は内部ライブラリが固定なので、移行時に“バージョン差事故”が起きやすい

  • 結局のところ、問題の 9 割は Airflow が使っているライブラリのバージョン差

数日単位で調査し、無数のテストを書き、ようやく原因に辿り着いたので、この記事が誰かの時間を 30 分でも節約できたら嬉しいです。

 

明日12/14の記事はインフラエンジニア森田さんです。お楽しみに。

ローコードAIツールDifyをエンジニアが使ったら?コードブロックでハマった7つの落とし穴

こんにちは、AIテクノロジーグループの辻埜です。 本記事は Enigmo Advent Calendar 2025 の12日目の記事です。

普段はデータサイエンティストとして機械学習を用いたシステムの開発運用や、社内のAI活用推進を担当しています。

近年、生成AIの活用が進む中で、エニグモでも社内のAI活用を推進するため、Difyという生成AIアプリ開発ツールを活用した取り組みを行っています。Difyは非エンジニアでもAIを組み込んだワークフローを簡単に構築できるツールです。

dify.ai

社内での導入初期に、使い勝手はどうか?どんな場面で有用か?を調査するため自分でも使ってみたところ、いくつかの課題に直面しました。

この記事では、実際にDifyを使ってワークフローを構築するにあたって苦労した点についてご紹介します。Difyを導入検討している方や、すでに使用している方の参考になれば幸いです。

なお、記事内でDifyのセキュリティ機能の変更について触れていますが、今回の用途としては完全に社内の一部ユーザーに閉じた環境での使用だったため、変更内容に問題がないと判断した上で実施しています。 外部に公開する場合や、不特定多数の利用者によって使用される場合は、セキュリティには十分ご注意ください。

前提(使用した環境)

  • Difyのバージョン: 1.4.1
  • 利用形態: セルフホスト版
  • 実行環境: Compute Engine(GCP)

Difyでやったこと

今回構築したのは、スプレッドシートから情報を読み込んで、外部APIで取得した情報と結合して、新しいスプレッドシートに出力するというワークフローです。

Difyではブロックという単位で機能を繋げていき、ワークフローを構築していきます。ブロックには様々な種類があり、IF/ELSE処理やLLMの呼び出し、RAGの実装までGUI上で簡単に構築ができます。
最初はそれらのブロックを組み合わせてワークフローを構築していたのですが、データをあれこれ変換しようとするとだんだんと標準ブロックだけでは対応が難しくなっていきました。

そんな私のようなわがままなケースに対応するため、Difyでは「コードブロック」を使用することで、Pythonコードを実行して自由に処理をすることができます。普段コードを書いている身からすると、やりたいことをささっと記述して実現できるのでとても便利な機能でした。

コードブロックの落とし穴

少々複雑な処理もコードを使ってしまえば簡単にかけてしまうためとても便利なのですが、使っているとなかなか思うように使えず苦労するケースがいくつかでてきました。

1. エディタの機能が限定的

Difyのコードブロックは、ブラウザ上で動作する簡易的なエディタで、簡単なシンタックスハイライトはあるものの、近年のエディタに搭載されているような各種機能は搭載されていません。

そのため、普段使っているエディタのショートカット機能などが使えない他、最近流行りのAIエディタのようなコード補完機能も使うことができませんでした。 どうしてもAIの力を借りたい時にはローカルのエディタでコードを書いてから、Difyにコピー&ペーストするという手間が発生しました。

2. 外部ライブラリを使う方法が難しい

さらに使っていくと、外部ライブラリを使いたい時に簡単に導入することができないことに気づきました。 コードブロックではデフォルトでは実行環境に事前にインストールされているライブラリしか使用できず、使いたい外部ライブラリがある場合は自分で一手間加えて導入する必要があります。

ライブラリの指定

外部ライブラリを使うには、まずリポジトリ内の./docker/volumes/sandbox/dependenciesにあるpython-requirements.txtにライブラリの追加をする必要があります。 書き方は通常のPythonrequirements.txtと同じです。

pandas==2.3.3
gspread==6.2.1

システムコールの許可

python-requirements.txtの変更が完了したら、次にシステムコールを許可する設定を行う必要があります。 Difyではセキュリティのため、デフォルトでは使用できるシステムコールが制限されています。外部ライブラリを使おうとするとシステムコールが呼び出せずエラーが発生するケースがあり、その際に対応が必要になります。

許可するシステムコールは、docker/volumes/sandbox/conf/にあるconfig.yamlの中で設定が可能です。 以下のようにallowed_syscallsに許可するシステムコールを追加することで、システムコールを使用できるようになります。

allowed_syscalls: [0, 1, 2, 3, 4, 5, ..., 336]

外部ライブラリを使うハードルが想像以上に高く、この時点ですでに通常では想定されていない使い方をしてしまっているんだろうなと感じました。

3. コードブロックで外部へのAPIアクセスができない

上記の設定で外部ライブラリは使用できるようになったものの、次は外部APIを呼び出す処理でネットワークエラーが発生しました。

Difyでは、コードブロックが実行される際には、外部と隔離された専用のSandbox環境(実体はコンテナ)内で実行されるため、安全に開発を進めることができます。エラー発生の状況から原因はそこが怪しいと推測し、調査を進めました。

./docker/docker-compose.yamlをみてみると、サンドボックスコンテナがssrf_proxy_networkという名前の専用のネットワークを使用していることがわかりました。 また、プロキシの情報を環境変数HTTP_PROXYHTTPS_PROXY)で指定していることがわかりました。

# (一部抜粋)
  sandbox:
    environment:
      HTTP_PROXY: http://ssrf_proxy:3128
      HTTPS_PROXY: http://ssrf_proxy:3128
    networks:
      - ssrf_proxy_network

最終的に、他のコンテナで使われているdefaultというネットワークを追加し、プロキシが使われないようHTTP_PROXYHTTPS_PROXYコメントアウトすることで、外部APIを呼び出すことができるようになりました。

# (一部抜粋)
  sandbox:
    environment:
      # HTTP_PROXY: http://ssrf_proxy:3128
      # HTTPS_PROXY: http://ssrf_proxy:3128
    networks:
      - ssrf_proxy_network
      - default  # 追加

4. テストが書けない

通常のソフトウェア開発では、単体テストや統合テストを書くことで、コードの品質を担保し、リファクタリングや機能追加を安全に行うことができます。一方でDifyのワークフローは、GUI上でブロックを配置して接続する形で構築するため、テストコードを記述できず、テストフレームワークを使って自動テストを実行することができませんでした。

一応ブロック単位で実行する機能があるため、一部の処理はその機能を活用して入出力を確認しました。しかし、入力がリスト形式の場合はうまくデータが渡せなかったり(本当はやり方があるのかもしれません)、前のブロックの入力が複雑な場合はそれを用意するのも大変なため、なかなか思うようにテストができませんでした。

最終的には実際にワークフローを実行して結果を目視で確認するしかなく、本当に自分の想定する挙動が実現できているのか、バグが仕込まれていないか、いつも以上に気を張って開発する必要がありました。

5. デバッグが難しい

さらに苦しかったのが、エラーが出た時のデバッグの難しさです。

簡単なエラーであればエラーメッセージから問題の内容を読み取ることができすぐに解消できるのですが、使っているライブラリの中でエラーが出た場合や実行環境の問題でエラーが出た場合などは、エラーメッセージからは原因が読み取れず、printデバッグ等もできなかったため、原因を特定するのにとても時間がかかりました。

具体的には、ワークフローがtimeoutで止まってしまうケースなどがありました。最終的にはロードバランサータイムアウト設定が原因だったのですが、コードのどこで止まっているのか、なぜ止まっているのかもわからず、ログにも何も出力されないので、結局根本原因の特定までに1ヶ月近くを要しました。

6. ブロック間でのデータ受け渡しが難しい

Difyのワークフローではブロック間でデータを受け渡すことができますが、データの受け渡し方法が独特でした。

コードブロックの出力としてはいくつか型を指定することができ、StringNumberArray[Number]など基本的な型は使用できるようになっています。しかし、複雑な型を扱いたい場合には、Objectという型を指定して、Pythonの辞書型に変換して受け渡す必要があります。

私の場合はPandasのDataFrameをコードブロック間で受け渡したかったのですが、これをObjectとして受け渡す必要があり、毎回DataFrame型からdict型に変換しては戻すという余分な処理を入れなければいけませんでした。

さらに、ブロック間で受け渡しができるデータサイズや文字数、オブジェクトのネストの深さなどにも制約があり、これらを超える場合はエラーが発生してしまいます。
一部については上述のdocker-compose.yamlファイルや./docker/.envファイルなどをいじることで対応できるものもありますが、設定方法についてはドキュメントにも記載がなく、ソースコードを読んだりリポジトリのissueをあさって調べる必要がありました。

7. バージョン管理ができない

通常のソフトウェア開発では、Gitなどのバージョン管理システムを使用して、コードの変更履歴を管理します。

しかし、Difyのワークフローは、GUI上で構築されるため、Gitで直接管理することができませんでした。 Difyにも変更履歴機能があるのですが、一度セッションが切れてしまうと履歴が失われてしまうため、変更履歴をきちんと管理するには不十分でした。

対応策として、ワークフローのエクスポート機能を活用しました。定義したワークフローはDSL形式で出力できるため、出力されたファイルをGitで管理することで擬似的にGitでのバージョン管理を実現しました。 毎回手動で行う必要があったり履歴の確認や復元に手間がかかるため、完全な再現とまでは行きませんが、少なくとも変更履歴を管理できたので大きく困ることはありませんでした。

さいごに

この記事では、エンジニアの視点からDifyを使用した際に苦しんだ点についてご紹介しました。

データを加工するなどある程度複雑な処理を行う場合には、純粋にコードを書いてシステムとして構築する方が良いと感じました。 おそらく本来はコードブロックを多用するような使い方ではなく、基本的にはすでに用意されているブロックを組み合わせて使うような使い方を想定されているため、情報が少なかったり設定がしづらかったりするのだと思います。

一方で、社内では非エンジニアの方がDifyを使いこなしてチャットボットを作り込んでいる事例もあり、GUIで簡単に生成AIを組み込んだワークフローが構築できるという点では非常に革新的なツールだと感じています。

利用者のニーズや用途に合わせて適切な場面で活用していきたいですね。

明日の記事の担当はエンジニアの小松さんです。お楽しみに!

参考文献


株式会社エニグモ すべての求人一覧 hrmos.co

BigQueryのデータをAIでフィルタリング!手動→自動化→コスト最適化の3ステップ

こんにちは、AIテクノロジーグループの竹田です。
本記事は Enigmo Advent Calendar 2025 の11日目の記事です。

本稿では、BigQueryで抽出したデータに対して「金額に関する記述が含まれているか」をAIで判定する方法を、段階的に進化させながら紹介します。

この記事を書いた背景

私は元々検索システムの運用保守やMLOpsのOps周りを担当していました。
しかし、ここ最近は生成AIが実用的なツールとして利用できるようになり、業務でもAIを活用した対応が急増しています。

そんな中で直面したのが、「BigQueryで抽出した大量のテキストデータに対して、AIで判定処理を行いたい」というニーズです。
最初は手動で試し、次第に自動化・効率化を進めていく中で、いくつかの実装パターンが見えてきました。

本記事では、その試行錯誤の過程を「段階的な進化」として整理し、それぞれのアプローチのメリット・デメリットを共有します。
なお、本稿では「金額に関する記述の判定」を例として取り上げていますが、この手法は他の様々な判定タスクにも応用可能です。
同じような課題に直面している方の参考になれば幸いです。

やりたいこと

アンケートやレビューデータなど、テキストデータの中から「具体的な金額や価格に関する言及があるもの」だけを抽出したい!というシチュエーションを想定します。

例えば:
- 「この製品の価格は10万円ですか?」 → Yes(金額の言及あり)
- 「見た目の高級感に対する満足度は?」 → No(金額の言及なし)
- 「製品の質感に対するニュアンスで高い評価はあるか?」 → No(金額の言及なし)

こういった判定を、ルールベースだけでは難しいケースもあるので、AIの力を借りてやってみます。


アプローチ1: BigQueryコンソール → Spreadsheet → Gemini(手動)

まずは一番シンプルな方法から。BigQueryでデータを抽出して、Googleスプレッドシートに保存し、Geminiを使って判定させる方法です。

Step 1: BigQueryでデータを抽出

BigQueryコンソールで以下のようなクエリを実行します。

SELECT
    t.original_text
FROM
    (
        SELECT 'この製品の価格は10万円ですか?' AS original_text UNION ALL
        SELECT '見た目の高級感に対する満足度は?' AS original_text UNION ALL
        SELECT '製品の質感に対するニュアンスで高い評価はあるか?' AS original_text
    ) t;

BigQueryコンソールでのクエリ実行

実行したら、「Save results」からスプレッドシートに保存します。

Step 2: スプレッドシートでGeminiを使う

スプレッドシートに保存したら、右側のGeminiパネルを開いて、以下のようなプロンプトを投げます。

A列の文章に具体的な予算や価格帯を示す言葉が含まれていれば「Yes」、そうでなければ「No」のみ回答してください。

スプレッドシートでのGemini判定

Geminiが各行を判定してくれて、B列に結果が入ります。

この方法の課題

  • 手動作業が多い:毎回クエリ実行→保存→Gemini実行という手順が必要
  • 自動化が困難:定期的に実行したい場合、かなり面倒
  • スケールしない:データ量が増えると手作業では限界がある

ということで、次のステップに進みます!


アプローチ2: BigQuery ML(BQ ML)で自動化

BigQuery MLを使えば、BigQueryの中から直接Geminiを呼び出せます。これで自動化の道が開けます!

実装スクリプト全体

以下のスクリプトで一気にセットアップできます。

実行前の注意事項
 ・このスクリプトは、GCPリソースの作成やIAM権限の変更を行います。
 ・必ずご自身の責任の範囲内で実行してください。
 ・スクリプトは検証済みですが、GCPプロジェクトの設定や権限状況により失敗する可能性があります。

前提条件:
 ・macOS環境(またはLinux環境)で実行可能
 ・gcloud コマンドがインストール済みで、GCPにログイン済みであること
 ・対象のGCPプロジェクトで課金が有効化されていること
 ・サービスアカウントへのIAMロール付与など、プロジェクトに対する十分な権限を持っていること
 ・bqコマンド、jqコマンドがインストール済みであること

実行前の準備:
 ・スクリプト内のPROJECT_ID="your_project_id"を、ご自身が管理するGCPプロジェクトIDに変更してください
 ・必要に応じて、CONNECTION_REGIONMODEL_DATASET_IDなどの変数も環境に合わせて調整してください
 ・エラーが発生した場合は、エラーメッセージを確認し、必要な権限やリソースが不足していないか確認してください

作成されるリソース:
 ・BigQueryデータセットllm_dataset
 ・BigQuery Connection(llm_connection_for_filtering
 ・BigQueryリモートモデル(gemini_flash
 ・IAMロール付与(BigQuery ConnectionのサービスアカウントにVertex AI User権限)

#!/bin/bash

export PROJECT_ID="your_project_id"
export CONNECTION_REGION="US"
export CONNECTION_NAME="llm_connection_for_filtering"
export MODEL_DATASET_ID="llm_dataset"
export MODEL_NAME="gemini_flash"

echo "1. 必要なAPIを有効化します..."
gcloud services enable \
    aiplatform.googleapis.com \
    bigquery.googleapis.com \
    bigqueryconnection.googleapis.com \
    --project=${PROJECT_ID}

# データセットを作成
echo "2. BigQuery データセットを作成します..."
bq show --dataset ${PROJECT_ID}:${MODEL_DATASET_ID} &>/dev/null || \
bq mk --dataset --location=${CONNECTION_REGION} ${PROJECT_ID}:${MODEL_DATASET_ID}

# 接続を作成
echo "3. BigQuery接続 (Connection) を作成します..."
bq mk --connection \
    --connection_type=CLOUD_RESOURCE \
    --project_id="${PROJECT_ID}" \
    --location="${CONNECTION_REGION}" \
    "${CONNECTION_NAME}"

# サービスアカウントIDを取得
echo "4. 接続のサービスアカウントIDを取得します..."
SERVICE_ACCOUNT_ID=$(bq show \
    --connection \
    --location="${CONNECTION_REGION}" \
    --format=json "${PROJECT_ID}".${CONNECTION_REGION}."${CONNECTION_NAME}" 2>/dev/null| jq -r '.cloudResource.serviceAccountId')

echo "取得したサービスアカウントID: ${SERVICE_ACCOUNT_ID}"

# サービスアカウントにVertex AI Userロールを付与
echo "5. IAMロール (roles/aiplatform.user) を付与します..."
gcloud projects get-iam-policy ${PROJECT_ID} \
  --flatten="bindings[].members" \
  --filter="bindings.role:roles/aiplatform.user AND bindings.members:${SERVICE_ACCOUNT_ID}" \
  --format="value(bindings.role)" 2>&1 | grep -q "roles/aiplatform.user" >/dev/null 2>&1
if [ $? = 0 ]; then
  echo "roles/aiplatform.userは付与済みです。"
else
  gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
      --member="serviceAccount:${SERVICE_ACCOUNT_ID}" \
      --role="roles/aiplatform.user" --quiet
fi

echo "6. リモートモデルを定義します..."
cat > remote_model_def.sql <<EOF
CREATE OR REPLACE MODEL \`${PROJECT_ID}.${MODEL_DATASET_ID}.${MODEL_NAME}\`
REMOTE WITH CONNECTION \`${PROJECT_ID}.${CONNECTION_REGION}.${CONNECTION_NAME}\`
OPTIONS (
    endpoint = 'gemini-2.5-flash'
);
EOF
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false < remote_model_def.sql

echo "7. ML.GENERATE_TEXTの実行と結果確認..."
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false --nouse_cache <<EOF
SELECT
    t.original_text,
    JSON_EXTRACT_SCALAR(ml_generate_text_result, '$.candidates[0].content.parts[0].text') AS judgment_result
FROM
    ML.GENERATE_TEXT(
        MODEL \`${PROJECT_ID}.${MODEL_DATASET_ID}.${MODEL_NAME}\`, 
        (
            SELECT
                t.original_text,
                CONCAT(
                    '以下の文章に具体的な予算や価格帯を示す言葉が含まれていれば「Yes」、そうでなければ「No」のみ回答してください。文章: ', 
                    t.original_text
                ) AS prompt
            FROM
                (
                    SELECT 'この製品の価格は10万円ですか?' AS original_text UNION ALL
                    SELECT '見た目の高級感に対する満足度は?' AS original_text UNION ALL
                    SELECT '製品の質感に対するニュアンスで高い評価はあるか?' AS original_text
                ) AS t
        ),
        STRUCT(0.0 AS temperature, 1000 AS max_output_tokens)
    ) AS t
EOF

ポイント解説

  1. BigQuery Connection の作成

    • BigQueryからVertex AIのGeminiにアクセスするための接続を作成します
    • CLOUD_RESOURCEタイプの接続を使います
  2. IAM権限の設定

    • 作成された接続には専用のサービスアカウントが紐づきます
    • このサービスアカウントにroles/aiplatform.userロールを付与して、Vertex AIを使えるようにします
  3. リモートモデルの定義

    • CREATE MODEL文で、Gemini 2.5 Flashをリモートモデルとして登録します
    • これでBigQueryからGeminiを呼び出せるようになります
  4. ML.GENERATE_TEXTで判定実行

    • ML.GENERATE_TEXT関数を使って、各テキストに対してGeminiで判定を実行します
    • プロンプトはCONCATで動的に生成しています

この方法の利点と課題

  • 利点

    • 完全自動化!スケジュールクエリで定期実行も可能
    • BigQueryの中で完結するので、データの移動が不要
  • 課題

    • 全行でLLMが呼ばれる = コストが高い
    • 「10万円」みたいな明らかなキーワードがある場合も、わざわざLLMを呼んでいる

ということで、さらなる最適化に挑戦します!


アプローチ3: UDF + Cloud Run でコスト最適化

最後は、BigQueryのRemote UDFとCloud Runを組み合わせて、ルールベース判定 → LLM判定の2段階フィルタリングを実装します。

戦略

  1. まず高速なルールベース判定(キーワードマッチ)を実行
  2. キーワードに引っかからなかった場合のみ、LLMで判定
  3. これでLLM呼び出し回数を大幅削減!

実装スクリプト全体

実行前の注意事項
 ・このスクリプトは、Cloud Runのデプロイ、Dockerイメージのビルド、BigQueryリソースの作成、IAM権限の変更など、多くのGCPリソース操作を行います。
 ・必ずご自身の責任の範囲内で実行してください。
 ・スクリプトは検証済みですが、GCPプロジェクトの設定や権限状況により失敗する可能性があります。

前提条件:
 ・macOS環境(またはLinux環境)で実行可能
 ・gcloud コマンドがインストール済みで、GCPにログイン済みであること
 ・対象のGCPプロジェクトで課金が有効化されていること
 ・サービスアカウントへのIAMロール付与、Cloud Runのデプロイなど、プロジェクトに対する強い権限を持っていること
 ・bqコマンド、jqコマンドがインストール済みであること

実行前の準備:
 ・スクリプト内のPROJECT_ID="your_project_id"を、ご自身が管理するGCPプロジェクトIDに変更してください
 ・必要に応じて、リージョンやサービス名などの変数も環境に合わせて調整してください
 ・このスクリプトset -eでエラー時に停止するようになっていますが、途中で失敗した場合は作成済みのリソースが残る可能性があります
 ・エラーが発生した場合は、エラーメッセージを確認し、必要な権限やリソースが不足していないか確認してください

作成されるリソース:
 ・BigQueryデータセットllm_dataset
 ・BigQuery Connection(llm_connection_for_filtering
 ・Artifact Registryリポジトリbq-udf-repo
 ・Cloud Runサービス(bq-udf-processor-final
 ・BigQuery Remote UDF(efficient_price_filter_final
 ・IAMロール付与(BigQuery ConnectionのサービスアカウントにCloud Run Invoker権限、Cloud RunのサービスアカウントにVertex AI User権限)

#!/bin/bash

set -e

export PROJECT_ID="your_project_id"
export CONNECTION_REGION="US"
export CLOUDRUN_REGION="us-central1"
export DATASET_ID="llm_dataset"
export CONNECTION_NAME="llm_connection_for_filtering"
export REPO_NAME="bq-udf-repo"
export SERVICE_NAME="bq-udf-processor-final"
export FUNCTION_NAME="efficient_price_filter_final"

echo "--- 1. 必要なAPIの有効化 ---"
gcloud services enable \
    artifactregistry.googleapis.com \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    aiplatform.googleapis.com \
    bigquery.googleapis.com \
    bigqueryconnection.googleapis.com \
    --project=${PROJECT_ID} --quiet

echo "--- 2. BigQuery データセットの作成 ---"
bq show --dataset ${PROJECT_ID}:${DATASET_ID} &>/dev/null || \
bq mk --dataset --location=${CONNECTION_REGION} ${PROJECT_ID}:${DATASET_ID}

echo "--- 3. Artifact Registryの準備 ---"
gcloud artifacts repositories create ${REPO_NAME} \
    --repository-format=docker \
    --location=${CLOUDRUN_REGION} \
    --project=${PROJECT_ID} || true

echo "--- 4. BQ Connectionの作成とサービスアカウントIDの取得 ---"
CONNECTION_FULL_PATH="${PROJECT_ID}.${CONNECTION_REGION}.${CONNECTION_NAME}"

bq show --connection --location="${CONNECTION_REGION}" "${CONNECTION_FULL_PATH}" &>/dev/null || \
bq mk --connection --connection_type=CLOUD_RESOURCE --project_id="${PROJECT_ID}" --location="${CONNECTION_REGION}" "${CONNECTION_NAME}"

SERVICE_ACCOUNT_ID=$(bq show \
    --connection \
    --location="${CONNECTION_REGION}" \
    --format=json "${PROJECT_ID}".${CONNECTION_REGION}."${CONNECTION_NAME}" 2>/dev/null | jq -r '.cloudResource.serviceAccountId')

if [ -z "$SERVICE_ACCOUNT_ID" ]; then
    echo "エラー: サービスアカウントIDの取得に失敗しました。"
    exit 1
fi
echo "取得されたサービスアカウントID: ${SERVICE_ACCOUNT_ID}"

echo "--- 5. ソースファイルの作成 ---"

cat > main.py <<'EOF'
from flask import Flask, request, jsonify
import os
from google import genai
from google.genai import types

app = Flask(__name__)

PROJECT_ID = os.environ.get('GCP_PROJECT', 'your_project_id')
LLM_REGION = 'us-central1'

llm_client = None
try:
    llm_client = genai.Client(vertexai=True, project=PROJECT_ID, location=LLM_REGION)
except Exception as e:
    print(f"LLM Client Initialization Error: {e}")

def call_llm_for_judgment(text):
    if not llm_client:
        return "ERROR_CLIENT_INIT"

    prompt = f"以下の文章に具体的な予算や価格帯を示す言葉が含まれていれば「Yes」、そうでなければ「No」のみを回答してください。\n文章:{text}"
    try:
        response = llm_client.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt
        )
        return response.text.strip()
    except Exception as e:
        print(f"LLM API Call Failed: {e}")
        return "ERROR_LLM_CALL"

@app.route('/', methods=['POST'])
def handle_bq_udf():
    try:
        data = request.get_json()
        calls = data['calls']
        results = []
        for call in calls:
            input_text = call[0]
            # --- 1. 高速なルールベース判定 ---
            keywords = ['万円', '予算', '価格', '費用', '円', 'ドル']
            if any(k in input_text for k in keywords):
                results.append("Yes")
                continue
            # --- 2. LLMフォールバック判定 ---
            llm_result = call_llm_for_judgment(input_text)
            if llm_result.strip().upper() == "YES":
                 results.append("Yes")
            else:
                 results.append("No")
        return jsonify({"replies": results})
    except Exception as e:
        return jsonify({"errorMessage": str(e)}), 400

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8080))
    app.run(host='0.0.0.0', port=port)
EOF

echo "flask" > requirements.txt
echo "google-genai" >> requirements.txt

cat > Dockerfile <<'EOF'
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
EXPOSE 8080
CMD ["python", "main.py"]
EOF

echo "--- 6. イメージのビルドとCloud Runへのデプロイ ---"
export IMAGE_URI="${CLOUDRUN_REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${SERVICE_NAME}:latest"

gcloud builds submit --tag ${IMAGE_URI} --project=${PROJECT_ID} --quiet

gcloud run deploy ${SERVICE_NAME} \
    --image ${IMAGE_URI} \
    --region ${CLOUDRUN_REGION} \
    --platform managed \
    --no-allow-unauthenticated \
    --project=${PROJECT_ID} \
    --quiet

SERVICE_URL=$(gcloud run services describe ${SERVICE_NAME} --region ${CLOUDRUN_REGION} --project=${PROJECT_ID} | grep ^URL: | awk '{print $2}')
echo "デプロイされたサービスURL: ${SERVICE_URL}"

echo "--- 7. IAM権限付与 ---"

# set -e の影響を一時的に無効化してチェック
set +e
INVOKER_CHECK=$(gcloud run services get-iam-policy ${SERVICE_NAME} \
  --project=${PROJECT_ID} \
  --region=${CLOUDRUN_REGION} \
  --format="value(bindings.role, bindings.members)" \
  | grep "roles/run.invoker" | grep "${SERVICE_ACCOUNT_ID}")
CHECK_RESULT=$?
set -e

if [ $CHECK_RESULT = 0 ]; then
  echo "roles/run.invokerは付与済みです。"
else
  echo "roles/run.invokerを付与します..."
  gcloud run services add-iam-policy-binding ${SERVICE_NAME} \
      --member="serviceAccount:${SERVICE_ACCOUNT_ID}" \
      --role="roles/run.invoker" \
      --region ${CLOUDRUN_REGION} \
      --project=${PROJECT_ID} --quiet
  echo "roles/run.invoker権限付与後、60秒待機します..."
  sleep 60 
fi

# Cloud RunサービスアカウントにVertex AI権限を付与
echo "Cloud RunサービスアカウントにVertex AI権限を付与します..."
PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
CLOUDRUN_SA="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
    --member="serviceAccount:${CLOUDRUN_SA}" \
    --role="roles/aiplatform.user" \
    --quiet

echo "--- 8. リモート関数の定義と実行 ---"

cat > remote_function_def.sql <<EOF
CREATE OR REPLACE FUNCTION \`${PROJECT_ID}.${DATASET_ID}.${FUNCTION_NAME}\`(input_text STRING)
RETURNS STRING
REMOTE WITH CONNECTION \`${PROJECT_ID}.${CONNECTION_REGION}.${CONNECTION_NAME}\`
OPTIONS (
  endpoint = '${SERVICE_URL}'
);
EOF

echo "リモート関数の定義を実行します..."
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false < remote_function_def.sql

# UDFの実行と結果確認
echo "UDFの実行と結果確認..."
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false <<EOF_EXEC
SELECT 
    original_text,
    \`${DATASET_ID}.${FUNCTION_NAME}\`(original_text) AS judgment_result
FROM 
    (
        SELECT 'この製品の価格は10万円ですか?' AS original_text UNION ALL
        SELECT '見た目の高級感に対する満足度は?' AS original_text UNION ALL
        SELECT '製品の質感に対するニュアンスで高い評価はあるか?' AS original_text
    );
EOF_EXEC

ポイント解説

1. Cloud Runアプリの実装(main.py)

# --- 1. 高速なルールベース判定 ---
keywords = ['万円', '予算', '価格', '費用', '円', 'ドル']
if any(k in input_text for k in keywords):
    results.append("Yes")
    continue  # LLMを呼ばずに次へ
# --- 2. LLMフォールバック判定 ---
llm_result = call_llm_for_judgment(input_text)

この2段階判定がポイントです!
- キーワードに引っかかれば即座に「Yes」を返す(高速・低コスト)
- キーワードがない場合のみLLMで判定(精度重視)

2. BigQuery Remote UDF

CREATE OR REPLACE FUNCTION `project.dataset.function_name`(input_text STRING)
RETURNS STRING
REMOTE WITH CONNECTION `project.region.connection_name`
OPTIONS (
  endpoint = 'https://your-cloud-run-url'
);

BigQueryから外部のCloud Runエンドポイントを呼び出すUDFを定義します。

3. 使い方

SELECT 
    original_text,
    `dataset.function_name`(original_text) AS judgment_result
FROM your_table;

通常のBigQuery関数と同じように使えます!

この方法の利点

  • コスト最適化:明らかなケースはルールベースで処理し、LLM呼び出しを最小化
  • 柔軟性:Cloud Runのコードを変更すれば、判定ロジックを自由にカスタマイズ可能
  • スケーラビリティ:Cloud Runが自動スケールするので、大量データにも対応
  • 保守性:判定ロジックがPythonコードなので、メンテナンスしやすい

まとめ:3つのアプローチの比較

アプローチ コスト 実装難易度 おすすめ用途
1. コンソール→Spreadsheet 少量データの一回限りの分析、プロトタイピング
2. BQ ML 精度重視、コストは気にしない、完全自動化
3. UDF + Cloud Run 最適 本番運用、コスト最適化重視、カスタマイズ性重視

個人的には、最初はアプローチ1で試してみて、定期実行が必要になったらアプローチ2、さらにコストが気になってきたらアプローチ3という段階的な進化がおすすめです!

参考リンク

本記事で紹介した各種コードは、Google Cloud の公式ドキュメントを参考にしています。

感想

今回の実装を通して、外部接続の設定やサービスアカウントへのロール追加など、思ったより設定することが多いなと感じました。特にアプローチ3のUDF + Cloud Runの構成は、初回のセットアップにそれなりの手間がかかります。

ただ、一度作成してしまえば他のユースケースにも流用できるため、非常に便利な機能だと実感しました。今後、BigQuery + ML利用についてはより簡素で柔軟な方法が出てくるかもしれませんが、本記事がみなさまの参考になれば幸いです。

明日の記事は同じAIテクノロジーグループの辻埜さんです。お楽しみに!


株式会社エニグモ すべての求人一覧

hrmos.co