コスト削減と精度維持を両立!類似画像検索システムの内製化成功事例(システム編)

こんにちは、データサイエンティストの髙橋です。業務では企画/分析/機械学習モデル作成/プロダクション向けの実装/効果検証を一貫して行っています。

この記事では類似画像検索システムの内製化にあたり、システム面での課題をどのように解決したかについて紹介します。内製化の背景や機械学習部分などについては以前作成した記事で説明しており、この記事はその続きとなります。

この記事は Enigmo Advent Calendar 2024 の 5 日目の記事です。

内製化の目的・事業インパク

内製化の目的は、弊社が運営する CtoC EC サイト BUYMA において、商品名寄せで利用している他社製の類似画像検索システムの精度を保ちながらコストを削減することでした。既に内製化後のシステムに移行しており、同等の精度を維持しつつ年間数百万円規模(約8〜9割)のコスト削減を見込んでいます。

また、名寄せ以外にも類似画像検索システムを利用している施策があり、その移行も進めることでさらなるコスト削減の可能性があります。さらに、他社製の類似画像検索システムではコストが高く断念していた EC サイト上での画像起点のレコメンドや、画像による類似商品検索機能なども検討出来るようになりました。

より詳細な説明については前編である機械学習編の記事を参照ください。

類似画像検索のシステム化における課題

類似画像検索のシステム化における課題として、非常に大規模なデータを現実的な時間で処理する必要がありました。機械学習編の記事に記載した類似画像検索の各ステップに対して、毎月処理するデータ量は以下の通りでした。

ステップ 毎月処理するデータ量
商品画像をダウンロードし、物体存在箇所をセグメンテーションして切り抜き 約 200 万画像
切り抜いた画像の Embedding 計算 約 1000 ∼ 2000 万画像
切り抜いた画像・Embedding ファイルを GCS (Google Cloud Storage) にアップロード 約 2000 ∼ 4000 万ファイル
Embedding による画像同士の類似度計算 数十〜数千億の組合せ
画像ハッシュによる画像同士の類似度計算 数十万の組合せ

単純に単一のサーバー上で各ステップを実行する方法では、全体で1ヶ月以上かかる見込みであり、毎月定期的に処理を行うのは現実的ではありませんでした。

また、データ量以外の課題として、弊社では機械学習基盤として Vertex AI Pipelines を利用していましたが、今回のシステムはその基盤上に実装できない課題がありました。先述した規模のデータを処理するにはそれに特化した複数の GCP (Google Cloud Platform) サービスを組み合わせる必要がある一方で、現行の基盤は VM インスタンス上で Python コードを実行する用途を想定していたためです。そのため、複数の GCP サービスを連携して毎月スケジュール実行するアーキテクチャを作る必要性がありました。

システム化における各課題の解決策

前述した課題をどのように解決したかを説明します。ただし、前提として類似画像検索の各ステップのうち、以下は1つのかたまりとして処理することにしました。

  • 商品画像をダウンロードし、物体存在箇所をセグメンテーションして切り抜き
  • 切り抜いた画像の Embedding 計算
  • 切り抜いた画像・Embedding ファイルを GCS にアップロード

理由は、セグメンテーションおよび Embedding 計算の両方に GPU が必要であったため、また GCS とのデータのやり取りに時間がかかることから、「商品画像ダウンロード > セグメンテーション > Embedding 計算 > アップロード」の一連のステップを同一メモリ上で行いたかったためです。

「商品画像ダウンロード > セグメンテーション > Embedding 計算 > アップロード」の高速化

Dataflow という並列分散処理が行える GCP サービスを利用することで高速化を実現しました。Dataflow とは、GCP が提供するマネージドのバッチ・ストリーミングデータ処理サービスであり、並列分散処理により大量のデータを効率的に処理することが可能です。

今回 Dataflow を選択するにあたり、Ray on Vertex AIも候補に上がりました。いずれも並列分散処理は実現できそうでした。Ray はその構文がネイティブの Python に近く、既存の Python コードに大きな変更を加える必要がなさそうだったため、実装コストが低そうに見えました。一方で、Dataflow は Apache Beam の構文を理解し覚える必要があり一定の学習コストが伴いそうでしが、社内で利用実績があり困ったときに既存の資産を参考に出来そうであったため Dataflow を選択しました。
実際に、 Dataflow の実装で困ったときに他プロジェクトでのソースコードを参考にして解決することができ、この選択は正解であったと考えています。また、 Apache Beam の構文もそれほど複雑ではなく、初期の学習コストは多少ありましたが慣れれば実装に大きく手間取ることはなかったです。

約 200 万枚の画像に対して、並列分散処理無しでは約 30 日かかる見込みでしたが、Dataflow により約 56 時間に短縮することができました。 Dataflow の設定は

  • ワーカーマシンタイプ: n1-highmem-4 (vCPU 4 、メモリ 26 GB)
  • ワーカー数: 4
  • GPUnvidia-tesla-t4

としました。ハイメモリのマシンタイプを利用した理由は、メモリ枯渇でジョブが途中で停止してしまうことがあったためです。

