BigQueryのデータをAIでフィルタリング!手動→自動化→コスト最適化の3ステップ

こんにちは、AIテクノロジーグループの竹田です。
本記事は Enigmo Advent Calendar 2025 の11日目の記事です。

本稿では、BigQueryで抽出したデータに対して「金額に関する記述が含まれているか」をAIで判定する方法を、段階的に進化させながら紹介します。

この記事を書いた背景

私は元々検索システムの運用保守やMLOpsのOps周りを担当していました。
しかし、ここ最近は生成AIが実用的なツールとして利用できるようになり、業務でもAIを活用した対応が急増しています。

そんな中で直面したのが、「BigQueryで抽出した大量のテキストデータに対して、AIで判定処理を行いたい」というニーズです。
最初は手動で試し、次第に自動化・効率化を進めていく中で、いくつかの実装パターンが見えてきました。

本記事では、その試行錯誤の過程を「段階的な進化」として整理し、それぞれのアプローチのメリット・デメリットを共有します。
なお、本稿では「金額に関する記述の判定」を例として取り上げていますが、この手法は他の様々な判定タスクにも応用可能です。
同じような課題に直面している方の参考になれば幸いです。

やりたいこと

アンケートやレビューデータなど、テキストデータの中から「具体的な金額や価格に関する言及があるもの」だけを抽出したい!というシチュエーションを想定します。

例えば:
- 「この製品の価格は10万円ですか?」 → Yes(金額の言及あり)
- 「見た目の高級感に対する満足度は?」 → No(金額の言及なし)
- 「製品の質感に対するニュアンスで高い評価はあるか?」 → No(金額の言及なし)

こういった判定を、ルールベースだけでは難しいケースもあるので、AIの力を借りてやってみます。


アプローチ1: BigQueryコンソール → Spreadsheet → Gemini(手動)

まずは一番シンプルな方法から。BigQueryでデータを抽出して、Googleスプレッドシートに保存し、Geminiを使って判定させる方法です。

Step 1: BigQueryでデータを抽出

BigQueryコンソールで以下のようなクエリを実行します。

SELECT
    t.original_text
FROM
    (
        SELECT 'この製品の価格は10万円ですか?' AS original_text UNION ALL
        SELECT '見た目の高級感に対する満足度は?' AS original_text UNION ALL
        SELECT '製品の質感に対するニュアンスで高い評価はあるか?' AS original_text
    ) t;

BigQueryコンソールでのクエリ実行

実行したら、「Save results」からスプレッドシートに保存します。

Step 2: スプレッドシートでGeminiを使う

スプレッドシートに保存したら、右側のGeminiパネルを開いて、以下のようなプロンプトを投げます。

A列の文章に具体的な予算や価格帯を示す言葉が含まれていれば「Yes」、そうでなければ「No」のみ回答してください。

スプレッドシートでのGemini判定

Geminiが各行を判定してくれて、B列に結果が入ります。

この方法の課題

  • 手動作業が多い:毎回クエリ実行→保存→Gemini実行という手順が必要
  • 自動化が困難:定期的に実行したい場合、かなり面倒
  • スケールしない:データ量が増えると手作業では限界がある

ということで、次のステップに進みます!


アプローチ2: BigQuery ML(BQ ML)で自動化

BigQuery MLを使えば、BigQueryの中から直接Geminiを呼び出せます。これで自動化の道が開けます!

実装スクリプト全体

以下のスクリプトで一気にセットアップできます。

実行前の注意事項
 ・このスクリプトは、GCPリソースの作成やIAM権限の変更を行います。
 ・必ずご自身の責任の範囲内で実行してください。
 ・スクリプトは検証済みですが、GCPプロジェクトの設定や権限状況により失敗する可能性があります。

前提条件:
 ・macOS環境(またはLinux環境)で実行可能
 ・gcloud コマンドがインストール済みで、GCPにログイン済みであること
 ・対象のGCPプロジェクトで課金が有効化されていること
 ・サービスアカウントへのIAMロール付与など、プロジェクトに対する十分な権限を持っていること
 ・bqコマンド、jqコマンドがインストール済みであること

実行前の準備:
 ・スクリプト内のPROJECT_ID="your_project_id"を、ご自身が管理するGCPプロジェクトIDに変更してください
 ・必要に応じて、CONNECTION_REGIONMODEL_DATASET_IDなどの変数も環境に合わせて調整してください
 ・エラーが発生した場合は、エラーメッセージを確認し、必要な権限やリソースが不足していないか確認してください

