今年組織作りに貢献するためにやったこと

今年組織作りに貢献するためにやったこと

こんにちは。アプリケーション開発グループの穴澤です。

この記事は Enigmo Advent Calendar 2020 の 22 日目の記事です。

今年の振り返りとともに主に自分が去年から今年にかけて中途で入社した方にやってきたことを書いておきます。 これからチームに新しい人を迎える人や、中途採用入社する方、チームの組織づくりに興味がある方に読んでいただければとおもいます。

実際に取り組んだこと

  1. オンボーディング資料
  2. 開発手法、フロー・チェックリストの読み合わせ
  3. 1on1

1.オンボーディング資料

f:id:enigmo7:20201209113806p:plain

エニグモでは部署問わず、今年から中途で入社した社員向けのzoomでのオンボーディングが開始されました。 簡単にいうと、「各事業部が具体的になにをやっているか」「どんな取り組みをしているか」を中途入社した方に説明します。 入社された人は部署ごとに数回に分けて参加しています。

思えば今年の早い段階からリモートワークが推奨された状況で、中途入社問わず、自分の所属する部署以外の方と顔を見合わせて話をする機会というのが極端に減りました。例えば「先月入社した方ですね、XX部署のXXです!」といったようなコミュニケーションも、廊下やエレベータで顔をあわせたり、ミーティングの終わりですれ違う、お手洗いで顔を合わせるなどのタイミングで行われてきました。それぞれを取るととても些細なことですが、これらを積み重ねた何かが会社の社風や文化を醸造する一部になっていたんだなと感じます。

社風や文化をオンラインでも実現するために、このオンボーディング時に自分が紹介する担当として実践していることは以下の点です。細かい箇所は割愛しますが、リモートワークならでは、でしょうか。

  • 説明資料に自部署の社員の氏名とSlack名の紐付け(顔とアイコンと名前とSlack名が一致しない問題)
  • アプリ、インフラ、データ基盤、それぞれについての相談相手としてまずは覚えておいてほしい人をエピソードを添えて紹介(そんなに色々紹介されても覚えられない問題)
  • BUYMAの仕組みで聞きたいこと、相談したいことがあったら、ここで相談してね、のチャットの紹介(何かを相談したくなってもどこだかわからない問題)
  • 参加してくれた方とサービスエンジニアリング本部との具体的な関わり(紹介してもらったけど具体的にどういう関わりがあるんだろう問題)

2.開発手法、フロー・チェックリストの読み合わせ

f:id:enigmo7:20201209112322p:plain

BUYMAというサービスが産声を上げて10年以上たちます。尖った技術を使っているところもあれば枯れているところ、 やや温かみのある仕組みのところもあり、中途で入社するエンジニアの方が躓くところもあるため、主に若手の方対象に週1回程度開発やリリースの際に 気をつけてほしい点をesaにまとめたフロー・チェックリストの読み合わせを行いました。

基本的に開発に関わる問題点は、所属しているチームのリーダーやメンターが答えてくれますが、彼らに一点集中よりも、誰にきいても答えてもらえる、誰にきいても大丈夫、という雰囲気作りに自分も貢献したかったからです。ましてや、自分の所属するチーム以外のエンジニアと、躓きや悩みを共有する場所はいくつあってもいいと考えています。 時には検索エンジンチームのエンジニアをゲストに迎えて検索周りの説明をしてもらったり、チェックリストについては実際トラブルに遭遇したエンジニアに「なぜこのチェック項目があるのか」というのを語ってもらいました。

3.1on1

1on1は前職でも取り組んできたことですが、今年特に自分が取り組んできたことをあげます。

アジェンダを用意する

序盤はなかなか用意できませんでしたが、後半は実施の1,2日前にいくつかアジェンダを用意するようにしました。 4,5つ用意してその中のトピックで話が広がれば盛り上がり、割愛してできるだけ「話が盛り上がる」「相手の指向性、興味関心がわかりそう」な所を意識してすすめていきました。

1on1は「・・・最近どうですか」というやり方もありますが、1on1を実施するメンバーの身になって考えてみると 「なにを話せばいいんだ」「なにを言われるのか」と身構えられてしまうこともあります。明日はこういう事を話そう。を用意する方が 時間も有意義ですし、もしネガティブな話をされるということが事前にわかっていれば、自分が言いたいことも事前に用意するなど、お互いに準備ができ、結果的に少しの準備で得られる物が大きいと感じました。

会社全体の動きがわかるトピックをアジェンダに混ぜて話す

メンバーの仕事や取り組みと直接関係のない、会社の動きや施策。最近の売上。他部署に入社した人のこんなところが凄いなど。 会社の中での私達の部署の動き、施策以外の取り組みや課題など、自分と会社をつないでいる環境にどんなものがあり、どんな動きをしているか。中途入社の序盤だと、見えにくい組織の動きを定期的にアジェンダに混ぜて話をするようにしています。

  f:id:enigmo7:20201209112739p:plain

まとめ

今年は、いつもやっていること+αの「工夫する」注力をしました。特に、コロナ禍のリモートワークで感じやすい「孤独感」を和らげるための「皆さんは一人ではないですよ」ということを伝えるために、様々なできることを模索した年だった様に思います。取り組みの結果を定量的にはかることはできないので難しいですが、フィードバックを受けながら、来年も少しずつ改善していきたいと思います。 よいお年を。

明日の記事の担当は インフラグループ の 加藤 さんです。お楽しみに。

株式会社エニグモ 正社員の求人一覧

hrmos.co

データアナリストが転職活動で求めたこと

こんにちは、データアナリストの田中里澄です。
エニグモではデータ活用推進室という部署に所属しており、主に他部署が行った施策の効果検証を担当しています。

私は2020年11月にエニグモに入社したので、今回はその転職活動の中でどうして今の職を選んだのか、また入社後どのような仕事をしているのかを紹介できればと思います。
前職はライブ配信サービスを運営している会社で、同じくデータアナリストとして働いていました。
転職理由について前職に対してネガティブなことは一切なく(むしろ今も大好きな会社です)、今の自分は別の会社で経験を積んだ方がいいと判断したためです。

なぜこの内容を書くのか?

理由は社外の方向けと社内の方向けでそれぞれあります。

  • 社外の方にはデータアナリストの転職活動の参考にしてもらいたいため

  • 社内の方にはまだ入社して日が浅い私の自己紹介になるためです。転職活動だけでなく、自分がどんな仕事をできるか/したいかについても書いています。

そもそもデータアナリストとは?

ここであくまで自分の考えにはなりますが、データアナリストとはどんな仕事をするのか?を説明します。
読んでくださっている方に知っていただきたいですし、私が転職活動をしたときにも実際に自分の中で整理しました。
その整理のおかげで、無数にある求人(特にデータ〇〇という職種は最近多い)の中でピックアップする際や、面接時に自分がしたいことを説明する際に役立ちました。

データアナリストがどんな仕事をするのかを一言で表すと、「意思決定に必要な情報/インサイトを提供する仕事」だと思っています。
データを分析する仕事じゃないの?という意見もあるかもしれませんが、分析はあくまで手段であり意思決定をすることが目的だと考えているのでこのような書き方をしました。
もしかするとコンサルという職種でイメージされる仕事に近いかもしれません。(余談ですが、思考法を勉強するためにコンサル向けの本を読むこともあります)
逆にいうと、こういった仕事をする人のことを「データアナリスト」と定義している会社が多いとも言えます。(本質的にはこちらが正しいかもしれません)

どんな会社を求めていたのか?

上記で書いたデータアナリストとしての仕事ができることはもちろんですが、さらにそれを具体化した自分のしたい仕事について2点、またそれに関連した仕事の環境について2点合わせて4点求めていることがありました。

どんな意思決定に携わりたいのか

意思決定にも長期的な経営に関するものや日々のオペレーションに関するものなど色々な種類があります。
その中でも私はプロダクトの改善に関する意思決定に携わりたいと考えていました。
なぜならユーザーに喜んでもらえることに直結して実感しやすく、また分析するなかで人がどんな考えで行動するのかを垣間見ることができるためです。

どんなデータを分析するのか

データは大きく分けると定量と定性の2種類に分けることができますが私はその両方を用いて分析したいと考えていました。
特に定量的なデータだけを扱うことが求められる場合もありますが、ユーザーの行動理由を知るためにはそれだけでは分からないことがたくさんあるためです。