Embedding 同士の類似度計算処理の高速化

Vertex AI Vector Search というサービスを利用しました。Vertex AI Vector Search とは、GCP が提供するマネージドサービスで、膨大な数の Embedding 同士の類似度計算を高速に行うことができます。

Vertex AI Vector Search を利用することで約 1700億個の Embedding の組合せを約 8 時間で処理することができました。単一のサーバー上で処理した場合の処理時間は見積もっていませんが、おそらくこの規模のデータをこの速度で処理するシステムを作るにはそれなりの工数がかかったと思います。

Vertex AI Vector Search の設定としては、Algorithm type 、 Shard size はそれぞれデフォルト値である tree-AH 、 Medium で速度やコストに問題がなかったためそのままとしました。 Approximate neighbors count は値を変えて実験したところ検索速度に大きな違いが生じました。具体的には、 約 1 千万件の 768 次元のベクトルに対して類似度計算を 100 回行い、処理速度の統計量を算出したところ以下の通りでした。

num_neighbors mean (sec) std (sec) min (sec) max (sec)
10 0.031 0.013 0.021 0.076
100 0.189 0.034 0.146 0.266
1000 0.218 0.034 0.17 0.297
10000 0.519 0.149 0.39 1.109

上記実験結果より可能な限り低い値にすることで処理時間が大きく短縮できそうでした。今回の用途では 1 画像に対して同一と検知される見込み画像数は数件程度であったため、それをカバーできる 10 としました。

画像ハッシュ計算処理の高速化

このステップでも Dataflow を利用しました。

約 27 万件の画像の組合せに対して、並列分散処理無しでは約 54 時間かかる見込みでしたが、Dataflow により約 45 分に短縮することができました。 Dataflow の設定は

  • ワーカーマシンタイプ: n1-standard-1 (vCPU 1 、メモリ 3.75 GB)
  • ワーカー数: 120

としました。Dataflow を採用したことで、ワーカー数を自由に変更することができ、120 ワーカーで並列分散処理を容易に実現できました。

複数 GCP サービスを連携させてスケジュール実行するアーキテクチャ作成

Cloud Composer という Airflow のマネージドサービスを利用しました。Cloud Composer は、GCP が提供するワークフローオーケストレーションサービスで、複数のクラウドサービスを連携してスケジュール実行することができます。

今回 Cloud Composer を選択するにあたり Cloud Workflows も候補に上がりましたが、 Airflow の社内での利用実績が豊富であったため Cloud Composer を採用しました。こちらでも困ったときに他プロジェクトの既存のソースコードが参考になる場面が多く、また移行前の類似画像検索システムの一部で Airflow を利用しており、既存システムの理解がスムーズに出来たメリットもありました。

以下のようなフローで Dataflow や Vertex AI Vector Search などを連携し、類似画像検索の各ステップを実行するシステムを Cloud Composer で実装しました。

Cloud Composer 処理フロー

開発生産性や保守性を向上させるために、 Dataflow では各ステップごとに Docker Image と Flex Template を、 Cloud Batch でも Docker Image を利用しました。これにより各ステップを独立に開発・テスト・デプロイ出来るようにしました。この部分の詳細については、別途機会があれば記事として執筆する予定です。

実装時の工夫

ここでは、システム実装時の工夫を2つ紹介します。同じようなシステム構成の実装をされる方の参考になれば幸いです。

Cloud Composer からの GCP サービス呼び出し方法

Cloud Composer から 各 GCP サービスを呼び出すに当たり、Dataflow には Airflow に専用のクラスが存在しましたが、 Vertex AI Vector Search には存在しませんでした。そこで、 GCPREST API例:インデックス作成 API)を呼び出すことでリソースの作成や作成状況のポーリングを行うクラスをカスタムで実装しました。具体的には Airflow の Sensor クラスを利用して以下のようなイメージで実装しました(あくまでソースコードのイメージとして簡易化したものであり、実際のものとは異なります)。

from typing import Any, Dict

from airflow import models
from airflow.decorators import task
from airflow.sensors.base import BaseSensorOperator


@task(task_id="create_resource")
def create_resource(args1: int, args2: str):
    """GCP のリソースを作成する(Vertex AI Vector Search のインデックスなど)。

    Args:
        args1 (int): 引数1。
        args2 (str): 引数2。
    """

    # call_create_resource_api 関数は別途実装。内部で GCP の REST API を呼び出す。
    response = call_create_resource_api(args1, args2)
    return response


