Rails + マイクロサービスでイベント駆動アーキテクチャを導入した話

はじめに

こんにちは、サーバーサイドエンジニアの@hokitaです。

この記事は Enigmo Advent Calendar 2020 の 16 日目の記事です。

弊社が運営するBUYMAは現状モノレポで管理されており、10年以上も運営しているサービスなのでソースも肥大化していて、メンテナンスが難しくなってきました。 そこで現在、本体から少しずつマイクロサービスに切り離していこうとしています。

その取組の中で配送処理の一部をマイクロサービス化する作業に携わることができました。今回はBUYMA本体と配送サービスとの通信にイベント駆動アーキテクチャを導入した話をしていきます。

イベント駆動アーキテクチャ

マイクロサービスでサービスを切り分ける場合、それぞれ責務が分かれるように分割するかと思います。 しかしサービス間の通信手段によっては各サービスが密になる恐れがあります。

そこでイベント駆動アーキテクチャを利用します。

f:id:hokita:20201215115957p:plain
PublisherとSubscriber

メリットとして

  • PublisherはSubscriberのことを知らない
  • Subscriberはイベントのこと以外知らない
  • Publisher1つに対し、複数のSubscriberを設置できる

このようにPublisherとSubscriberはお互い知らないので、疎の関係を保つことができます。

非同期メッセージングサービス

イベント駆動アーキテクチャを実現するために、非同期メッセージングサービスを利用します。

代表的なものとして

などがあります。

今回は Amazon SNS / Amazon SQS を使用しました。

Amazon SNS

docs.aws.amazon.com

発行者からサブスクライバー (プロデューサーおよびコンシューマーとも呼ばれます) へのメッセージ配信を提供するマネージド型サービスです。

メッセージを発信するものと思って頂ければと思います。メッセージの受け取り先(サブスクライバー)として今回はAmazon SQSですが、他にもAWS Lambdaなどでも取得することが可能です。

なぜSNSを利用するのか

次の図を見て頂ければ分かる通り、SNSがないとBUYMA本体が配送サービスを知っていることになります。つまりになってしまいます。

f:id:hokita:20201215120053p:plain
SNSなしの場合

Amazon SQS

docs.aws.amazon.com

メッセージキューイングサービスです。同じ非同期処理としてsidekiqやResqueを使っているサービスも多いかと思います。

主な特徴としては

  • "at least once"(最低1回)が保証されている
    • 逆に2回以上同じメッセージを取得する可能性がある
    • Redisのようにジョブを失うことがない
  • 順不同(それなりに担保されるのかと思っていましたが全く順不同でした。)

FIFO(First-In-First-Out)キュー

通常キューとは別でFIFOキューというものも使用することができます。

  • 受信する順序が保持される
  • 必ず1回処理される
  • 1つずつ処理される
  • 通常キューより処理が遅くなる

処理順序が決まっている処理(例えば製品価格の変更処理など)で便利かと思います。

ロングポーリング

ショートポーリングとロングポーリングというものがありますが、基本ロングポーリングが良いです。(ショートポーリングはいつ使われるのだろうという感じです。)

ロングポーリングですが最大20秒キューにメッセージがないか待機をして、あれば即座に実行します。(筆者は最初20秒間に溜まったメッセージを処理するのかと思っていましたが、勘違いでした。メッセージを取得したら即座に実行です。)

Shoryuken

github.com

RailsでSQSをジョブキューとして利用するときは現状Shoryuken一択です。Shoryukenを起動することでSQSのキューをポーリングし、キューを取得して処理を実行してくれます。エンキューももちろんできます。

一通りwikiに必要な情報が書かれているので、そちらを読めば問題なく実装できるかと思います。

以下設定ファイルや実装例を記載します。

shoryuken.yml

対象のキューの情報を記載します。

Shoryuken options · phstc/shoryuken Wiki · GitHub

# config/shoryuken.yml

groups:
  purchase_completed:
    concurrency: 1
    queues:
      - ['purchase_completed_queue', 1]
pidfile: ./tmp/pids/shoryuken.pid

※groupsで分けている理由

groups:
  group1:
    concurrency: 1
    queues:
      - ['a_queue', 2] # 重さ2
      - ['b_queue', 1] # 重さ1

