パフォーマンスを95%改善!Sidekiqで実現する効率的なジョブ並列化と状態管理

こんにちは!WEBアプリケーションエンジニアの川本です!

この記事はEnigmo Advent Calendar 2024の1日目の記事です。

弊社の運営する BUYMA では、社内システムよりタイムセールを毎週 約100万商品 に設定しています。しかし従来のシステムでは、この処理に 約100時間 もかかっており、運用負荷が大きな課題となっていました。本記事では、パフォーマンス改善によって処理時間を 約5時間 に短縮し、運用効率を向上させた事例をご紹介します。

タイムセール設定機能について

処理フロー

タイムセール設定機能はざっくり以下のような処理フローになっています。

  1. CSVファイルのアップロード
    タイムセール設定依頼情報を記載したCSVファイルを S3 にアップロードします。
  2. SQSへのエンキュー
    S3にファイルがアップロードされると、SQSにメッセージがエンキューされます。
  3. Redisに保存
    常駐しているデーモンがSQSキューをポーリングし、SQSから取得したメッセージをRedisに保存します。
  4. Sidekiqジョブの実行
    Redisに保存されたメッセージをSidekiqがデキューして、タイムセール設定の処理を実行します。

性能

タイムセール設定を行うSidekiqの性能は以下の通りです。

  • 専用プロセスの使用
    タイムセール設定ジョブは専用のSidekiqプロセスで実行。
  • 並列処理数
    1プロセスあたりの並列処理数は10。
  • 冗長化
    2台のサーバー構成で冗長性を確保。

設定例(sidekiq.yml)

---
:labels:
  - default
:concurrency: 2
:pidfile: tmp/pids/sidekiq.pid
:logfile: ./log/sidekiq.log
production:
  :concurrency: 10
:queues:
  - [setting_timesale, 1]

課題の発見

従来の仕組みでは、タイムセール設定依頼のCSVファイル1つを1スレッドで処理していました。これにより、以下のような問題が発生していました。

  • スレッド活用不足
    Sidekiqはマルチスレッド対応で高い並列処理性能を持っていますが、1つのスレッドが1つのCSVを丸ごと処理していたため、マルチスレッドであること効果的に活用できていませんでした。
  • 長時間実行による運用負荷
    1つのCSVには約100万件の商品データが含まれており、これを1スレッドで処理することで、処理時間が長時間に及び、運用上現実的でない時間になっていました。
  • CSV分割の運用負荷
    スレッドを有効活用するには、CSVをあらかじめスレッドの数だけ手動で分割する必要があり、これが運用上の手間となっていました。

改善アプローチ

上記の課題を解決するため、システム側で1つのCSVデータをバッチ単位(100商品)で分割し、各バッチを複数スレッドで並列処理する方式に変更しました。これにより、処理の効率化と運用負荷軽減を同時に実現することが可能になりました。

改善後の処理の流れ

  1. 親ジョブCSVデータをバッチ単位に分割し 子ジョブを作成・エンキューする。
  2. 子ジョブが実際のタイムセール設定処理を実行する。
  3. 監視ジョブ子ジョブの進捗を追跡し、全て完了したら設定完了の処理を実行する。

登場してきたジョブについて↓

親ジョブ

  • CSVファイルから取得したタイムセール設定依頼データをバッチ単位に分割。
  • 各バッチについて子ジョブをエンキューし、ジョブIDを記録。
  • 最後に監視ジョブをエンキューし、子ジョブの進捗を監視。

実装例↓

module SidekiqWorker
  # 親ジョブ
  class Parent
    include Sidekiq::Worker
    sidekiq_options queue: :setting_timesale

    BATCH_SIZE = 100

    def initialize(args)
      # 省略
    end

    def perform
      child_job_ids = []

      # バッチ単位でタイムセール依頼データを分割
      timesale_request_data.each_slice(BATCH_SIZE) do |batch|
        # バッチ単位でタイムセール設定を行う 子ジョブ をエンキュー
        child_job_ids << SidekiqWorker::Child.perform_async(batch)
      end

      # 子ジョブの進捗状況を管理する 監視ジョブ をエンキュー
      SidekiqWorker::Monitoring.perform_async(child_job_ids)
    end

    private

    # s3からタイムセール設定依頼データを取得
    def timesale_request_data
      # 省略
    end
  end
end

子ジョブ

  • バッチ単位(100件)の設定依頼に対してタイムセール設定の処理を実行。
  • ジョブの進捗状況をRedisに記録。