class ResourceSensor(BaseSensorOperator):
    """GCP のリソース操作の状況をポーリングする。

    Attributes:
        poke_task_id (str): ポーリング対象のタスクID。
        args1 (int): 引数1。
        args2 (str): 引数2。
    """

    def __init__(self, *, poke_task_id: str, args1: int, args2: str, **kwargs):
        """GCP のリソース操作の状況をポーリングするクラスを初期化。

        Args:
            poke_task_id (str): ポーリング対象のタスクID。
            args1 (int): 引数1。
            args2 (str): 引数2。
        """
        super().__init__(**kwargs)
        self.poke_task_id = poke_task_id
        self.args1 = args1
        self.args2 = args2

    def poke(self, context: Dict[str, Any]) -> bool:
        """
        リソース操作の状況をポーリングして、完了したかどうかチェックする。

        Args:
            context (Dict[str, Any]):
                Airflow のコンテキスト。どのような値が格納されているかは以下参照。
                https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

        Returns:
            bool: リソース操作が完了したかどうか。
        """
        # 作成したリソース情報(IDなど)を取得。
        response = context["ti"].xcom_pull(task_ids=self.poke_task_id)

        # call_get_resource_status_api 関数は別途実装。内部で GCP の REST API を呼び出す。
        status = call_get_resource_status_api(response, args1=self.args1, args2=self.args2)

        # status = {"done": True} のような値と仮定
        return status["done"]


with models.DAG(
    "dag_name",
    schedule_interval="0 0 * * mon",
) as dag:
    create_resource_task = create_resource(args1, args2)
    wait_create_resource_task = ResourceSensor(
        task_id="wait_create_resource",
        poke_interval=60 * 10,
        timeout=3600 * 3,
        poke_task_id="create_resource",
        args1=args1,
        args2=args2,
    )

    create_resource_task >> wait_create_resource_task

ここで、 Airflow のコンテキストを利用してポーリング時に必要なリソース情報(ID など)を Sensor クラスで取得するようにしました。

Vertex AI Vector Search での類似度計算時にリトライ

Vertex AI Vector Search で類似度計算を行う際に、Exponential backoff アルゴリズムによるリトライ処理を入れるようにしました。理由は、実際に運用していると Vertex AI Vector Search の呼び出し時に google.api_core.exceptions.InternalServerError: 500 Failed to call Service Control Check.google.api_core.exceptions.Unknown: None Stream removed というエラーが稀に発生することがあったためです。Vertex AI Vector Search による類似度計算は1回の定期実行あたり約 8 時間かかるため、途中で停止するとリトライにかかる時間が大きいという問題がありました。

Exponential backoff アルゴリズムによるリトライは backoff ライブラリを利用し、リトライ対象のエラーは google.api_core.exceptions.ServerError としました。ソースコードを見ると今回発生した google.api_core.exceptions.InternalServerErrorgoogle.api_core.exceptions.Unknown がこのクラスの子クラスであり、他の子クラスも GCP 側のサーバーエラー起因のものであるためリトライする方が良いと判断したためです。

以下が実際のソースコードのイメージです。(簡易化したものであり、実際のものとは異なります)。

from typing import List

import backoff
from google.api_core.exceptions import ServerError
from google.cloud.aiplatform import MatchingEngineIndexEndpoint
from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import (
    MatchNeighbor,
)


# エラーが解消するまでの待ち時間が不明なため、リトライの最大時間は1時間半とした。
# 類似度計算は1回の定期実行あたり約 8 時間かかるため、1時間半の待ち時間は許容する。
@backoff.on_exception(backoff.expo, ServerError, max_time=5400)
def find_neighbors(
    index_endpoint: MatchingEngineIndexEndpoint,
    deployed_index_id: str,
    queries: List[List[float]],
    num_neighbors: int,
) -> List[List[MatchNeighbor]]:
    """デプロイされたインデックスで与えられた Embedding に対して近似最近傍探索を実行。

    Args:
        index_endpoint (MatchingEngineIndexEndpoint): Vertex AI Vector Search のインデックスエンドポイントクラス。
        deployed_index_id (str): インデックスのデプロイID。
        queries: List[List[float]]: Embedding のリスト。
        num_neighbors (int): 近似最近傍探索で取得する Embedding 数。

    Returns:
        List[List[MatchNeighbor]]: 類似度トップ `num_neighbors` の id と類似度のリスト。
    """
    return index_endpoint.find_neighbors(
        deployed_index_id=deployed_index_id, queries=queries, num_neighbors=num_neighbors
    )

実際にこのリトライ処理を入れたことで、その後の運用時に同じエラーが発生することがありましたが、無事にリトライされることで途中で停止せずに実行完了していました。

まとめ

本記事では、類似画像検索システムの内製化におけるシステム面での課題と、それをどのように解決したかについて説明しました。大規模なデータを毎月現実的な時間で処理しなければならない課題を、複数の GCP サービスを組み合わせて高速化することで解決しました。また、複数の GCP サービスを連携してスケジュール実行する必要がある課題を、 Cloud Composer を利用して実装することで解決しました。

今後は他の機能への応用を検討しています。本記事が類似のシステムを構築されている方々の参考になれば幸いです。

明日の記事の担当は UI/UXチーム の飯沼さんです。お楽しみに。


エニグモでは一緒にデータを利用したサービス価値向上を実現していただけるデータサイエンティストを募集中です!世界178ヶ国に1100万人超の会員を有し、出品数は630万品を超えるBUYMA には膨大なデータが蓄積されており、データ活用の余地はまだまだあります。ご興味ある方はカジュアル面談からでもお話できますと幸いです。

他の職種も絶賛募集中です!

株式会社エニグモ すべての求人一覧

hrmos.co