他の職種(特に意思決定者)との連携が取りやすい

上の方で書いていたデータアナリストの仕事内容を遂行するためには他の職種の方との連携が大切です。
その中でも特に最終的に意思決定をする方との連携がとても大事だと考えていました。
分析をする前にはどんなことが分かれば意思決定ができるかを摺り合わせ、分析後にはそこで分かったことを共有し議論することで意思決定に付き添うためです。

分析をするためのデータ基盤が整っているか

整っているという言葉が曖昧ですが、例外はあれど日々の分析をする際の前処理やデータの準備に自分の持っているスキル以上のことを求められないことが基準でした。
もちろん後々はデータ基盤の構築などのスキルも伸ばしていきたいのですが、それをすぐに求められて本当にやりたいことに時間が使えないのはしんどいので求める要件に入っていました。

実際に入社してみて感じたこと

最終的にエニグモ社に決めた理由としては、上記の求めていることを全て満たせていると選考中や内定をいただいた後の面談で感じたからです。
ここでは入ってみて実際に感じたことを紹介します。

  • 分析結果を共有する際に、集計結果だけでなく考察やネクストアクションも求められまた歓迎されていい。
  • 求めていた意思決定者との連携について、非常に取りやすいので分析の方針を立てやすい。
  • データ分析基盤について、基本的に必要なデータがBQにあって扱いやすい。ただし、それぞれのテーブルの定義などがわからないところやGAのデータについての扱い方を調べる必要があったりと、データはあるが扱い方は聞いたり調べたりが必要なので、今後入ってくる方のためにまだまだ整える余地がある。

入ってからどんな仕事をしているか

主に2種類の施策の振り返りとアプリの分析方針を立てる仕事をしています。

施策についてはどちらも私が入社する少し前から始めていたもので、毎月改善を加えてやっていこうという前提のものでした。
なので振り返りでどんな数字を見るのか?というところから任せてもらい、翌月の改善点の提案を出すところまで行っています。

アプリの分析方針については、私が入社するまでは売上というKGIを追いつつ、KPIとして何を追うべきかについて試行錯誤をしていたという状況です。
そこで今後追うべきKPIについて決める仕事を担っており、今は私の提案を元に議論しています。

総じていうと、現在自分のしたいと思っていた仕事ができており今後もまだまだやりたいことが溢れています。
コロナ禍で入社後のコミュニケーションは以前と比較すると難しいなとは感じておりますが、より一層迅速で納得感のある意思決定に貢献できるように日々の業務に努めていければと思っています。


株式会社エニグモ 正社員の求人一覧

hrmos.co

Apache Airflowで実装するDAG間の実行タイミング同期処理

こんにちは。
今年4月にエニグモに入社したデータエンジニアの谷元です。
この記事は Enigmo Advent Calendar 202020日目の記事です。

目次

はじめに

コロナ禍の中、皆さんどのようにお過ごしでしょうか。 私はリモートワークを続けてますが、自宅のリモートデスクワーク環境をすぐに整えなかったため、薄いクッションで座りながらローテーブルで3ヶ月経過した頃に身体の節々で悲鳴をあげました。猫背も加速...

さて、エニグモでの仕事も半年以上経過し、データ分析基盤の開発運用保守やBI上でのデータ整備などを対応をさせていただいてますが、今回は社内で利用されているAirflowの同期処理の話をしたいと思います。

尚、こちらの記事を書くにあたり、同環境の過去記事 Enigmo Advent Calendar 2018 がありますので、良ければそちらもご覧ください。

そもそも同期処理とは?

例えば、以下のようなデータ分析基盤上でのETL処理があったとします。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
処理2. DWH上のデータを加工後、外部へCSV連携する

上記の処理は順番に実行されないと必要なデータを含めた抽出ができなくなってしまい、外部へデータ連携できなくなります。そのためには処理1の正常終了確認後に処理2を実施するという、同期処理を意識した実装をする必要があります。

例だとシンプルですが、業務上では徐々にETL処理も増えていき、複雑化していきます。 こうした同期処理は、初めはなくても問題にならないことが多いのですが、徐々に処理遅延が発生してタイミングが合わなくなったり、予期せぬ一部の処理エラーが原因で関連する後続処理が全て意図せぬ状態で動き出してしまったりします。そうならないためにも、設計時点で同期について意識することが大事だと思います。

Airflowによる同期処理

Airflowでは ExternalTaskSensor を使用することで実装可能となります。 Airflowのソース上ではdependという用語が使われているので、Airflowの世界では「同期」ではなく、「依存」と呼んだ方が良いのかもしれません。適弁読み替えていただければ...

では、Airflowでの同期検証用サンプルコードを作成してみましたので、実際に動かしながら検証したいと思います。尚、Airflowバージョンは1.10.10となります。

先ほどのETL処理を例にして、下記の内容で検証してみます。 実際のファイル出力やDBへのロード処理などはここでは割愛して、DummyOperatorに置き換えてます。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
  dag_id: sample_db_to_dwh_daily
  schedule: 日次16:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

処理2. DWH上のテーブルを用いてデータ加工後、外部ツールへCSV連携する
  dag_id: sample_dwh_to_file_daily_for_sync_daily_16
  schedule: 日次17:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

検証時のコード

まずは処理1のサンプルコードになります。

from airflow.models import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='sample_db_to_dwh_daily',
    start_date=datetime(2020, 12, 1),
    schedule_interval='0 16 * * *'
)

tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

for table_name in tables:

    db_to_dwh_operator = DummyOperator(
        task_id='db_to_dwh_operator_%s' % table_name,
        dag=dag
    )

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed'
    )

    db_to_dwh_operator >> terminal_operator

次に処理2のサンプルコードです。

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State
from datetime import datetime, timedelta


tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

def _dwh_to_file_operator(dag,
                         table_name,
                         depend_external_dag_id,
                         depend_external_dag_execution_delta):

    dwh_to_file_operator = DummyOperator(
        task_id='dwh_to_file_operator_%s' % table_name,
        dag=dag
    )

    sensors = []
    sensors.append(ExternalTaskSensor(
        task_id='wait_for_sync_db_to_dwh_%s' % table_name,
        external_dag_id=depend_external_dag_id,
        external_task_id='terminal_operator_%s' % table_name,
        execution_delta=depend_external_dag_execution_delta,
        allowed_states=[State.SUCCESS, State.SKIPPED],
        dag=dag))

    for sensor in sensors:
        sensor >> dwh_to_file_operator

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed')

    dwh_to_file_operator >> terminal_operator


args = {
    'start_date': datetime(2020, 12, 3)
}

dag = DAG(
    dag_id='sample_dwh_to_file_daily_for_sync_daily',
    default_args=args,
    schedule_interval='0 17 * * *'
)


for table_name in tables:
    _dwh_to_file_operator(
        dag=dag,
        table_name=table_name,
        depend_external_dag_id='sample_db_to_dwh_daily',
        depend_external_dag_execution_delta=timedelta(hours=1)
    )

サンプルをAirflow画面で見ると?

上記2つのDAGをAirflowのWebUIで見ると以下のようになります。

f:id:enigmo7:20201211201546p:plain

同期処理をするoperatorは「wait_for_sync_db_to_dwh 」です。図でも ExternalTaskSensor のアイコンになってますね。 今回は3テーブルあり、それぞれ並列処理を想定しているため、3つあります。 そして、それぞれ、該当するテーブルのterminal_operator処理完了を確認後、後続処理が実行されます。

では上記の同期処理がAirflowのログでどのように表示されるか見ていきたいと思います。

同期遅延なし時のAirflowログ

まずは、既に sample_db_to_dwh_daily が完了していた場合です。 3テーブル毎のログを載せておきます。

TABLE_DAILY_1

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,833] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,840] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,841] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,841] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,841] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,848] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,851] {standard_task_runner.py:53} INFO - Started process 48063 to run task
[2020-12-09 02:00:09,905] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,921] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,924] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,834] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,834] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_2

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,831] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,840] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,840] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,846] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_2> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,848] {standard_task_runner.py:53} INFO - Started process 48062 to run task
[2020-12-09 02:00:09,902] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,916] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_2 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,918] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,921] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_2, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,832] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,832] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_3

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,829] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,837] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,837] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,844] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_3> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,847] {standard_task_runner.py:53} INFO - Started process 48061 to run task
[2020-12-09 02:00:09,901] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,915] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_3 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,917] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,920] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_3, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,831] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,831] {local_task_job.py:103} INFO - Task exited with return code 0