このように同じグループに複数のキューを設定している場合、a_queueがメッセージを取得してもb_queueにメッセージがない場合はポーリング時間が終わるまでa_queueの処理は実行されないので注意が必要です。

Processing Groups · phstc/shoryuken Wiki · GitHub

またa_queueb_queueで大量にメッセージがある場合、a_queueb_queueの重さの2倍なので、処理の優先度も2倍になります。

Polling strategies · phstc/shoryuken Wiki · GitHub

shoryuken.rb

SQSの情報を記載します。

# config/initializers/shoryuken.rb

# ロングポーリング
Shoryuken.sqs_client_receive_message_opts = { wait_time_seconds: 20 }

Shoryuken.sqs_client = Aws::SQS::Client.new(
  region: ENV['AWS_REGION'],
  access_key_id: ENV['AWS_ACCESS_KEY_ID'],
  secret_access_key: ENV['AWS_ACCESS_SECRET_KEY']
)

起動

$ bundle exec shoryuken -R -C config/shoryuken.yml

ジョブ

例)決済完了時に実行するジョブ

# app/jobs/purchase_complete_job.rb

# 決済完了時のジョブ
class PurchaseCompleteJob < ApplicationJob
  include Shoryuken::Worker

  shoryuken_options queue: '<キュー名>',
                    auto_delete: true,
                    body_parser: :json

  def perform(_sqs_msg, body)
    message = JSON.parse(body['Message'])

    # 保存処理
  end
end

エンキュー

エンキューも簡単にできます。

# 配送ステータスの通知ジョブにエンキュー
StatusNotificationJob.perform_async(
  status_data: 'some status data'
)

FIFOキュー

前述したFIFOキューですが、Shoryukenでも扱うことができます。 基本通常キューと記述は同じですが、メッセージグループIDを指定することができます。 同じメッセージグループIDだと厳密な順序で、常に1件ずつ処理をします。

例えば取引のステータスを変更したい場合、ステータス変更の順序は重要だが、他の取引同士の順序は気にしない場合はメッセージグループIDに取引IDを指定したら良さそうです。

SomeJob.perform_async(
  csv_id: csv.id,
  message_group_id: <MESSAGE_GROUP_ID>
)

イベント駆動は一筋縄ではいかない話

各技術の説明をしてきましたが、ここからはイベント駆動アーキテクチャ導入時に必ず考慮する点を書いていきます。

冪等性

同じ処理を何度実行しても同じ結果が得られる性質のことです。

冪等 - Wikipedia

Amazon SQSの通常キューでは2回以上同じメッセージを取得する可能性があるので、重複してメッセージを取得することを考慮する必要があります。

今回実装した決済完了イベントでは、取引ID(冪等キー)をUnique Keyにして、DB側で制御するようにしました。

# 配送サービスの配送情報保存処理
def save!
  Delivery.save!(params)
rescue ActiveRecord::RecordNotUnique => e
  # 念の為取引ID(冪等キー)で重複していることを確認
  raise e unless Delivery.exists?(order_id: params[:order_id])

  # ログだけ残して正常終了させる
  Rails.logger.info(
    "#{self.class}##{__method__} Message: #{e}"
  )
end

もしくはFIFOキューを導入することでも解決が可能です。

整合性

モノレポの場合はDBのtransaction機能を利用して、失敗したらロールバックすることが可能です。しかしマイクロサービスだとそう簡単にはできません。

一般的にはTCCパターンやSagaパターンを使用する必要がでてきます。 qiita.com

今回は下記理由で、ロールバック処理を行いませんでした。

  • 連携するサービス数が少ないので今のところ処理がシンプル
  • 外部からの入力事項は送信前にバリデーションされている
  • BUYMA本体から配送サービスに送られてくるメッセージは信頼する
  • BUYMA本体の取引データと配送データに不整合がないかを毎日チェックしている

現状は問題なく動作していますが、今後はもっと堅牢にするためにTCCパターンやSagaパターンを導入する必要がありそうです。

最後に

イベント駆動アーキテクチャに使用した技術と考慮するポイントを書いていきましたが、マイクロサービスは思っている以上に学習することが多く、また実際に作成して経験値を積んでいくことが大事だと実感しました。 今後も実践を経て、レガシーから脱却できるように精進していきたいです。

明日の記事の担当は検索エンジニアの伊藤さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co