作成されるリソース:
 ・BigQueryデータセットllm_dataset
 ・BigQuery Connection(llm_connection_for_filtering
 ・BigQueryリモートモデル(gemini_flash
 ・IAMロール付与(BigQuery ConnectionのサービスアカウントにVertex AI User権限)

#!/bin/bash

export PROJECT_ID="your_project_id"
export CONNECTION_REGION="US"
export CONNECTION_NAME="llm_connection_for_filtering"
export MODEL_DATASET_ID="llm_dataset"
export MODEL_NAME="gemini_flash"

echo "1. 必要なAPIを有効化します..."
gcloud services enable \
    aiplatform.googleapis.com \
    bigquery.googleapis.com \
    bigqueryconnection.googleapis.com \
    --project=${PROJECT_ID}

# データセットを作成
echo "2. BigQuery データセットを作成します..."
bq show --dataset ${PROJECT_ID}:${MODEL_DATASET_ID} &>/dev/null || \
bq mk --dataset --location=${CONNECTION_REGION} ${PROJECT_ID}:${MODEL_DATASET_ID}

# 接続を作成
echo "3. BigQuery接続 (Connection) を作成します..."
bq mk --connection \
    --connection_type=CLOUD_RESOURCE \
    --project_id="${PROJECT_ID}" \
    --location="${CONNECTION_REGION}" \
    "${CONNECTION_NAME}"

# サービスアカウントIDを取得
echo "4. 接続のサービスアカウントIDを取得します..."
SERVICE_ACCOUNT_ID=$(bq show \
    --connection \
    --location="${CONNECTION_REGION}" \
    --format=json "${PROJECT_ID}".${CONNECTION_REGION}."${CONNECTION_NAME}" 2>/dev/null| jq -r '.cloudResource.serviceAccountId')

echo "取得したサービスアカウントID: ${SERVICE_ACCOUNT_ID}"

# サービスアカウントにVertex AI Userロールを付与
echo "5. IAMロール (roles/aiplatform.user) を付与します..."
gcloud projects get-iam-policy ${PROJECT_ID} \
  --flatten="bindings[].members" \
  --filter="bindings.role:roles/aiplatform.user AND bindings.members:${SERVICE_ACCOUNT_ID}" \
  --format="value(bindings.role)" 2>&1 | grep -q "roles/aiplatform.user" >/dev/null 2>&1
if [ $? = 0 ]; then
  echo "roles/aiplatform.userは付与済みです。"
else
  gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
      --member="serviceAccount:${SERVICE_ACCOUNT_ID}" \
      --role="roles/aiplatform.user" --quiet
fi

echo "6. リモートモデルを定義します..."
cat > remote_model_def.sql <<EOF
CREATE OR REPLACE MODEL \`${PROJECT_ID}.${MODEL_DATASET_ID}.${MODEL_NAME}\`
REMOTE WITH CONNECTION \`${PROJECT_ID}.${CONNECTION_REGION}.${CONNECTION_NAME}\`
OPTIONS (
    endpoint = 'gemini-2.5-flash'
);
EOF
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false < remote_model_def.sql

echo "7. ML.GENERATE_TEXTの実行と結果確認..."
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false --nouse_cache <<EOF
SELECT
    t.original_text,
    JSON_EXTRACT_SCALAR(ml_generate_text_result, '$.candidates[0].content.parts[0].text') AS judgment_result
FROM
    ML.GENERATE_TEXT(
        MODEL \`${PROJECT_ID}.${MODEL_DATASET_ID}.${MODEL_NAME}\`, 
        (
            SELECT
                t.original_text,
                CONCAT(
                    '以下の文章に具体的な予算や価格帯を示す言葉が含まれていれば「Yes」、そうでなければ「No」のみ回答してください。文章: ', 
                    t.original_text
                ) AS prompt
            FROM
                (
                    SELECT 'この製品の価格は10万円ですか?' AS original_text UNION ALL
                    SELECT '見た目の高級感に対する満足度は?' AS original_text UNION ALL
                    SELECT '製品の質感に対するニュアンスで高い評価はあるか?' AS original_text
                ) AS t
        ),
        STRUCT(0.0 AS temperature, 1000 AS max_output_tokens)
    ) AS t
EOF

ポイント解説

  1. BigQuery Connection の作成

    • BigQueryからVertex AIのGeminiにアクセスするための接続を作成します
    • CLOUD_RESOURCEタイプの接続を使います
  2. IAM権限の設定

    • 作成された接続には専用のサービスアカウントが紐づきます
    • このサービスアカウントにroles/aiplatform.userロールを付与して、Vertex AIを使えるようにします
  3. リモートモデルの定義

    • CREATE MODEL文で、Gemini 2.5 Flashをリモートモデルとして登録します
    • これでBigQueryからGeminiを呼び出せるようになります
  4. ML.GENERATE_TEXTで判定実行

    • ML.GENERATE_TEXT関数を使って、各テキストに対してGeminiで判定を実行します
    • プロンプトはCONCATで動的に生成しています

この方法の利点と課題

  • 利点

    • 完全自動化!スケジュールクエリで定期実行も可能
    • BigQueryの中で完結するので、データの移動が不要
  • 課題

    • 全行でLLMが呼ばれる = コストが高い
    • 「10万円」みたいな明らかなキーワードがある場合も、わざわざLLMを呼んでいる

ということで、さらなる最適化に挑戦します!


アプローチ3: UDF + Cloud Run でコスト最適化

最後は、BigQueryのRemote UDFとCloud Runを組み合わせて、ルールベース判定 → LLM判定の2段階フィルタリングを実装します。

戦略

  1. まず高速なルールベース判定(キーワードマッチ)を実行
  2. キーワードに引っかからなかった場合のみ、LLMで判定
  3. これでLLM呼び出し回数を大幅削減!

実装スクリプト全体

実行前の注意事項
 ・このスクリプトは、Cloud Runのデプロイ、Dockerイメージのビルド、BigQueryリソースの作成、IAM権限の変更など、多くのGCPリソース操作を行います。
 ・必ずご自身の責任の範囲内で実行してください。
 ・スクリプトは検証済みですが、GCPプロジェクトの設定や権限状況により失敗する可能性があります。

前提条件:
 ・macOS環境(またはLinux環境)で実行可能
 ・gcloud コマンドがインストール済みで、GCPにログイン済みであること
 ・対象のGCPプロジェクトで課金が有効化されていること
 ・サービスアカウントへのIAMロール付与、Cloud Runのデプロイなど、プロジェクトに対する強い権限を持っていること
 ・bqコマンド、jqコマンドがインストール済みであること

実行前の準備:
 ・スクリプト内のPROJECT_ID="your_project_id"を、ご自身が管理するGCPプロジェクトIDに変更してください
 ・必要に応じて、リージョンやサービス名などの変数も環境に合わせて調整してください
 ・このスクリプトset -eでエラー時に停止するようになっていますが、途中で失敗した場合は作成済みのリソースが残る可能性があります
 ・エラーが発生した場合は、エラーメッセージを確認し、必要な権限やリソースが不足していないか確認してください

作成されるリソース:
 ・BigQueryデータセットllm_dataset
 ・BigQuery Connection(llm_connection_for_filtering
 ・Artifact Registryリポジトリbq-udf-repo
 ・Cloud Runサービス(bq-udf-processor-final
 ・BigQuery Remote UDF(efficient_price_filter_final
 ・IAMロール付与(BigQuery ConnectionのサービスアカウントにCloud Run Invoker権限、Cloud RunのサービスアカウントにVertex AI User権限)

#!/bin/bash

set -e

export PROJECT_ID="your_project_id"
export CONNECTION_REGION="US"
export CLOUDRUN_REGION="us-central1"
export DATASET_ID="llm_dataset"
export CONNECTION_NAME="llm_connection_for_filtering"
export REPO_NAME="bq-udf-repo"
export SERVICE_NAME="bq-udf-processor-final"
export FUNCTION_NAME="efficient_price_filter_final"

echo "--- 1. 必要なAPIの有効化 ---"
gcloud services enable \
    artifactregistry.googleapis.com \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    aiplatform.googleapis.com \
    bigquery.googleapis.com \
    bigqueryconnection.googleapis.com \
    --project=${PROJECT_ID} --quiet

echo "--- 2. BigQuery データセットの作成 ---"
bq show --dataset ${PROJECT_ID}:${DATASET_ID} &>/dev/null || \
bq mk --dataset --location=${CONNECTION_REGION} ${PROJECT_ID}:${DATASET_ID}

echo "--- 3. Artifact Registryの準備 ---"
gcloud artifacts repositories create ${REPO_NAME} \
    --repository-format=docker \
    --location=${CLOUDRUN_REGION} \
    --project=${PROJECT_ID} || true

echo "--- 4. BQ Connectionの作成とサービスアカウントIDの取得 ---"
CONNECTION_FULL_PATH="${PROJECT_ID}.${CONNECTION_REGION}.${CONNECTION_NAME}"

bq show --connection --location="${CONNECTION_REGION}" "${CONNECTION_FULL_PATH}" &>/dev/null || \
bq mk --connection --connection_type=CLOUD_RESOURCE --project_id="${PROJECT_ID}" --location="${CONNECTION_REGION}" "${CONNECTION_NAME}"

SERVICE_ACCOUNT_ID=$(bq show \
    --connection \
    --location="${CONNECTION_REGION}" \
    --format=json "${PROJECT_ID}".${CONNECTION_REGION}."${CONNECTION_NAME}" 2>/dev/null | jq -r '.cloudResource.serviceAccountId')

if [ -z "$SERVICE_ACCOUNT_ID" ]; then
    echo "エラー: サービスアカウントIDの取得に失敗しました。"
    exit 1
fi
echo "取得されたサービスアカウントID: ${SERVICE_ACCOUNT_ID}"

echo "--- 5. ソースファイルの作成 ---"

cat > main.py <<'EOF'
from flask import Flask, request, jsonify
import os
from google import genai
from google.genai import types

app = Flask(__name__)

PROJECT_ID = os.environ.get('GCP_PROJECT', 'your_project_id')
LLM_REGION = 'us-central1'

llm_client = None
try:
    llm_client = genai.Client(vertexai=True, project=PROJECT_ID, location=LLM_REGION)
except Exception as e:
    print(f"LLM Client Initialization Error: {e}")

def call_llm_for_judgment(text):
    if not llm_client:
        return "ERROR_CLIENT_INIT"

    prompt = f"以下の文章に具体的な予算や価格帯を示す言葉が含まれていれば「Yes」、そうでなければ「No」のみを回答してください。\n文章:{text}"
    try:
        response = llm_client.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt
        )
        return response.text.strip()
    except Exception as e:
        print(f"LLM API Call Failed: {e}")
        return "ERROR_LLM_CALL"

@app.route('/', methods=['POST'])
def handle_bq_udf():
    try:
        data = request.get_json()
        calls = data['calls']
        results = []
        for call in calls:
            input_text = call[0]
            # --- 1. 高速なルールベース判定 ---
            keywords = ['万円', '予算', '価格', '費用', '円', 'ドル']
            if any(k in input_text for k in keywords):
                results.append("Yes")
                continue
            # --- 2. LLMフォールバック判定 ---
            llm_result = call_llm_for_judgment(input_text)
            if llm_result.strip().upper() == "YES":
                 results.append("Yes")
            else:
                 results.append("No")
        return jsonify({"replies": results})
    except Exception as e:
        return jsonify({"errorMessage": str(e)}), 400

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8080))
    app.run(host='0.0.0.0', port=port)
EOF

echo "flask" > requirements.txt
echo "google-genai" >> requirements.txt

cat > Dockerfile <<'EOF'
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
EXPOSE 8080
CMD ["python", "main.py"]
EOF

echo "--- 6. イメージのビルドとCloud Runへのデプロイ ---"
export IMAGE_URI="${CLOUDRUN_REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${SERVICE_NAME}:latest"

gcloud builds submit --tag ${IMAGE_URI} --project=${PROJECT_ID} --quiet

gcloud run deploy ${SERVICE_NAME} \
    --image ${IMAGE_URI} \
    --region ${CLOUDRUN_REGION} \
    --platform managed \
    --no-allow-unauthenticated \
    --project=${PROJECT_ID} \
    --quiet

SERVICE_URL=$(gcloud run services describe ${SERVICE_NAME} --region ${CLOUDRUN_REGION} --project=${PROJECT_ID} | grep ^URL: | awk '{print $2}')
echo "デプロイされたサービスURL: ${SERVICE_URL}"

echo "--- 7. IAM権限付与 ---"

# set -e の影響を一時的に無効化してチェック
set +e
INVOKER_CHECK=$(gcloud run services get-iam-policy ${SERVICE_NAME} \
  --project=${PROJECT_ID} \
  --region=${CLOUDRUN_REGION} \
  --format="value(bindings.role, bindings.members)" \
  | grep "roles/run.invoker" | grep "${SERVICE_ACCOUNT_ID}")
CHECK_RESULT=$?
set -e

if [ $CHECK_RESULT = 0 ]; then
  echo "roles/run.invokerは付与済みです。"
else
  echo "roles/run.invokerを付与します..."
  gcloud run services add-iam-policy-binding ${SERVICE_NAME} \
      --member="serviceAccount:${SERVICE_ACCOUNT_ID}" \
      --role="roles/run.invoker" \
      --region ${CLOUDRUN_REGION} \
      --project=${PROJECT_ID} --quiet
  echo "roles/run.invoker権限付与後、60秒待機します..."
  sleep 60 
fi

# Cloud RunサービスアカウントにVertex AI権限を付与
echo "Cloud RunサービスアカウントにVertex AI権限を付与します..."
PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
CLOUDRUN_SA="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
    --member="serviceAccount:${CLOUDRUN_SA}" \
    --role="roles/aiplatform.user" \
    --quiet

echo "--- 8. リモート関数の定義と実行 ---"

cat > remote_function_def.sql <<EOF
CREATE OR REPLACE FUNCTION \`${PROJECT_ID}.${DATASET_ID}.${FUNCTION_NAME}\`(input_text STRING)
RETURNS STRING
REMOTE WITH CONNECTION \`${PROJECT_ID}.${CONNECTION_REGION}.${CONNECTION_NAME}\`
OPTIONS (
  endpoint = '${SERVICE_URL}'
);
EOF

echo "リモート関数の定義を実行します..."
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false < remote_function_def.sql

# UDFの実行と結果確認
echo "UDFの実行と結果確認..."
bq query --project_id=${PROJECT_ID} --use_legacy_sql=false <<EOF_EXEC
SELECT 
    original_text,
    \`${DATASET_ID}.${FUNCTION_NAME}\`(original_text) AS judgment_result
FROM 
    (
        SELECT 'この製品の価格は10万円ですか?' AS original_text UNION ALL
        SELECT '見た目の高級感に対する満足度は?' AS original_text UNION ALL
        SELECT '製品の質感に対するニュアンスで高い評価はあるか?' AS original_text
    );
EOF_EXEC

ポイント解説

1. Cloud Runアプリの実装(main.py)

# --- 1. 高速なルールベース判定 ---
keywords = ['万円', '予算', '価格', '費用', '円', 'ドル']
if any(k in input_text for k in keywords):
    results.append("Yes")
    continue  # LLMを呼ばずに次へ
# --- 2. LLMフォールバック判定 ---
llm_result = call_llm_for_judgment(input_text)

この2段階判定がポイントです!
- キーワードに引っかかれば即座に「Yes」を返す(高速・低コスト)
- キーワードがない場合のみLLMで判定(精度重視)

2. BigQuery Remote UDF

CREATE OR REPLACE FUNCTION `project.dataset.function_name`(input_text STRING)
RETURNS STRING
REMOTE WITH CONNECTION `project.region.connection_name`
OPTIONS (
  endpoint = 'https://your-cloud-run-url'
);

BigQueryから外部のCloud Runエンドポイントを呼び出すUDFを定義します。

3. 使い方

SELECT 
    original_text,
    `dataset.function_name`(original_text) AS judgment_result
FROM your_table;

通常のBigQuery関数と同じように使えます!

この方法の利点

  • コスト最適化:明らかなケースはルールベースで処理し、LLM呼び出しを最小化
  • 柔軟性:Cloud Runのコードを変更すれば、判定ロジックを自由にカスタマイズ可能
  • スケーラビリティ:Cloud Runが自動スケールするので、大量データにも対応
  • 保守性:判定ロジックがPythonコードなので、メンテナンスしやすい

まとめ:3つのアプローチの比較

アプローチ コスト 実装難易度 おすすめ用途
1. コンソール→Spreadsheet 少量データの一回限りの分析、プロトタイピング
2. BQ ML 精度重視、コストは気にしない、完全自動化
3. UDF + Cloud Run 最適 本番運用、コスト最適化重視、カスタマイズ性重視

個人的には、最初はアプローチ1で試してみて、定期実行が必要になったらアプローチ2、さらにコストが気になってきたらアプローチ3という段階的な進化がおすすめです!

参考リンク

本記事で紹介した各種コードは、Google Cloud の公式ドキュメントを参考にしています。

感想

今回の実装を通して、外部接続の設定やサービスアカウントへのロール追加など、思ったより設定することが多いなと感じました。特にアプローチ3のUDF + Cloud Runの構成は、初回のセットアップにそれなりの手間がかかります。

ただ、一度作成してしまえば他のユースケースにも流用できるため、非常に便利な機能だと実感しました。今後、BigQuery + ML利用についてはより簡素で柔軟な方法が出てくるかもしれませんが、本記事がみなさまの参考になれば幸いです。

明日の記事は同じAIテクノロジーグループの辻埜さんです。お楽しみに!


株式会社エニグモ すべての求人一覧

hrmos.co