注目して欲しい行は下記となります。
※ 他テーブルのログも同じですので、ここから先は記載を割愛します

[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...

これが同期処理のログになります。 今回は既に正常終了していたため、1回の確認しか行われておらず、その後、後続処理も含めて正常終了してることが分かります。

同期遅延あり時のAirflowログ

次に前提のタスクが遅延して正常終了した場合のAirflowログもみていきたいと思います。

~~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08 17:00:00
~~~
[2020-12-10 02:00:04,174] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,182] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-10 02:00:04,182] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,189] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-08T17:00:00+00:00
[2020-12-10 02:00:04,192] {standard_task_runner.py:53} INFO - Started process 21683 to run task
[2020-12-10 02:00:04,245] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-10 02:00:04,259] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:01:04,322] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:02:04,385] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:03:04,448] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:04:04,511] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...

(省略)

[2020-12-10 10:27:29,163] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:28:29,226] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:29:29,273] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:30:29,298] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,360] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,363] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-10 10:31:29,366] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201208T170000, start_date=20201209T170004, end_date=20201210T013129
[2020-12-10 10:31:33,698] {logging_mixin.py:112} INFO - [2020-12-10 10:31:33,698] {local_task_job.py:103} INFO - Task exited with return code 0

先ほどのPoking行がログに表示され続けていることが分かります。今回は途中でタスクを手動で動かして正常終了させました。

尚、Pokingの間隔はExternalTaskSensorの引数でpoke_interval を設定すると変更ができました。ソースを見るとデフォルトは60秒のようです。ログとも一致してますね。

同期タイムアウト時のAirflowログ

業務では非機能要件も大事だと思います。 Airlfowでは、ExternalTaskSensorの引数でtimeoutがあるようです。 こちらでタイムアウト(default: 1週間ですかね、長っ)を適切に設定して、エラー検知をしてみたいと思います。

sample_dwh_to_file_daily_for_sync_hourly/wait_for_sync_db_to_dwh_TABLE_HOURLY_1/2020-12-09T17:00:00+00:00/4.log.
[2020-12-11 18:21:14,539] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,551] {taskinstance.py:880} INFO - Starting attempt 4 of 9
[2020-12-11 18:21:14,552] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,563] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_HOURLY_1> on 2020-12-09T17:00:00+00:00
[2020-12-11 18:21:14,568] {standard_task_runner.py:53} INFO - Started process 84130 to run task
[2020-12-11 18:21:14,625] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-11 18:21:14,639] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:22:14,701] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:23:14,764] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:24:14,828] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,892] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,898] {taskinstance.py:1145} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
AirflowSensorTimeout: Snap. Time is OUT.
[2020-12-11 18:25:14,899] {taskinstance.py:1202} INFO - Marking task as FAILED.dag_id=sample_dwh_to_file_daily_for_sync_hourly, task_id=wait_for_sync_db_to_dwh_TABLE_HOURLY_1, execution_date=20201209T170000, start_date=20201211T092114, end_date=20201211T092514
[2020-12-11 18:25:15,070] {logging_mixin.py:112} INFO - [2020-12-11 18:25:15,070] {local_task_job.py:103} INFO - Task exited with return code 1

hourlyに変更して実施したログですが、「Snap. Time is OUT.」と良い感じ(?)にエラーとなってくれました。 業務上では、お目にかかりたくないログ内容ですが...

他にも色々あるようですが、この辺りを抑えておけば基本的な使い方は抑えたことになるかと思います。

所感

実際にAirflow同期処理をやってみて思ったのですが、

  • 同期を取る対象のoperatorをどれにするか
  • 同期を取るoperatorの時刻差はどれだけあるか
  • 非機能要件にあったタイムアウト設定で適切にエラーで落とそう
  • 複雑になるとDAG間の関係性がわかるグラフをWebUIでサクッとみたい

が、気になりました。

「同期を取る対象のoperator」は「terminal_operator_<テーブル名>」としました。
ダミー処理なので冗長にはなってしまいますが、各並列処理毎に加えておいたほうがDAG修正時の同期影響を意識しなくて良くなるのかなと思ったためです。

また、DummyOperator利用時に 「trigger_rule='none_failed'」の引数を付け加えないと、先に実行されたケースもありましたので注意が必要そうです。

「同期を取るoperatorの時刻差」ですが、何度も繰り返してると混乱してしまう可能性もあるので、テストでも十分に気をつけて対応しないといけないですよ(to自分)。実際の業務でもtimedeltaで指定する時刻に誤りをテストで見落としてしまい、その結果、リリース後に同期処理が想定時間通りに終わらず遅延してしまい、外部連携のタイミングに間に合わなくなり問題になってしまいました...。

今回の検証例は、お互いdaily実行だったのですが、頻度が異なると慎重な対応が求められそうです。 ただ、この辺の制御はAirflowのコアな制御なので、今後、利用者が意識しないで済むようにdag_idとオプション引数で渡したら、同期を取ってくれるような機能があると良いなとも思いました。利用頻度が多くなると、そういった実装も検討しないといけないのかもしれません。

あとは、WebUI上にてDAGの関係性がグラフで見れると、意図したoperatorで同期が取れているかの確認がやりやすいと思いました。 見方が分からないだけなのか、未実装なのか把握できてないのですが、WebUIをみる限り今回のAirflowのバージョン1.10.10ではなさそうです。

最後に

Airflowでの同期処理について少しでも伝わればと思い、このテーマで記事を書いてみました。 本記事を通して少しでもイメージを掴めて頂けますと幸いです。 他にも面白そうな機能はあると思いますので、また、機会があれば投稿したいと思います。

私からは以上となります。最後までお読み頂きありがとうございました。

明日の記事の担当はエンジニアの高山さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co

マニフェストファイルに思いを馳せる

こんにちは。BUYMAの検索やデータ基盤周りを担当している竹田です。
この記事は Enigmo Advent Calendar 2020 の19日目の記事です。

エニグモに入社してGCPAWSといったクラウドサービスを利用することが多くなり、日々刺激を受けながら業務に従事しております。
その中でもKubernetesのようにシステムを「宣言的」に定義するモデルに技術進化の恩恵を感じており、自分の体験も踏まえて、クラウドサービスでKubernetesを一般利用するに至るまでどういう歴史的経緯があるのかを辿ってみたくなりました。
(実際Kubernetesで編集するファイルも「マニフェスト(≒宣言)ファイル」といいますね)
なお、記事内容には主観を含む部分や、内容を簡素化するため端折っている部分もありますので、あらかじめご了承ください。

chroot

まずはKubernetesで管理されるコンテナの歴史を探ってみたいと思います。
コンテナ技術の前身は1979年に登場した chroot と言われています。
chrootで特定階層以下をrootディレクトリとすることで、システムとの分離を実現することができます。
変更したディレクトリ配下には動作中のOSと同じディレクトリ構造を実体として持つ必要があり、利用シーンとしてはsshログインしたユーザにディレクトリ移動の制限をかけるような場合でしょうか。個人としてはあまり使った記憶はありません。
少なくともリソースの制限を設けることやディレクトリ階層をイメージのような形で持つことはできませんでした。

コンテナのベース技術

コンテナのベースとなる技術は2000年にFreeBSDから発表された FreeBSD jail という機構です。 カーネルに手を加えてOSレベルでの仮想化機構を実現しており、ファイルシステムやネットワークなどの分離ができるようです。
このFreeBSD jailは知らなかったのですが、当時は業務で Solaris を利用しており、すでに2005〜2006年頃にはSolarisコンテナという概念が出てきていたことを記憶しています。

また、この頃はまだあまりLinuxは表舞台には出てきていませんでした。10数年前当時は商用製品であることが重視されていたと思います。
当時のLinuxSolarisなどの商用UNIXと比較すると

  • OSとしての安定性が高くなかった
    • OOM Killerに悩まされたこともしばしば
  • 比較的スループットが重視されていた
    • とりあえずタスク間がフェアじゃなかった
  • リソース管理やデバッグ面が弱かった など

こういった背景もあり、ミッションクリティカルなシステムでは商用UNIXを選択するのが一般的でした。