子ジョブの状態管理に sidekiq-status というgemを使用しました。

github.com

sidekiq-status は、Sidekiqで実行中のジョブの状態を追跡するためのGemです。ジョブの状態(例: queued, working, completeなど)をRedisに保存し、進捗をリアルタイムで確認できるようになります。

module SidekiqWorker
  # 子ジョブ
  class Child
    include Sidekiq::Worker
    include Sidekiq::Status::Worker
    sidekiq_options queue: :setting_timesale

    # 24時間ジョブの状態をRedisに保持する
    def expiration
      @expiration ||= 60 * 60 * 24
    end

    def perform(timesale_request_data)
      SetTimeSaleService.new(timesale_request_data).call
    end
  end
end

監視ジョブ

  • 子ジョブの進捗を追跡し、全ての子ジョブが完了した場合に完了処理を実行
  • 追跡できない子ジョブが存在した場合はアラートを飛ばす。

sidekiq-statusのAPIよって以下のように子ジョブの状態を確認することができます。

job_id = SidekiqWorker::Child.perform_async

# :queued, :working, :complete, :failed or :interrupted, nil after expiry
status = Sidekiq::Status::status(job_id) # <- ジョブ状態を確認
Sidekiq::Status::queued?      job_id # <- キューにあるか?
Sidekiq::Status::working?     job_id # <- 実行中か?
Sidekiq::Status::retrying?    job_id # <- リトライ中か?
Sidekiq::Status::complete?    job_id # <- 完了したか?
Sidekiq::Status::failed?      job_id # <- 失敗したか?
Sidekiq::Status::interrupted? job_id # <- 中断されたか?

実装例↓

module SidekiqWorker
  # 監視ジョブ
  class Monitoring
    include Sidekiq::Worker
    include Sidekiq::Status::Worker
    sidekiq_options queue: :setting_timesale

    SLEEP_TIME = 15

    def perform(job_ids)
      @job_ids = job_ids

      loop do
        @job_ids.reject! do |job_id|
          job_complete?(job_id)
        end
        break if @job_ids.empty?

        sleep SLEEP_TIME
      end

      TimeSaleSettingCompletionService.new(args).call
    end

    private

    # ジョブが完了したか?
    def job_complete?(job_id)
      Sidekiq::Status.complete?(job_id)
    end
  end
end

学んだこと

性能要件の見直しの重要性

長年運用されているシステムは、リリース当時の性能要件がそのまま適用されていることが少なくありません。しかし、システムの利用状況や運用環境は時間の経過とともに変化します。今回の事例でも、リリース当初は妥当だった処理速度が、現在では運用負荷を引き起こす大きな要因になっていました。

そこで重要なのは、現状の運用方法をしっかりと把握し、必要に応じて性能要件を再定義することです。運用者にヒアリングを行い、現在の問題点を明確にすることで、改善に向けた具体的な指針を得ることができます。

並列化によって増す複雑性

並列化はシステムの処理速度を大幅に向上させる一方で、複雑性を増す側面があります。一連の処理が複数のジョブに分散されるため、それぞれのジョブの状態を適切に管理する必要が生じます。今回の事例でも、Sidekiqを用いた並列化に伴い、以下のような課題が明らかになりました。

  • ジョブの状態管理コスト
    並列化することで、ジョブの進捗や完了状態を追跡する仕組みが必要になります。このため、Redisを活用したジョブの状態管理が不可欠となりましたが、それには追加の開発コストと運用リスクが伴います。
  • ジョブの状態欠損のリスク
    例えば、Redisに障害が発生した場合、一部のジョブの状態が欠損する可能性があります。このリスクを考慮し、ジョブの再実行や障害時のリカバリープロセスを検討する必要があります。

並列化のインパクトは大きいですが、システムの複雑性は増してしまいます。まずは 処理内容自体のパフォーマンス改善 に目を向けることが大切です。今回のプロジェクトでは、並列化に先立ち、子ジョブ内で実行されるタイムセール設定処理の最適化を行いました。

おわりに

パフォーマンス改善は、単に処理を「速くする」ことが目的ではなく、システムの要件や複雑性を考慮し、最適なバランスを見つけることが重要だなと思いました。

今回学んだことを活かしてこれからもパフォーマンス改善に取り組んでいきたいです!


株式会社エニグモ すべての求人一覧

hrmos.co