ただ、2000年台初頭から、OSS(Open Source Software) が台頭してきました。
開発者のニーズにマッチしていたことや、 Red Hat社 のようなビジネスモデルを確立できたことなどが、OSSを後押しした背景にあると思います。

ちなみにコンテナが出てくるまでの仮想化技術の筆頭は VM(VirtualMachine) ですが、本記事では割愛します。

cgroups

Linuxにおけるコンテナは cgroups が根幹となっています。
カーネルの機構によりプロセスをグループ単位にして、そのグループ内でリソース配分や制限を行う技術です。
少しだけcgroupsには関わっていたこともあり、Linux商用UNIXに追いつきそうだなと感じていた頃でした(おそらく2008〜2009年頃)。
ただし、設定方法が特殊であり、一般利用するにはかなり難易度が高いものでした。

この後、cgroupsを管理できる LXC というソフトウェアが出ています。
ポータビリティ性が低かったのか、使い勝手が良くなかったのか、、なぜかあまり脚光は浴びていませんね。

Docker

言わずと知れたDocker社が開発したコンテナを管理するソフトウェアです。
自分の印象では利用者がcgroupsを意識することなく利用でき、それをコードベースで管理できる柔軟なラッパー、、という解釈です。
ざっくりと表現するとLinuxでは以下のイメージです。
f:id:enigmo7:20201214004444p:plain
近年のCPUパフォーマンスやSSD等によるI/Oパフォーマンスの向上、かつVMよりも遥かに軽量でポータビリティ性が高く、簡素に利用できるということもあり、主に開発者の間で一気に広まったように思います。

Kubernetes

Google社が2013年に発表した コンテナオーケストレーションツール です。
この当時すでに大量のシステムをコンテナ化していた事実に衝撃を受けたのを覚えています。
kubernetesは紆余曲折ありましたが、現在の標準になっているものと思います。
宣言的な記載により、ブルーグリーンデプロイメントやローリングアップデートが非常に簡素に実現できるようになりました。

  • 宣言的
    • システムがどういう状態にあるべきかを記述する
    • 問題があった場合もその状態になるよう再構成する
    • 内部アプリの修正やバージョンアップは、修正適用済みコンテナに置き換える

解釈は難しいです。今こうある状態が大事、というニュアンスでしょうか。
対義語としては命令的、ということなので、こうしてああしたらその状態になる、というニュアンスですかね。

思えば、トラブル発生の際はシステムを修復・復旧させることが一般的でした。
ステートレスシステム(状態維持が不要なシステム)では、もはや宣言的アプローチが一般的になっていると思います。

コンテナ、およびKubernetesのサービス利用

コンテナ技術は開発者にとっては非常に都合の良いものでしたが、実サービスでの利用では、となると敷居が高かったように思います。
自分の当時の立場で記載すると、以下のような理由からでした。

  • 枯れた技術ではない
  • リソース分割はVMで十分満足できている
  • Kubernetesをオンプレ環境で管理するのはハードルが高い
  • 新技術を使いたいという理由では上長(ひいては経営層)を納得させられない

クラウドサービスによるサポート

クラウドサービスでのサポート開始により、サービス利用の敷居が一気に下がったものと思います。
Google GKE、Amazon EKS、Microsoft AKS などが挙げられます。

コンテナオーケストレーションでは大量のシステムを管理・運用することに長けており、クラウドサービスの柔軟性と非常に相性が良いと思います。

  • マネージドなKubernetesにより管理を簡素化できる
  • サポートを受けられる
  • コスト面も使った分だけ

。。となると使わない手はないだろうな、という印象です。

終わりに

実際にはもっと複雑な背景があるとは存じますが、概要としてはこのような流れかなと思います。
「宣言的」概念はシステムのあるべき形だと感じつつも、まだステートレスなシステムでのベストプラクティスなのかなという感触です。
ステートフルシステム(状態維持が必要なシステム、例えばDBサーバなど)でも利用はできますが、まだちょっと厳しいので今後より良い概念・機能が出てくるのかな、という期待があります。

おじさんエンジニアとしては過去に思いを馳せつつも、これからも技術のビッグウェーブには乗っていきたいので日々勉強・キャッチアップが本当に大事ですね。

最後まで読んでいただきありがとうございました。
明日の記事担当はデータエンジニアの谷元さんです。よろしくお願いします!


株式会社エニグモ 正社員の求人一覧

hrmos.co

old schoolerなネットワークエンジニアがIAP Connectorを試してみた

お疲れ様です。インフラチームの山口です。 この記事は Enigmo Advent Calendar 2020の18日目の記事となります。

2020年はコロナ禍でほぼ全社的にリモートワークになったこともあり、 前職のネットワークエンジニアだった頃のWANやビデオ会議の思い出を思い返す機会が多い一年でした。 強く思い出に残っているのは大概、障害と機器の不具合などのトラブル系しかなく、それだけでお腹いっぱいになる感じです。

話は変わりますが、アドベントカレンダーは業務から少しずらした内容で書くポリシーなので、現行の業務とはあまり関係ない、リモートワークに伴ってよく起こりがちなネットワークのごまかしの話をします。 今年、私の業務何やってたかなというとEKSの運用サポートとオンプレの保守対応が多かったので、業務からずれた内容をエイヤで書き下します。

1.はじめに

本記事はオフィスおよびオンプレミスのDCのPublicIPのみをホワイトリストに登録したS3バケット上の画像を、リモートワーク下の各ご自宅から閲覧する方法を考えた際の検討事項を整理したものです。

IAP Connectorをタイトルに入れていますが、本記事内ではZero Trust的な機能は使用しません。 IAP ConnectorはGKEに簡単にデプロイできるNAT箱として使っています。 そのため、Beyond Corp Remote AccessやCloud IAPについても本記事では特に触れません。

2.問題設定と前提条件

問題設定とその問題を解くにあたっての前提条件(おもに弊社のVPN関連の構成など)を説明します。 まず、何をやりたいかの問題設定を概説し、その後に付随する前提条件を説明します。

2-1.問題設定

解きたい問題を以下に記載します。

  • オフィスとDCのPublicIPをホワイトリストに登録したS3バケット上の画像を、リモートワーク下の自宅から閲覧できるようにしたい
  • エンジニア以外のメンバーも閲覧できるようにしたい

2-2. 前提条件

2-1で説明した問題設定に加えて方法検討の際の制約になりうるNW構成などを思いつく限りに記載します。以降の節でなんやかんや本節の理由を参照して、対応案を絞っていきます。

  1. オンプレミスのDCにあるVPNサーバでL2TP/IPSecを終端する
  2. VPN接続後のクライアントPCの経路はスプリットトンネリング(トンネルインタフェースにはデフォルトルート向けてない)
  3. クライアントPCは、エンジニアはMacだが非エンジニアはWindows
  4. VPNサーバからクライアントに経路をpushすることは可能
  5. オンプレミス環境はパブリッククラウドに移行中のため余計なリソースを増やしたくない

また、概要を簡単に記載した図を以下に示します。

f:id:enigmo7:20201215105050p:plain
図1

3.方法検討

検討した方法を以下表と図に記載します。 方法は3種類に大別されます。タイトルにIAP Connectorを試してみたと記載しているタイトルの通り最終的には、3bを選択することになるのは明白ですが、建前として各案のPros/Consを考えていきます。

小項目 方法
案1.愚直案 S3バケットのIPアクセス制限に各自の自宅のPublicIPを登録する
案2.ルーティングでごまかし案 2a VPNサーバでS3向けの経路をPushしDC経由にする
2b sshuttleなどを使用しクライアント側でルーティングを調整する
案3.Proxy建てる案 3a DCにProxyを建てる
3b IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加

f:id:enigmo7:20201215105210p:plain
図2

案1.愚直案

S3バケット側で愚直に各ご自宅のPublicIPをホワイトリスト登録すればいいだけというのは、そのとおりでそれで済むならこの記事をグダグダ書いてる意味ないじゃんという形になってしまうので、半ば無理矢理感はありますが一旦は却下します。 これは人によって意見分かれそうですが、プライベートで契約しているPublicIPの情報をCloudFormationのテンプレートなどに書いて、Gitの履歴に残したくなさがあるので個人的には微妙かなという印象が強いです。

  • pros
    • 一番シンプル
  • cons
    • アクセスする社員の数増えたら運用手間かもしれない。
    • そもそも、ご自宅が固定IPとは限らないケースもある。
    • 現状はCloudFormationでS3バケット管理しているが、個人の自宅のPublicIPをテンプレートに残してしまうのって良いのか? なんか嫌じゃない?

案2. ルーティングでごまかし案

ルーティングでごまかし案は2a、2bに分かれます。 2aと2bの違いはルーティングの調整をVPNサーバ側で行うか、クライアントのPC側で行うかの違いです。

案2での基本方針は以下からS3で使用されるPublicIPのレンジを確認してそのIPレンジ向けの通信をDC経由にします。

2a、2bのpros/consを記載します。 正直自分しか使わないのなら他の環境に影響少なくて手軽な2bでローカルのPCでルーティングいじってごまかすぞ、となるのですが、 今回の要件的には他メンバーにも展開する可能性があるので却下します。 記事まとめている途中で、わざわざS3のPublicIPのレンジ全部経路切らなくてもDNS引いた結果をhostsに追加&それ向けのホストルートをトンネルインタフェースに切れば良いんじゃないかと思いましたが他の人に展開するという観点ではやっぱり却下です。

  • 2a: VPNサーバでS3向けの経路をPushしDC経由にする

    • pros
      • クライアントPC側の作業は楽
    • cons
      • この要件のためだけにVPNサーバの経路調整したくない
      • S3のPublicIPのレンジをすべてDC経由にした場合に意図しない影響って本当に出ないんだっけ?
  • 2b: sshuttleなどを使用しクライアント側でルーティングを調整する

    • pros
      • VPNサーバ側の設定を変えなくて済む
      • 当該S3バケットを参照したい人だけDC経由になるので、2aより意図しない副作用は少なそう
    • cons
      • エンジニアならローカルで経路調整したりできそうだけど、非エンジニアの人には難しいかもしれない

案3. Proxy建てる案

Proxy建てる案はどこにProxyを建てるかどうかで3a、3bに分かれます。 3aはDCにProxyを建てる案です。DCにnginxのProxy建ててアクセスしている実績はあるのですが、オンプレのリソースは極力減らしていっている状況なので却下します。

  • 3a: DCにProxyを建てる
    • pros
      • VPNのルーティングは調整しなくて済む
      • 同様のProxyはDCにも何台かすでにある
    • cons
      • DCに余計なリソースが増える
      • 運用手間
  • 3b: IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加
    • pros
      • VPNのルーティングは調整しなくて済む
      • Deployment Managerでデプロイは楽にできそう
    • cons
      • 動作確認は必要
      • GKEのクラスタがデプロイされるのでコスト面は大丈夫?

方法決定

一旦案をまとめます。 5案だしましたが、以下の3案が確度高く現実的には行けそうな感じがします。 案1、案2bはこの検討時点では動作することが分かっていたので、案3bの構成の検討を次節で行います。 案3bの検討事項ではコスト面と動作確認がconsとして上がっていますが、本記事ではとりあえず動作確認だけを行います。

  • 案1.愚直案
    • とりあえずはこれできるけど面白みはない
  • 案2b: sshuttleなどを使用しクライアント側でルーティングを調整する
    • エンジニアは拾えるかつ楽だけど非エンジニアは拾えない
  • 案3b: IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加
    • 検証はした方が良いが意外と楽にできそう

4.リソース作成・動作確認

本節では 案3b: IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加 の説明を簡潔に行います。 IAP Connectorのデプロイは基本的には以下公式の手順に従います。

最終的な構成図を以下に示します。 構成上、Deployment Managerのテンプレートでは、IAP ConnectorのNAT後のPublicIPとGCLBにアサインするPublicIPも同じテンプレートでデプロイされてしまうようですが、以下の理由から、PublicIPはコンソールでリソースを作成しそれをテンプレートから参照するように修正しました。また、とりあえずぱっとデプロイできて動きだけ確認できればいいので、Terraformで作成する選択肢は取らず、雑にDeployment Managerのテンプレート一枚で作成しました。

  • GKEクラスタはなにか問題が出たら潰して再構築する程度の温度感の運用を想定
    • ambassadorのPodの問題判別やクラスタ運用は極力しない手抜き前提
  • PublicIPはGKEクラスタと合わせて潰れてしまうとS3バケットホワイトリスト設定やDNS設定が手間
    • 所謂ライフサイクル異なるリソースは別テンプレートにというやつ(今回PublicIPはCLIでリソース作ったけれど)

f:id:enigmo7:20201215105259p:plain
図3

5.まとめ

本記事では、IAP Connectorを容易にデプロイできるNAT箱として利用しました。 Cloud IAP側でZero Trust的な制御を入れたとしても、IAP ConnectorからS3バケットのアクセス制御は昔ながらのIP制限方式になってしまっているなど、ツッコミどころや深堀りする余地はありますが、 S3バケットの前段に設置する構成については、一応当初の目的通りの動作を確認できました。

また、この記事とは関係ない半ば余談の愚痴ですが、2020年の夏頃にIAP Connectorを検証したのですが、プロキシ先からのレスポンスの書き換えができなくて困って、IAP Connectorに対する熱が下がったという個人的な経緯もあり、VPNもしくは既存のProxyの置き換え手段として手放しで愚直に推せる仕組みではないなというのが率直な印象でした。ただ、活用できる局面は多そうなので、引き続き継続的にキャッチアップしていきたいなと思います。

しかしながら、面倒でもGKEにnginxのProxyを建ててそれをCloud IAPで守った方がIAP Connectorより融通聞くのではという気持ちもあり、目的とそれにかける手間のバランスを見極めるのは難しいなと思いました。結論は毒にも薬にもならない感じなのですが終わります。

明日の記事の担当は、データテクノロジーグループの竹田さんです。お楽しみに。

株式会社エニグモ 正社員の求人一覧

hrmos.co

Dockerfileのベストプラクティスとセキュリティについて

こんにちは、主に検索周りを担当しているエンジニアの伊藤です。

この記事は Enigmo Advent Calendar 2020 の 17 日目の記事です。

みなさんは適切なDockerfileを書けていますか?とりあえずイメージのビルドが出来ればいいやとなっていませんか? 今回は自戒の意味も込めて、改めてDockefileのベストプラクティスについて触れつつ、 そもそもDockerfileを書かずにコンテナイメージをビルドする方法とコンテナセキュリティに関する内容についてまとめてみました。

Dockerfileのベストプラクティス

ご存知の方も多いと思いますが、こちらがDocker社が推奨するベストプラクティとなっています。 せっかくなので事例を交えていくつかピックアップしてみます。

イメージサイズは極力小さくしよう

  • 軽量なベースイメージを選択する
  • 不要なパッケージはインストールしない
  • レイヤはなるべく減らす
    • RUN/COPY/ADDだけがレイヤを増やすのでこれを使用するときに意識しましょう。
      • RUNで実行するコマンドは極力&&で連結する
      • 可能な場合はマルチステージビルドを利用する
      • ADDを使用したアンチパターン(下記の例ではADDによって圧縮ファイルを含んだレイヤが余計に作成されてしまう)
        • アンチパターン

          ADD http://example.com/big.tar.xz /usr/src/things/
          RUN tar -xJf /usr/src/things/big.tar.xz -C /usr/src/things
          RUN make -C /usr/src/things all
          
        • 推奨例

          RUN mkdir -p /usr/src/things \
          && curl -SL http://example.com/big.tar.xz \
          | tar -xJC /usr/src/things \
          && make -C /usr/src/things all
          

ビルドキャッシュを活用しよう

イメージをビルドするとき、DockerはDockerfileに書かれた命令を上から順番に実施します。 その際、各命令毎にキャッシュ内で再利用できる既存のイメージを探しますが、なければ以降のキャッシュは破棄されます。 そのため、更新頻度が高いものをDockerfileの後ろの方に記載することが重要になります。 例えば下記はappというアプリケーションコードを含むディレクトリをコンテナにコピーし、 pip installによって必要なライブラリをインストールする例です。

COPY app /tmp/
RUN pip install --requirement /tmp/requirements.txt
  • 推奨例
COPY requirements.txt /tmp/
RUN pip install --requirement /tmp/requirements.txt
COPY app /tmp/

一見すると前者の方がレイヤが少ない分、良さそうに見えますが、 app配下のコードに変更が入るたびにライブラリのインストールも行われ、 その分ビルド時間が伸びてしまいます。

Dockerfileに関する悩みどころ

ここまでDockerfileに関するベストプラクティスについて触れてきましたが、Dockerfileを作成、メンテするのって大変ではないですか?

  • どのベースイメージを使用すべきか?
  • イメージサイズが大きくなりすぎる
    • イメージサイズの削減を頑張ってたら時間が溶けた(開発作業に専念したいのに。。。)
  • Dockerfile自体のメンテが辛い
    • イメージサイズを小さくしようと思うとDockerfile自体の可読性が下がるというつらみ
  • ベストプラクティスを意識することが自体が辛い
  • セキュリティ的な懸念
    • 使用するベースイメージに脆弱性が含まれていないかなど

Dockerfileを書かないという選択肢

そこで続いてのお話がBuildpackについてです。 こちらを利用することでDockerfileを書くことなく、ソースコードからコンテナイメージを生成することが可能になるというものです。

Buildpack

  • 2011年にHerokuが考案し、Cloud FoundryGitlabKnative等で採用されている仕組み
  • 様々な言語のBuildpackを使ってユーザのアプリケーションコードに対して、「判定」、「ビルド」、「イメージ化」といった一連の流れを実施する事によって、基盤上で動作可能な形にアプリケーションコードを組み立てる

Cloud Native Buildpacks

  • 上記のHerokuオリジナルと呼ばれるBuildpackが特定の実行基盤でしか動作しないというでデメリットがあったのに対し、Dockerの急速な普及を背景に、OCIイメージのようなコンテナ標準を採用したイメージを作成しようと始まったのがCloud Native Buildpacks(以降 CNBと略) Projectです。
  • HerokuとPivotalが中心となって2018年1月にCNCF傘下でスタートし、現時点でCNCFのSandboxプロジェクトという立ち位置になっています
  • 以降はこちらのCNBについての概要について記載します

CNBの仕組み

CNBを利用してイメージを生成する際はビルダーというものを指定します。 ビルダーはアプリのビルド方法に関するすべての部品と情報をバンドルしたイメージとなっており、複数のbuildpacklifecyclestackで構成されています。

f:id:pma1013:20201214095357p:plain
公式サイトから引用

  • buildpack
    • ソースコードを検査し、アプリケーションをどうビルドし実行するかを決める
  • lifecycle
    • buildpackの実行を調整し、最終的なイメージを組み立てる
  • stack
    • ビルド及び実行環境用のコンテナイメージのペア

デモ

基本的にCNBを利用して運用していく際には、自前のビルダーを作成することになると思います。 今回はお試しということで、すでにあるビルダーを使って試してみたいと思います。

  • 前提条件
    • ローカル環境にDocker及びBuildpackがインストール済みであること
  • サンプルコード
    • Flaskを利用したWebアプリケーション(単純にHello Worldと出力するだけのもの)
    • 構成としては下記の通りで最低限のファイルのみ配置しています。
              .
              ├── requirements.txt
              └── src
                  ├── __init__.py
                  ├── app.py
                  └── templates
                      └── index.html
  • ビルド
    それではpackコマンドを使ってビルドしてみましょう。
$ pack build sample-cnb:0.0.1
Please select a default builder with:

    pack set-default-builder <builder-image>

Suggested builders:
    Google:                gcr.io/buildpacks/builder:v1      Ubuntu 18 base image with buildpacks for .NET, Go, Java, Node.js, and Python
    Heroku:                heroku/buildpacks:18              heroku-18 base image with buildpacks for Ruby, Java, Node.js, Python, Golang, & PHP
    Paketo Buildpacks:     paketobuildpacks/builder:base     Ubuntu bionic base image with buildpacks for Java, .NET Core, NodeJS, Go, Ruby, NGINX and Procfile
    Paketo Buildpacks:     paketobuildpacks/builder:full     Ubuntu bionic base image with buildpacks for Java, .NET Core, NodeJS, Go, PHP, Ruby, Apache HTTPD, NGINX and Procfile
    Paketo Buildpacks:     paketobuildpacks/builder:tiny     Tiny base image (bionic build image, distroless-like run image) with buildpacks for Java Native Image and Go

Tip: Learn more about a specific builder with:
    pack inspect-builder <builder-image>

packコマンドを実行すると上記のようにビルダーを指定しろと言われます。 今回はここでおすすめされている Google Cloud Buildpacks を利用して実行します。

$ pack build sample-cnb:0.0.1 --builder gcr.io/buildpacks/builder:v1
v1: Pulling from buildpacks/builder
Digest: sha256:f0bb866219220921cbc094ca7ac2baf7ee4a7f32ed965ed2d5e2abbf20e2b255
Status: Image is up to date for gcr.io/buildpacks/builder:v1
v1: Pulling from buildpacks/gcp/run
Digest: sha256:83eb67ec38bb38c275d732b07775231e7289e0e2b076b12d5567a0c401873eb7
Status: Image is up to date for gcr.io/buildpacks/gcp/run:v1
===> DETECTING
google.python.runtime            0.9.1
google.python.missing-entrypoint 0.9.0
google.utils.label               0.0.1
===> ANALYZING
Previous image with name "sample-cnb:0.0.1" not found
===> RESTORING
===> BUILDING
=== Python - Runtime (google.python.runtime@0.9.1) ===
Using runtime version from .python-version: 3.7.8
Installing Python v3.7.8
Upgrading pip to the latest version and installing build tools
--------------------------------------------------------------------------------
Running "/layers/google.python.runtime/python/bin/python3 -m pip install --upgrade pip setuptools wheel"
Collecting pip
  Downloading pip-20.3.1-py2.py3-none-any.whl (1.5 MB)
Collecting setuptools
  Downloading setuptools-51.0.0-py3-none-any.whl (785 kB)
Collecting wheel
  Downloading wheel-0.36.2-py2.py3-none-any.whl (35 kB)
Installing collected packages: pip, setuptools, wheel
  Attempting uninstall: pip
    Found existing installation: pip 20.1.1
    Uninstalling pip-20.1.1:
      Successfully uninstalled pip-20.1.1
  Attempting uninstall: setuptools
    Found existing installation: setuptools 47.1.0
    Uninstalling setuptools-47.1.0:
      Successfully uninstalled setuptools-47.1.0
Successfully installed pip-20.3.1 setuptools-51.0.0 wheel-0.36.2
Done "/layers/google.python.runtime/python/bin/python3 -m pip inst..." (6.427479028s)
=== Python - pip (google.python.missing-entrypoint@0.9.0) ===
Failure: (ID: 194879d1) Failed to run /bin/build: for Python, an entrypoint must be manually set, either with "GOOGLE_ENTRYPOINT" env var or by creating a "Procfile" file
--------------------------------------------------------------------------------
Sorry your project couldn't be built.
Our documentation explains ways to configure Buildpacks to better recognise your project:
 -> https://github.com/GoogleCloudPlatform/buildpacks/blob/main/README.md
If you think you've found an issue, please report it:
 -> https://github.com/GoogleCloudPlatform/buildpacks/issues/new
--------------------------------------------------------------------------------
ERROR: failed to build: exit status 1
ERROR: failed to build: executing lifecycle: failed with status code: 145

今度は上記のようなエラーが出力されます。 どうやらDockerfileのentrypointに相当する GOOGLE_ENTRYPOINTを設定する必要があるようです。 該当のオプションを追加して下記の通り再トライしてみます。

$ pack build sample-cnb:0.0.1 --builder gcr.io/buildpacks/builder:v1 --env GOOGLE_ENTRYPOINT="flask run --host 0.0.0.0 --port 5000"
〜省略〜
Adding cache layer 'google.python.pip:pip'
Adding cache layer 'google.python.pip:pipcache'
Successfully built image sample-cnb:0.0.1

上記のようにSuccessfullyと出力されれば無事にコンテナイメージのビルドは完了しています。 作成されたイメージを確認してみましょう。

REPOSITORY              TAG           IMAGE ID       CREATED         SIZE
sample-cnb              0.0.1         4c60a192da62   40 years ago    289MB

sample-cnbというイメージが作成されていることが確認できました。 ここで気になるのは作成日が40 years agoとなっていることです。 これについては公式サイトに記載がありましたが、 どうやら再現可能なビルドを目的とした意図的な設計のようです。

  • コンテナ起動
    ビルドしたコンテナを起動して正常に動作することを確認します。 下記コマンドでコンテナを起動して、
    $ docker run --rm -p 5000:5000 -e FLASK_ENV=development sample-cnb:0.0.1
    こちらにアクセスすると、下記の画面が表示されることが確認できました。

f:id:pma1013:20201214165419p:plain

  • Dockerfileを使ったビルド
    最後に比較のためにDockerfileを利用したビルドも行います。

    • Dockerfileの準備
FROM python:3.7

WORKDIR /app

COPY requirements.txt /app

RUN pip install -r requirements.txt

COPY src /app/

ENV FLASK_APP=/app/app.py

ENTRYPOINT ["flask", "run"]
CMD ["--host", "0.0.0.0", "--port", "5000"]
  • ビルド
    $ docker build -t sample-df:0.0.1 .

  • 比較
    Dockerfileベースでビルドしたイメージは下記の通りとなります。 CNBで作成したイメージの方が軽量なOSが利用されていることが分かります

REPOSITORY              TAG           IMAGE ID       CREATED          SIZE
sample-df              0.0.1           9a5c14fd1846   14 seconds ago   928MB

CNBのメリット

CNBのメリットをざっとまとめると下記のような感じになるかと思います。

  • 開発に注力できる
    • 開発者はDockerfileを作成、メンテすることから開放される
  • 持続可能な運用
    • スケーラブルなセキュリティ対応
      • 散在しがちなDockerfileすべてにおいて脆弱性対応などしていくのは現実的ではない

セキュリティについて

私のコンテナセキュリティに対する知識としては、下記のようなレベルのものでした。

  • コンテナにおけるセキュリティって何すればいいの?
  • そもそもコンテナに限らず何をすればセキュリティちゃんとしてますって言えるの?

という訳でコンテナにおけるセキュリティ基準やツールとしてはどういったものがあるのかを調査した結果をまとめます。

概要

コンテナにおけるセキュリティ基準

コンテナの脆弱性スキャン

  • コンテナ環境もオンプレ同様にOSのライブラリやパッケージなどから構成されるため、これまで通り脆弱性対策が必須である
  • それに加えてコンテナイメージ、ランタイム環境の脆弱性にも配慮する必要がある

ツールの活用

とりあえず手軽に上記のセキュリティ基準チェックと脆弱性スキャンを行いたいというモチベーションの元、以前から気になるツールをピックアップしました。

dockle

https://github.com/goodwithtech/dockle

trivy

https://github.com/aquasecurity/trivy

  • 概要
    • コンテナの脆弱性スキャンツール
  • 使い方
    • trivy [イメージ名]

デモ

ここで上記で作成したコンテナイメージ(Dockerfileから作成したイメージとCNBで作成したイメージ)をそれぞれのツールにかけた場合にどういった結果になるか確認してみたいと思います。

Dockerfileベース

まずはDockerfileからビルドしたイメージの方です。

  • dockle
    • WARNレベルが1件検知されました。
$ dockle sample-df:0.0.1
WARN    - CIS-DI-0001: Create a user for the container
    * Last user should not be root
INFO    - CIS-DI-0005: Enable Content trust for Docker
    * export DOCKER_CONTENT_TRUST=1 before docker pull/build
INFO    - CIS-DI-0006: Add HEALTHCHECK instruction to the container image
    * not found HEALTHCHECK statement
INFO    - CIS-DI-0008: Confirm safety of setuid/setgid files
    * setuid file: usr/bin/chfn urwxr-xr-x
    * setgid file: usr/bin/ssh-agent grwxr-xr-x
    * setuid file: usr/lib/openssh/ssh-keysign urwxr-xr-x
    * setuid file: bin/umount urwxr-xr-x
    * setgid file: usr/bin/wall grwxr-xr-x
    * setuid file: bin/mount urwxr-xr-x
    * setuid file: usr/bin/gpasswd urwxr-xr-x
    * setuid file: usr/bin/passwd urwxr-xr-x
    * setgid file: usr/bin/chage grwxr-xr-x
    * setuid file: bin/su urwxr-xr-x
    * setuid file: bin/ping urwxr-xr-x
    * setgid file: usr/bin/expiry grwxr-xr-x
    * setuid file: usr/bin/newgrp urwxr-xr-x
    * setuid file: usr/bin/chsh urwxr-xr-x
    * setgid file: sbin/unix_chkpwd grwxr-xr-x
  • trivy
    • こちらは大量の出力結果が表示されるためサマリのみ貼っておきます。 CRITICALなものが69件検知されていることが分かります。
$ trivy sample-df:0.0.1

sample-df:0.0.1 (debian 10.2)
=============================
Total: 2401 (UNKNOWN: 23, LOW: 1291, MEDIUM: 520, HIGH: 498, CRITICAL: 69)

CNBベース

続いてCNBでビルドしたイメージの方を確認してみます。

  • dockle
    • こちらはWARNレベルのものは1件もなく、INFOレベルのものだけが検知されました。
$ dockle sample-cnb:0.0.1
INFO    - CIS-DI-0005: Enable Content trust for Docker
    * export DOCKER_CONTENT_TRUST=1 before docker pull/build
INFO    - CIS-DI-0006: Add HEALTHCHECK instruction to the container image
    * not found HEALTHCHECK statement
INFO    - CIS-DI-0008: Confirm safety of setuid/setgid files
    * setgid file: usr/bin/expiry grwxr-xr-x
    * setuid file: bin/umount urwxr-xr-x
    * setgid file: usr/bin/chage grwxr-xr-x
    * setuid file: usr/bin/newgrp urwxr-xr-x
    * setgid file: usr/bin/wall grwxr-xr-x
    * setuid file: usr/bin/chsh urwxr-xr-x
    * setuid file: bin/su urwxr-xr-x
    * setuid file: usr/bin/passwd urwxr-xr-x
    * setuid file: usr/bin/gpasswd urwxr-xr-x
    * setuid file: usr/bin/chfn urwxr-xr-x
    * setuid file: bin/mount urwxr-xr-x
    * setgid file: sbin/unix_chkpwd grwxr-xr-x
    * setgid file: sbin/pam_extrausers_chkpwd grwxr-xr-x
  • trivy
    • こちらもサマリのみ貼りますが、CRITICALに関しては0件となっています
$ trivy sample-cnb:0.0.1
2020-12-14T19:22:18.244+0900    INFO    Detecting Ubuntu vulnerabilities...

sample-cnb:0.0.1 (ubuntu 18.04)
===============================
Total: 75 (UNKNOWN: 0, LOW: 53, MEDIUM: 20, HIGH: 2, CRITICAL: 0)

この結果からもGoogle Cloud Buidpackを利用してビルドしたイメージの方が軽量かつセキュアな環境であることが分かると思います。

CIへの組み込み

上で紹介したツールはいずれもCIに組み込んで使用することも想定して作られています。 下記のようにオプションを指定して使うことで、CIのタイミングで実行&確認がしやすくなっています。

  • dockle
    • dockle --exit-code 1 [イメージ名]
  • trivy
    • trivy --exit-code 1 --severity CRITICAL --no-progress [イメージ名]

まとめ

今回はDockerfileのベストプラクティスのおさらいと、CNBを利用したコンテナイメージのビルド方法、セキュリティに関してさらっとまとめてみました。

今後もコンテナベースのアプリケーション開発が進むと、 これまで個人、チームレベルで任せていたDockerfileの作成、管理が破綻するのではと感じました。 CNBには組織として統制のとれたコンテナ作成やセキュリティ基準を継続的に満たすことの手段が提供されているので、 その辺りをうまく活用していく必要性を感じでいます。

セキュリティについても検知の仕組みだけでなく、日々の運用の中でいかに対応していくかということが大事だと思うので、 今後も試行錯誤しながら少しずつ前進していければと思っています。

明日の記事の担当はインフラエンジニアの山口さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co


  1. CIS(Center for Internet Security)とは、米国のNSA(National Security Agency/国家安全保障局)、DISA(Difense Informaton Systems Agency/国防情報システム局)、NIST(National Institute of Standards and Technology/米国立標準技術研究所)などの米国政府機関と、企業、学術機関などが協力して、インターネット・セキュリティ標準化に取り組む団体の名称

Rails + マイクロサービスでイベント駆動アーキテクチャを導入した話

はじめに

こんにちは、サーバーサイドエンジニアの@hokitaです。

この記事は Enigmo Advent Calendar 2020 の 16 日目の記事です。

弊社が運営するBUYMAは現状モノレポで管理されており、10年以上も運営しているサービスなのでソースも肥大化していて、メンテナンスが難しくなってきました。 そこで現在、本体から少しずつマイクロサービスに切り離していこうとしています。

その取組の中で配送処理の一部をマイクロサービス化する作業に携わることができました。今回はBUYMA本体と配送サービスとの通信にイベント駆動アーキテクチャを導入した話をしていきます。

イベント駆動アーキテクチャ

マイクロサービスでサービスを切り分ける場合、それぞれ責務が分かれるように分割するかと思います。 しかしサービス間の通信手段によっては各サービスが密になる恐れがあります。

そこでイベント駆動アーキテクチャを利用します。

f:id:hokita:20201215115957p:plain
PublisherとSubscriber

メリットとして

  • PublisherはSubscriberのことを知らない
  • Subscriberはイベントのこと以外知らない
  • Publisher1つに対し、複数のSubscriberを設置できる

このようにPublisherとSubscriberはお互い知らないので、疎の関係を保つことができます。

非同期メッセージングサービス

イベント駆動アーキテクチャを実現するために、非同期メッセージングサービスを利用します。

代表的なものとして

などがあります。

今回は Amazon SNS / Amazon SQS を使用しました。

Amazon SNS

docs.aws.amazon.com

発行者からサブスクライバー (プロデューサーおよびコンシューマーとも呼ばれます) へのメッセージ配信を提供するマネージド型サービスです。

メッセージを発信するものと思って頂ければと思います。メッセージの受け取り先(サブスクライバー)として今回はAmazon SQSですが、他にもAWS Lambdaなどでも取得することが可能です。

なぜSNSを利用するのか

次の図を見て頂ければ分かる通り、SNSがないとBUYMA本体が配送サービスを知っていることになります。つまりになってしまいます。

f:id:hokita:20201215120053p:plain
SNSなしの場合

Amazon SQS

docs.aws.amazon.com

メッセージキューイングサービスです。同じ非同期処理としてsidekiqやResqueを使っているサービスも多いかと思います。

主な特徴としては

  • "at least once"(最低1回)が保証されている
    • 逆に2回以上同じメッセージを取得する可能性がある
    • Redisのようにジョブを失うことがない
  • 順不同(それなりに担保されるのかと思っていましたが全く順不同でした。)

FIFO(First-In-First-Out)キュー

通常キューとは別でFIFOキューというものも使用することができます。

  • 受信する順序が保持される
  • 必ず1回処理される
  • 1つずつ処理される
  • 通常キューより処理が遅くなる

処理順序が決まっている処理(例えば製品価格の変更処理など)で便利かと思います。

ロングポーリング

ショートポーリングとロングポーリングというものがありますが、基本ロングポーリングが良いです。(ショートポーリングはいつ使われるのだろうという感じです。)

ロングポーリングですが最大20秒キューにメッセージがないか待機をして、あれば即座に実行します。(筆者は最初20秒間に溜まったメッセージを処理するのかと思っていましたが、勘違いでした。メッセージを取得したら即座に実行です。)

Shoryuken

github.com

RailsでSQSをジョブキューとして利用するときは現状Shoryuken一択です。Shoryukenを起動することでSQSのキューをポーリングし、キューを取得して処理を実行してくれます。エンキューももちろんできます。

一通りwikiに必要な情報が書かれているので、そちらを読めば問題なく実装できるかと思います。

以下設定ファイルや実装例を記載します。

shoryuken.yml

対象のキューの情報を記載します。

Shoryuken options · phstc/shoryuken Wiki · GitHub

# config/shoryuken.yml

groups:
  purchase_completed:
    concurrency: 1
    queues:
      - ['purchase_completed_queue', 1]
pidfile: ./tmp/pids/shoryuken.pid

※groupsで分けている理由

groups:
  group1:
    concurrency: 1
    queues:
      - ['a_queue', 2] # 重さ2
      - ['b_queue', 1] # 重さ1

このように同じグループに複数のキューを設定している場合、a_queueがメッセージを取得してもb_queueにメッセージがない場合はポーリング時間が終わるまでa_queueの処理は実行されないので注意が必要です。

Processing Groups · phstc/shoryuken Wiki · GitHub

またa_queueb_queueで大量にメッセージがある場合、a_queueb_queueの重さの2倍なので、処理の優先度も2倍になります。

Polling strategies · phstc/shoryuken Wiki · GitHub

shoryuken.rb

SQSの情報を記載します。

# config/initializers/shoryuken.rb

# ロングポーリング
Shoryuken.sqs_client_receive_message_opts = { wait_time_seconds: 20 }

Shoryuken.sqs_client = Aws::SQS::Client.new(
  region: ENV['AWS_REGION'],
  access_key_id: ENV['AWS_ACCESS_KEY_ID'],
  secret_access_key: ENV['AWS_ACCESS_SECRET_KEY']
)

起動

$ bundle exec shoryuken -R -C config/shoryuken.yml

ジョブ

例)決済完了時に実行するジョブ

# app/jobs/purchase_complete_job.rb

# 決済完了時のジョブ
class PurchaseCompleteJob < ApplicationJob
  include Shoryuken::Worker

  shoryuken_options queue: '<キュー名>',
                    auto_delete: true,
                    body_parser: :json

  def perform(_sqs_msg, body)
    message = JSON.parse(body['Message'])

    # 保存処理
  end
end

エンキュー

エンキューも簡単にできます。

# 配送ステータスの通知ジョブにエンキュー
StatusNotificationJob.perform_async(
  status_data: 'some status data'
)

FIFOキュー

前述したFIFOキューですが、Shoryukenでも扱うことができます。 基本通常キューと記述は同じですが、メッセージグループIDを指定することができます。 同じメッセージグループIDだと厳密な順序で、常に1件ずつ処理をします。

例えば取引のステータスを変更したい場合、ステータス変更の順序は重要だが、他の取引同士の順序は気にしない場合はメッセージグループIDに取引IDを指定したら良さそうです。

SomeJob.perform_async(
  csv_id: csv.id,
  message_group_id: <MESSAGE_GROUP_ID>
)

イベント駆動は一筋縄ではいかない話

各技術の説明をしてきましたが、ここからはイベント駆動アーキテクチャ導入時に必ず考慮する点を書いていきます。

冪等性

同じ処理を何度実行しても同じ結果が得られる性質のことです。

冪等 - Wikipedia

Amazon SQSの通常キューでは2回以上同じメッセージを取得する可能性があるので、重複してメッセージを取得することを考慮する必要があります。

今回実装した決済完了イベントでは、取引ID(冪等キー)をUnique Keyにして、DB側で制御するようにしました。

# 配送サービスの配送情報保存処理
def save!
  Delivery.save!(params)
rescue ActiveRecord::RecordNotUnique => e
  # 念の為取引ID(冪等キー)で重複していることを確認
  raise e unless Delivery.exists?(order_id: params[:order_id])

  # ログだけ残して正常終了させる
  Rails.logger.info(
    "#{self.class}##{__method__} Message: #{e}"
  )
end

もしくはFIFOキューを導入することでも解決が可能です。

整合性

モノレポの場合はDBのtransaction機能を利用して、失敗したらロールバックすることが可能です。しかしマイクロサービスだとそう簡単にはできません。

一般的にはTCCパターンやSagaパターンを使用する必要がでてきます。 qiita.com

今回は下記理由で、ロールバック処理を行いませんでした。

  • 連携するサービス数が少ないので今のところ処理がシンプル
  • 外部からの入力事項は送信前にバリデーションされている
  • BUYMA本体から配送サービスに送られてくるメッセージは信頼する
  • BUYMA本体の取引データと配送データに不整合がないかを毎日チェックしている

現状は問題なく動作していますが、今後はもっと堅牢にするためにTCCパターンやSagaパターンを導入する必要がありそうです。

最後に

イベント駆動アーキテクチャに使用した技術と考慮するポイントを書いていきましたが、マイクロサービスは思っている以上に学習することが多く、また実際に作成して経験値を積んでいくことが大事だと実感しました。 今後も実践を経て、レガシーから脱却できるように精進していきたいです。

明日の記事の担当は検索エンジニアの伊藤さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co