Apache Airflowで実装するDAG間の実行タイミング同期処理

こんにちは。
今年4月にエニグモに入社したデータエンジニアの谷元です。
この記事は Enigmo Advent Calendar 202020日目の記事です。

目次

はじめに

コロナ禍の中、皆さんどのようにお過ごしでしょうか。 私はリモートワークを続けてますが、自宅のリモートデスクワーク環境をすぐに整えなかったため、薄いクッションで座りながらローテーブルで3ヶ月経過した頃に身体の節々で悲鳴をあげました。猫背も加速...

さて、エニグモでの仕事も半年以上経過し、データ分析基盤の開発運用保守やBI上でのデータ整備などを対応をさせていただいてますが、今回は社内で利用されているAirflowの同期処理の話をしたいと思います。

尚、こちらの記事を書くにあたり、同環境の過去記事 Enigmo Advent Calendar 2018 がありますので、良ければそちらもご覧ください。

そもそも同期処理とは?

例えば、以下のようなデータ分析基盤上でのETL処理があったとします。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
処理2. DWH上のデータを加工後、外部へCSV連携する

上記の処理は順番に実行されないと必要なデータを含めた抽出ができなくなってしまい、外部へデータ連携できなくなります。そのためには処理1の正常終了確認後に処理2を実施するという、同期処理を意識した実装をする必要があります。

例だとシンプルですが、業務上では徐々にETL処理も増えていき、複雑化していきます。 こうした同期処理は、初めはなくても問題にならないことが多いのですが、徐々に処理遅延が発生してタイミングが合わなくなったり、予期せぬ一部の処理エラーが原因で関連する後続処理が全て意図せぬ状態で動き出してしまったりします。そうならないためにも、設計時点で同期について意識することが大事だと思います。

Airflowによる同期処理

Airflowでは ExternalTaskSensor を使用することで実装可能となります。 Airflowのソース上ではdependという用語が使われているので、Airflowの世界では「同期」ではなく、「依存」と呼んだ方が良いのかもしれません。適弁読み替えていただければ...

では、Airflowでの同期検証用サンプルコードを作成してみましたので、実際に動かしながら検証したいと思います。尚、Airflowバージョンは1.10.10となります。

先ほどのETL処理を例にして、下記の内容で検証してみます。 実際のファイル出力やDBへのロード処理などはここでは割愛して、DummyOperatorに置き換えてます。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
  dag_id: sample_db_to_dwh_daily
  schedule: 日次16:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

処理2. DWH上のテーブルを用いてデータ加工後、外部ツールへCSV連携する
  dag_id: sample_dwh_to_file_daily_for_sync_daily_16
  schedule: 日次17:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

検証時のコード

まずは処理1のサンプルコードになります。

from airflow.models import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='sample_db_to_dwh_daily',
    start_date=datetime(2020, 12, 1),
    schedule_interval='0 16 * * *'
)

tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

for table_name in tables:

    db_to_dwh_operator = DummyOperator(
        task_id='db_to_dwh_operator_%s' % table_name,
        dag=dag
    )

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed'
    )

    db_to_dwh_operator >> terminal_operator

次に処理2のサンプルコードです。

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State
from datetime import datetime, timedelta


tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

def _dwh_to_file_operator(dag,
                         table_name,
                         depend_external_dag_id,
                         depend_external_dag_execution_delta):

    dwh_to_file_operator = DummyOperator(
        task_id='dwh_to_file_operator_%s' % table_name,
        dag=dag
    )

    sensors = []
    sensors.append(ExternalTaskSensor(
        task_id='wait_for_sync_db_to_dwh_%s' % table_name,
        external_dag_id=depend_external_dag_id,
        external_task_id='terminal_operator_%s' % table_name,
        execution_delta=depend_external_dag_execution_delta,
        allowed_states=[State.SUCCESS, State.SKIPPED],
        dag=dag))

    for sensor in sensors:
        sensor >> dwh_to_file_operator

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed')

    dwh_to_file_operator >> terminal_operator


args = {
    'start_date': datetime(2020, 12, 3)
}

dag = DAG(
    dag_id='sample_dwh_to_file_daily_for_sync_daily',
    default_args=args,
    schedule_interval='0 17 * * *'
)


for table_name in tables:
    _dwh_to_file_operator(
        dag=dag,
        table_name=table_name,
        depend_external_dag_id='sample_db_to_dwh_daily',
        depend_external_dag_execution_delta=timedelta(hours=1)
    )

サンプルをAirflow画面で見ると?

上記2つのDAGをAirflowのWebUIで見ると以下のようになります。

f:id:enigmo7:20201211201546p:plain

同期処理をするoperatorは「wait_for_sync_db_to_dwh 」です。図でも ExternalTaskSensor のアイコンになってますね。 今回は3テーブルあり、それぞれ並列処理を想定しているため、3つあります。 そして、それぞれ、該当するテーブルのterminal_operator処理完了を確認後、後続処理が実行されます。

では上記の同期処理がAirflowのログでどのように表示されるか見ていきたいと思います。

同期遅延なし時のAirflowログ

まずは、既に sample_db_to_dwh_daily が完了していた場合です。 3テーブル毎のログを載せておきます。

TABLE_DAILY_1

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,833] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,840] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,841] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,841] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,841] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,848] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,851] {standard_task_runner.py:53} INFO - Started process 48063 to run task
[2020-12-09 02:00:09,905] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,921] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,924] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,834] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,834] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_2

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,831] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,840] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,840] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,846] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_2> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,848] {standard_task_runner.py:53} INFO - Started process 48062 to run task
[2020-12-09 02:00:09,902] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,916] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_2 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,918] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,921] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_2, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,832] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,832] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_3

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,829] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,837] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,837] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,844] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_3> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,847] {standard_task_runner.py:53} INFO - Started process 48061 to run task
[2020-12-09 02:00:09,901] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,915] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_3 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,917] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,920] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_3, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,831] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,831] {local_task_job.py:103} INFO - Task exited with return code 0

注目して欲しい行は下記となります。
※ 他テーブルのログも同じですので、ここから先は記載を割愛します

[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...

これが同期処理のログになります。 今回は既に正常終了していたため、1回の確認しか行われておらず、その後、後続処理も含めて正常終了してることが分かります。

同期遅延あり時のAirflowログ

次に前提のタスクが遅延して正常終了した場合のAirflowログもみていきたいと思います。

~~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08 17:00:00
~~~
[2020-12-10 02:00:04,174] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,182] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-10 02:00:04,182] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,189] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-08T17:00:00+00:00
[2020-12-10 02:00:04,192] {standard_task_runner.py:53} INFO - Started process 21683 to run task
[2020-12-10 02:00:04,245] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-10 02:00:04,259] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:01:04,322] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:02:04,385] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:03:04,448] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:04:04,511] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...

(省略)

[2020-12-10 10:27:29,163] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:28:29,226] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:29:29,273] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:30:29,298] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,360] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,363] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-10 10:31:29,366] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201208T170000, start_date=20201209T170004, end_date=20201210T013129
[2020-12-10 10:31:33,698] {logging_mixin.py:112} INFO - [2020-12-10 10:31:33,698] {local_task_job.py:103} INFO - Task exited with return code 0

先ほどのPoking行がログに表示され続けていることが分かります。今回は途中でタスクを手動で動かして正常終了させました。

尚、Pokingの間隔はExternalTaskSensorの引数でpoke_interval を設定すると変更ができました。ソースを見るとデフォルトは60秒のようです。ログとも一致してますね。

同期タイムアウト時のAirflowログ

業務では非機能要件も大事だと思います。 Airlfowでは、ExternalTaskSensorの引数でtimeoutがあるようです。 こちらでタイムアウト(default: 1週間ですかね、長っ)を適切に設定して、エラー検知をしてみたいと思います。

sample_dwh_to_file_daily_for_sync_hourly/wait_for_sync_db_to_dwh_TABLE_HOURLY_1/2020-12-09T17:00:00+00:00/4.log.
[2020-12-11 18:21:14,539] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,551] {taskinstance.py:880} INFO - Starting attempt 4 of 9
[2020-12-11 18:21:14,552] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,563] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_HOURLY_1> on 2020-12-09T17:00:00+00:00
[2020-12-11 18:21:14,568] {standard_task_runner.py:53} INFO - Started process 84130 to run task
[2020-12-11 18:21:14,625] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-11 18:21:14,639] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:22:14,701] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:23:14,764] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:24:14,828] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,892] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,898] {taskinstance.py:1145} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
AirflowSensorTimeout: Snap. Time is OUT.
[2020-12-11 18:25:14,899] {taskinstance.py:1202} INFO - Marking task as FAILED.dag_id=sample_dwh_to_file_daily_for_sync_hourly, task_id=wait_for_sync_db_to_dwh_TABLE_HOURLY_1, execution_date=20201209T170000, start_date=20201211T092114, end_date=20201211T092514
[2020-12-11 18:25:15,070] {logging_mixin.py:112} INFO - [2020-12-11 18:25:15,070] {local_task_job.py:103} INFO - Task exited with return code 1

hourlyに変更して実施したログですが、「Snap. Time is OUT.」と良い感じ(?)にエラーとなってくれました。 業務上では、お目にかかりたくないログ内容ですが...

他にも色々あるようですが、この辺りを抑えておけば基本的な使い方は抑えたことになるかと思います。

所感

実際にAirflow同期処理をやってみて思ったのですが、

  • 同期を取る対象のoperatorをどれにするか
  • 同期を取るoperatorの時刻差はどれだけあるか
  • 非機能要件にあったタイムアウト設定で適切にエラーで落とそう
  • 複雑になるとDAG間の関係性がわかるグラフをWebUIでサクッとみたい

が、気になりました。

「同期を取る対象のoperator」は「terminal_operator_<テーブル名>」としました。
ダミー処理なので冗長にはなってしまいますが、各並列処理毎に加えておいたほうがDAG修正時の同期影響を意識しなくて良くなるのかなと思ったためです。

また、DummyOperator利用時に 「trigger_rule='none_failed'」の引数を付け加えないと、先に実行されたケースもありましたので注意が必要そうです。

「同期を取るoperatorの時刻差」ですが、何度も繰り返してると混乱してしまう可能性もあるので、テストでも十分に気をつけて対応しないといけないですよ(to自分)。実際の業務でもtimedeltaで指定する時刻に誤りをテストで見落としてしまい、その結果、リリース後に同期処理が想定時間通りに終わらず遅延してしまい、外部連携のタイミングに間に合わなくなり問題になってしまいました...。

今回の検証例は、お互いdaily実行だったのですが、頻度が異なると慎重な対応が求められそうです。 ただ、この辺の制御はAirflowのコアな制御なので、今後、利用者が意識しないで済むようにdag_idとオプション引数で渡したら、同期を取ってくれるような機能があると良いなとも思いました。利用頻度が多くなると、そういった実装も検討しないといけないのかもしれません。

あとは、WebUI上にてDAGの関係性がグラフで見れると、意図したoperatorで同期が取れているかの確認がやりやすいと思いました。 見方が分からないだけなのか、未実装なのか把握できてないのですが、WebUIをみる限り今回のAirflowのバージョン1.10.10ではなさそうです。

最後に

Airflowでの同期処理について少しでも伝わればと思い、このテーマで記事を書いてみました。 本記事を通して少しでもイメージを掴めて頂けますと幸いです。 他にも面白そうな機能はあると思いますので、また、機会があれば投稿したいと思います。

私からは以上となります。最後までお読み頂きありがとうございました。

明日の記事の担当はエンジニアの高山さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co

マニフェストファイルに思いを馳せる

こんにちは。BUYMAの検索やデータ基盤周りを担当している竹田です。
この記事は Enigmo Advent Calendar 2020 の19日目の記事です。

エニグモに入社してGCPAWSといったクラウドサービスを利用することが多くなり、日々刺激を受けながら業務に従事しております。
その中でもKubernetesのようにシステムを「宣言的」に定義するモデルに技術進化の恩恵を感じており、自分の体験も踏まえて、クラウドサービスでKubernetesを一般利用するに至るまでどういう歴史的経緯があるのかを辿ってみたくなりました。
(実際Kubernetesで編集するファイルも「マニフェスト(≒宣言)ファイル」といいますね)
なお、記事内容には主観を含む部分や、内容を簡素化するため端折っている部分もありますので、あらかじめご了承ください。

chroot

まずはKubernetesで管理されるコンテナの歴史を探ってみたいと思います。
コンテナ技術の前身は1979年に登場した chroot と言われています。
chrootで特定階層以下をrootディレクトリとすることで、システムとの分離を実現することができます。
変更したディレクトリ配下には動作中のOSと同じディレクトリ構造を実体として持つ必要があり、利用シーンとしてはsshログインしたユーザにディレクトリ移動の制限をかけるような場合でしょうか。個人としてはあまり使った記憶はありません。
少なくともリソースの制限を設けることやディレクトリ階層をイメージのような形で持つことはできませんでした。

コンテナのベース技術

コンテナのベースとなる技術は2000年にFreeBSDから発表された FreeBSD jail という機構です。 カーネルに手を加えてOSレベルでの仮想化機構を実現しており、ファイルシステムやネットワークなどの分離ができるようです。
このFreeBSD jailは知らなかったのですが、当時は業務で Solaris を利用しており、すでに2005〜2006年頃にはSolarisコンテナという概念が出てきていたことを記憶しています。

また、この頃はまだあまりLinuxは表舞台には出てきていませんでした。10数年前当時は商用製品であることが重視されていたと思います。
当時のLinuxSolarisなどの商用UNIXと比較すると

  • OSとしての安定性が高くなかった
    • OOM Killerに悩まされたこともしばしば
  • 比較的スループットが重視されていた
    • とりあえずタスク間がフェアじゃなかった
  • リソース管理やデバッグ面が弱かった など

こういった背景もあり、ミッションクリティカルなシステムでは商用UNIXを選択するのが一般的でした。

ただ、2000年台初頭から、OSS(Open Source Software) が台頭してきました。
開発者のニーズにマッチしていたことや、 Red Hat社 のようなビジネスモデルを確立できたことなどが、OSSを後押しした背景にあると思います。

ちなみにコンテナが出てくるまでの仮想化技術の筆頭は VM(VirtualMachine) ですが、本記事では割愛します。

cgroups

Linuxにおけるコンテナは cgroups が根幹となっています。
カーネルの機構によりプロセスをグループ単位にして、そのグループ内でリソース配分や制限を行う技術です。
少しだけcgroupsには関わっていたこともあり、Linux商用UNIXに追いつきそうだなと感じていた頃でした(おそらく2008〜2009年頃)。
ただし、設定方法が特殊であり、一般利用するにはかなり難易度が高いものでした。

この後、cgroupsを管理できる LXC というソフトウェアが出ています。
ポータビリティ性が低かったのか、使い勝手が良くなかったのか、、なぜかあまり脚光は浴びていませんね。

Docker

言わずと知れたDocker社が開発したコンテナを管理するソフトウェアです。
自分の印象では利用者がcgroupsを意識することなく利用でき、それをコードベースで管理できる柔軟なラッパー、、という解釈です。
ざっくりと表現するとLinuxでは以下のイメージです。
f:id:enigmo7:20201214004444p:plain
近年のCPUパフォーマンスやSSD等によるI/Oパフォーマンスの向上、かつVMよりも遥かに軽量でポータビリティ性が高く、簡素に利用できるということもあり、主に開発者の間で一気に広まったように思います。

Kubernetes

Google社が2013年に発表した コンテナオーケストレーションツール です。
この当時すでに大量のシステムをコンテナ化していた事実に衝撃を受けたのを覚えています。
kubernetesは紆余曲折ありましたが、現在の標準になっているものと思います。
宣言的な記載により、ブルーグリーンデプロイメントやローリングアップデートが非常に簡素に実現できるようになりました。

  • 宣言的
    • システムがどういう状態にあるべきかを記述する
    • 問題があった場合もその状態になるよう再構成する
    • 内部アプリの修正やバージョンアップは、修正適用済みコンテナに置き換える

解釈は難しいです。今こうある状態が大事、というニュアンスでしょうか。
対義語としては命令的、ということなので、こうしてああしたらその状態になる、というニュアンスですかね。

思えば、トラブル発生の際はシステムを修復・復旧させることが一般的でした。
ステートレスシステム(状態維持が不要なシステム)では、もはや宣言的アプローチが一般的になっていると思います。

コンテナ、およびKubernetesのサービス利用

コンテナ技術は開発者にとっては非常に都合の良いものでしたが、実サービスでの利用では、となると敷居が高かったように思います。
自分の当時の立場で記載すると、以下のような理由からでした。

  • 枯れた技術ではない
  • リソース分割はVMで十分満足できている
  • Kubernetesをオンプレ環境で管理するのはハードルが高い
  • 新技術を使いたいという理由では上長(ひいては経営層)を納得させられない

クラウドサービスによるサポート

クラウドサービスでのサポート開始により、サービス利用の敷居が一気に下がったものと思います。
Google GKE、Amazon EKS、Microsoft AKS などが挙げられます。

コンテナオーケストレーションでは大量のシステムを管理・運用することに長けており、クラウドサービスの柔軟性と非常に相性が良いと思います。

  • マネージドなKubernetesにより管理を簡素化できる
  • サポートを受けられる
  • コスト面も使った分だけ

。。となると使わない手はないだろうな、という印象です。

終わりに

実際にはもっと複雑な背景があるとは存じますが、概要としてはこのような流れかなと思います。
「宣言的」概念はシステムのあるべき形だと感じつつも、まだステートレスなシステムでのベストプラクティスなのかなという感触です。
ステートフルシステム(状態維持が必要なシステム、例えばDBサーバなど)でも利用はできますが、まだちょっと厳しいので今後より良い概念・機能が出てくるのかな、という期待があります。

おじさんエンジニアとしては過去に思いを馳せつつも、これからも技術のビッグウェーブには乗っていきたいので日々勉強・キャッチアップが本当に大事ですね。

最後まで読んでいただきありがとうございました。
明日の記事担当はデータエンジニアの谷元さんです。よろしくお願いします!


株式会社エニグモ 正社員の求人一覧

hrmos.co

old schoolerなネットワークエンジニアがIAP Connectorを試してみた

お疲れ様です。インフラチームの山口です。 この記事は Enigmo Advent Calendar 2020の18日目の記事となります。

2020年はコロナ禍でほぼ全社的にリモートワークになったこともあり、 前職のネットワークエンジニアだった頃のWANやビデオ会議の思い出を思い返す機会が多い一年でした。 強く思い出に残っているのは大概、障害と機器の不具合などのトラブル系しかなく、それだけでお腹いっぱいになる感じです。

話は変わりますが、アドベントカレンダーは業務から少しずらした内容で書くポリシーなので、現行の業務とはあまり関係ない、リモートワークに伴ってよく起こりがちなネットワークのごまかしの話をします。 今年、私の業務何やってたかなというとEKSの運用サポートとオンプレの保守対応が多かったので、業務からずれた内容をエイヤで書き下します。

1.はじめに

本記事はオフィスおよびオンプレミスのDCのPublicIPのみをホワイトリストに登録したS3バケット上の画像を、リモートワーク下の各ご自宅から閲覧する方法を考えた際の検討事項を整理したものです。

IAP Connectorをタイトルに入れていますが、本記事内ではZero Trust的な機能は使用しません。 IAP ConnectorはGKEに簡単にデプロイできるNAT箱として使っています。 そのため、Beyond Corp Remote AccessやCloud IAPについても本記事では特に触れません。

2.問題設定と前提条件

問題設定とその問題を解くにあたっての前提条件(おもに弊社のVPN関連の構成など)を説明します。 まず、何をやりたいかの問題設定を概説し、その後に付随する前提条件を説明します。

2-1.問題設定

解きたい問題を以下に記載します。

  • オフィスとDCのPublicIPをホワイトリストに登録したS3バケット上の画像を、リモートワーク下の自宅から閲覧できるようにしたい
  • エンジニア以外のメンバーも閲覧できるようにしたい

2-2. 前提条件

2-1で説明した問題設定に加えて方法検討の際の制約になりうるNW構成などを思いつく限りに記載します。以降の節でなんやかんや本節の理由を参照して、対応案を絞っていきます。

  1. オンプレミスのDCにあるVPNサーバでL2TP/IPSecを終端する
  2. VPN接続後のクライアントPCの経路はスプリットトンネリング(トンネルインタフェースにはデフォルトルート向けてない)
  3. クライアントPCは、エンジニアはMacだが非エンジニアはWindows
  4. VPNサーバからクライアントに経路をpushすることは可能
  5. オンプレミス環境はパブリッククラウドに移行中のため余計なリソースを増やしたくない

また、概要を簡単に記載した図を以下に示します。

f:id:enigmo7:20201215105050p:plain
図1

3.方法検討

検討した方法を以下表と図に記載します。 方法は3種類に大別されます。タイトルにIAP Connectorを試してみたと記載しているタイトルの通り最終的には、3bを選択することになるのは明白ですが、建前として各案のPros/Consを考えていきます。

小項目 方法
案1.愚直案 S3バケットのIPアクセス制限に各自の自宅のPublicIPを登録する
案2.ルーティングでごまかし案 2a VPNサーバでS3向けの経路をPushしDC経由にする
2b sshuttleなどを使用しクライアント側でルーティングを調整する
案3.Proxy建てる案 3a DCにProxyを建てる
3b IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加

f:id:enigmo7:20201215105210p:plain
図2

案1.愚直案

S3バケット側で愚直に各ご自宅のPublicIPをホワイトリスト登録すればいいだけというのは、そのとおりでそれで済むならこの記事をグダグダ書いてる意味ないじゃんという形になってしまうので、半ば無理矢理感はありますが一旦は却下します。 これは人によって意見分かれそうですが、プライベートで契約しているPublicIPの情報をCloudFormationのテンプレートなどに書いて、Gitの履歴に残したくなさがあるので個人的には微妙かなという印象が強いです。

  • pros
    • 一番シンプル
  • cons
    • アクセスする社員の数増えたら運用手間かもしれない。
    • そもそも、ご自宅が固定IPとは限らないケースもある。
    • 現状はCloudFormationでS3バケット管理しているが、個人の自宅のPublicIPをテンプレートに残してしまうのって良いのか? なんか嫌じゃない?

案2. ルーティングでごまかし案

ルーティングでごまかし案は2a、2bに分かれます。 2aと2bの違いはルーティングの調整をVPNサーバ側で行うか、クライアントのPC側で行うかの違いです。

案2での基本方針は以下からS3で使用されるPublicIPのレンジを確認してそのIPレンジ向けの通信をDC経由にします。

2a、2bのpros/consを記載します。 正直自分しか使わないのなら他の環境に影響少なくて手軽な2bでローカルのPCでルーティングいじってごまかすぞ、となるのですが、 今回の要件的には他メンバーにも展開する可能性があるので却下します。 記事まとめている途中で、わざわざS3のPublicIPのレンジ全部経路切らなくてもDNS引いた結果をhostsに追加&それ向けのホストルートをトンネルインタフェースに切れば良いんじゃないかと思いましたが他の人に展開するという観点ではやっぱり却下です。

  • 2a: VPNサーバでS3向けの経路をPushしDC経由にする

    • pros
      • クライアントPC側の作業は楽
    • cons
      • この要件のためだけにVPNサーバの経路調整したくない
      • S3のPublicIPのレンジをすべてDC経由にした場合に意図しない影響って本当に出ないんだっけ?
  • 2b: sshuttleなどを使用しクライアント側でルーティングを調整する

    • pros
      • VPNサーバ側の設定を変えなくて済む
      • 当該S3バケットを参照したい人だけDC経由になるので、2aより意図しない副作用は少なそう
    • cons
      • エンジニアならローカルで経路調整したりできそうだけど、非エンジニアの人には難しいかもしれない

案3. Proxy建てる案

Proxy建てる案はどこにProxyを建てるかどうかで3a、3bに分かれます。 3aはDCにProxyを建てる案です。DCにnginxのProxy建ててアクセスしている実績はあるのですが、オンプレのリソースは極力減らしていっている状況なので却下します。

  • 3a: DCにProxyを建てる
    • pros
      • VPNのルーティングは調整しなくて済む
      • 同様のProxyはDCにも何台かすでにある
    • cons
      • DCに余計なリソースが増える
      • 運用手間
  • 3b: IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加
    • pros
      • VPNのルーティングは調整しなくて済む
      • Deployment Managerでデプロイは楽にできそう
    • cons
      • 動作確認は必要
      • GKEのクラスタがデプロイされるのでコスト面は大丈夫?

方法決定

一旦案をまとめます。 5案だしましたが、以下の3案が確度高く現実的には行けそうな感じがします。 案1、案2bはこの検討時点では動作することが分かっていたので、案3bの構成の検討を次節で行います。 案3bの検討事項ではコスト面と動作確認がconsとして上がっていますが、本記事ではとりあえず動作確認だけを行います。

  • 案1.愚直案
    • とりあえずはこれできるけど面白みはない
  • 案2b: sshuttleなどを使用しクライアント側でルーティングを調整する
    • エンジニアは拾えるかつ楽だけど非エンジニアは拾えない
  • 案3b: IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加
    • 検証はした方が良いが意外と楽にできそう

4.リソース作成・動作確認

本節では 案3b: IAP ConnectorをProxyとして建てる&S3バケット側のホワイトリストに追加 の説明を簡潔に行います。 IAP Connectorのデプロイは基本的には以下公式の手順に従います。

最終的な構成図を以下に示します。 構成上、Deployment Managerのテンプレートでは、IAP ConnectorのNAT後のPublicIPとGCLBにアサインするPublicIPも同じテンプレートでデプロイされてしまうようですが、以下の理由から、PublicIPはコンソールでリソースを作成しそれをテンプレートから参照するように修正しました。また、とりあえずぱっとデプロイできて動きだけ確認できればいいので、Terraformで作成する選択肢は取らず、雑にDeployment Managerのテンプレート一枚で作成しました。

  • GKEクラスタはなにか問題が出たら潰して再構築する程度の温度感の運用を想定
    • ambassadorのPodの問題判別やクラスタ運用は極力しない手抜き前提
  • PublicIPはGKEクラスタと合わせて潰れてしまうとS3バケットホワイトリスト設定やDNS設定が手間
    • 所謂ライフサイクル異なるリソースは別テンプレートにというやつ(今回PublicIPはCLIでリソース作ったけれど)

f:id:enigmo7:20201215105259p:plain
図3

5.まとめ

本記事では、IAP Connectorを容易にデプロイできるNAT箱として利用しました。 Cloud IAP側でZero Trust的な制御を入れたとしても、IAP ConnectorからS3バケットのアクセス制御は昔ながらのIP制限方式になってしまっているなど、ツッコミどころや深堀りする余地はありますが、 S3バケットの前段に設置する構成については、一応当初の目的通りの動作を確認できました。

また、この記事とは関係ない半ば余談の愚痴ですが、2020年の夏頃にIAP Connectorを検証したのですが、プロキシ先からのレスポンスの書き換えができなくて困って、IAP Connectorに対する熱が下がったという個人的な経緯もあり、VPNもしくは既存のProxyの置き換え手段として手放しで愚直に推せる仕組みではないなというのが率直な印象でした。ただ、活用できる局面は多そうなので、引き続き継続的にキャッチアップしていきたいなと思います。

しかしながら、面倒でもGKEにnginxのProxyを建ててそれをCloud IAPで守った方がIAP Connectorより融通聞くのではという気持ちもあり、目的とそれにかける手間のバランスを見極めるのは難しいなと思いました。結論は毒にも薬にもならない感じなのですが終わります。

明日の記事の担当は、データテクノロジーグループの竹田さんです。お楽しみに。

株式会社エニグモ 正社員の求人一覧

hrmos.co

Dockerfileのベストプラクティスとセキュリティについて

こんにちは、主に検索周りを担当しているエンジニアの伊藤です。

この記事は Enigmo Advent Calendar 2020 の 17 日目の記事です。

みなさんは適切なDockerfileを書けていますか?とりあえずイメージのビルドが出来ればいいやとなっていませんか? 今回は自戒の意味も込めて、改めてDockefileのベストプラクティスについて触れつつ、 そもそもDockerfileを書かずにコンテナイメージをビルドする方法とコンテナセキュリティに関する内容についてまとめてみました。

Dockerfileのベストプラクティス

ご存知の方も多いと思いますが、こちらがDocker社が推奨するベストプラクティとなっています。 せっかくなので事例を交えていくつかピックアップしてみます。

イメージサイズは極力小さくしよう

  • 軽量なベースイメージを選択する
  • 不要なパッケージはインストールしない
  • レイヤはなるべく減らす
    • RUN/COPY/ADDだけがレイヤを増やすのでこれを使用するときに意識しましょう。
      • RUNで実行するコマンドは極力&&で連結する
      • 可能な場合はマルチステージビルドを利用する
      • ADDを使用したアンチパターン(下記の例ではADDによって圧縮ファイルを含んだレイヤが余計に作成されてしまう)
        • アンチパターン

          ADD http://example.com/big.tar.xz /usr/src/things/
          RUN tar -xJf /usr/src/things/big.tar.xz -C /usr/src/things
          RUN make -C /usr/src/things all
          
        • 推奨例

          RUN mkdir -p /usr/src/things \
          && curl -SL http://example.com/big.tar.xz \
          | tar -xJC /usr/src/things \
          && make -C /usr/src/things all
          

ビルドキャッシュを活用しよう

イメージをビルドするとき、DockerはDockerfileに書かれた命令を上から順番に実施します。 その際、各命令毎にキャッシュ内で再利用できる既存のイメージを探しますが、なければ以降のキャッシュは破棄されます。 そのため、更新頻度が高いものをDockerfileの後ろの方に記載することが重要になります。 例えば下記はappというアプリケーションコードを含むディレクトリをコンテナにコピーし、 pip installによって必要なライブラリをインストールする例です。

COPY app /tmp/
RUN pip install --requirement /tmp/requirements.txt
  • 推奨例
COPY requirements.txt /tmp/
RUN pip install --requirement /tmp/requirements.txt
COPY app /tmp/

一見すると前者の方がレイヤが少ない分、良さそうに見えますが、 app配下のコードに変更が入るたびにライブラリのインストールも行われ、 その分ビルド時間が伸びてしまいます。

Dockerfileに関する悩みどころ

ここまでDockerfileに関するベストプラクティスについて触れてきましたが、Dockerfileを作成、メンテするのって大変ではないですか?

  • どのベースイメージを使用すべきか?
  • イメージサイズが大きくなりすぎる
    • イメージサイズの削減を頑張ってたら時間が溶けた(開発作業に専念したいのに。。。)
  • Dockerfile自体のメンテが辛い
    • イメージサイズを小さくしようと思うとDockerfile自体の可読性が下がるというつらみ
  • ベストプラクティスを意識することが自体が辛い
  • セキュリティ的な懸念
    • 使用するベースイメージに脆弱性が含まれていないかなど

Dockerfileを書かないという選択肢

そこで続いてのお話がBuildpackについてです。 こちらを利用することでDockerfileを書くことなく、ソースコードからコンテナイメージを生成することが可能になるというものです。

Buildpack

  • 2011年にHerokuが考案し、Cloud FoundryGitlabKnative等で採用されている仕組み
  • 様々な言語のBuildpackを使ってユーザのアプリケーションコードに対して、「判定」、「ビルド」、「イメージ化」といった一連の流れを実施する事によって、基盤上で動作可能な形にアプリケーションコードを組み立てる

Cloud Native Buildpacks

  • 上記のHerokuオリジナルと呼ばれるBuildpackが特定の実行基盤でしか動作しないというでデメリットがあったのに対し、Dockerの急速な普及を背景に、OCIイメージのようなコンテナ標準を採用したイメージを作成しようと始まったのがCloud Native Buildpacks(以降 CNBと略) Projectです。
  • HerokuとPivotalが中心となって2018年1月にCNCF傘下でスタートし、現時点でCNCFのSandboxプロジェクトという立ち位置になっています
  • 以降はこちらのCNBについての概要について記載します

CNBの仕組み

CNBを利用してイメージを生成する際はビルダーというものを指定します。 ビルダーはアプリのビルド方法に関するすべての部品と情報をバンドルしたイメージとなっており、複数のbuildpacklifecyclestackで構成されています。

f:id:pma1013:20201214095357p:plain
公式サイトから引用

  • buildpack
    • ソースコードを検査し、アプリケーションをどうビルドし実行するかを決める
  • lifecycle
    • buildpackの実行を調整し、最終的なイメージを組み立てる
  • stack
    • ビルド及び実行環境用のコンテナイメージのペア

デモ

基本的にCNBを利用して運用していく際には、自前のビルダーを作成することになると思います。 今回はお試しということで、すでにあるビルダーを使って試してみたいと思います。

  • 前提条件
    • ローカル環境にDocker及びBuildpackがインストール済みであること
  • サンプルコード
    • Flaskを利用したWebアプリケーション(単純にHello Worldと出力するだけのもの)
    • 構成としては下記の通りで最低限のファイルのみ配置しています。
              .
              ├── requirements.txt
              └── src
                  ├── __init__.py
                  ├── app.py
                  └── templates
                      └── index.html
  • ビルド
    それではpackコマンドを使ってビルドしてみましょう。
$ pack build sample-cnb:0.0.1
Please select a default builder with:

    pack set-default-builder <builder-image>

Suggested builders:
    Google:                gcr.io/buildpacks/builder:v1      Ubuntu 18 base image with buildpacks for .NET, Go, Java, Node.js, and Python
    Heroku:                heroku/buildpacks:18              heroku-18 base image with buildpacks for Ruby, Java, Node.js, Python, Golang, & PHP
    Paketo Buildpacks:     paketobuildpacks/builder:base     Ubuntu bionic base image with buildpacks for Java, .NET Core, NodeJS, Go, Ruby, NGINX and Procfile
    Paketo Buildpacks:     paketobuildpacks/builder:full     Ubuntu bionic base image with buildpacks for Java, .NET Core, NodeJS, Go, PHP, Ruby, Apache HTTPD, NGINX and Procfile
    Paketo Buildpacks:     paketobuildpacks/builder:tiny     Tiny base image (bionic build image, distroless-like run image) with buildpacks for Java Native Image and Go

Tip: Learn more about a specific builder with:
    pack inspect-builder <builder-image>

packコマンドを実行すると上記のようにビルダーを指定しろと言われます。 今回はここでおすすめされている Google Cloud Buildpacks を利用して実行します。

$ pack build sample-cnb:0.0.1 --builder gcr.io/buildpacks/builder:v1
v1: Pulling from buildpacks/builder
Digest: sha256:f0bb866219220921cbc094ca7ac2baf7ee4a7f32ed965ed2d5e2abbf20e2b255
Status: Image is up to date for gcr.io/buildpacks/builder:v1
v1: Pulling from buildpacks/gcp/run
Digest: sha256:83eb67ec38bb38c275d732b07775231e7289e0e2b076b12d5567a0c401873eb7
Status: Image is up to date for gcr.io/buildpacks/gcp/run:v1
===> DETECTING
google.python.runtime            0.9.1
google.python.missing-entrypoint 0.9.0
google.utils.label               0.0.1
===> ANALYZING
Previous image with name "sample-cnb:0.0.1" not found
===> RESTORING
===> BUILDING
=== Python - Runtime (google.python.runtime@0.9.1) ===
Using runtime version from .python-version: 3.7.8
Installing Python v3.7.8
Upgrading pip to the latest version and installing build tools
--------------------------------------------------------------------------------
Running "/layers/google.python.runtime/python/bin/python3 -m pip install --upgrade pip setuptools wheel"
Collecting pip
  Downloading pip-20.3.1-py2.py3-none-any.whl (1.5 MB)
Collecting setuptools
  Downloading setuptools-51.0.0-py3-none-any.whl (785 kB)
Collecting wheel
  Downloading wheel-0.36.2-py2.py3-none-any.whl (35 kB)
Installing collected packages: pip, setuptools, wheel
  Attempting uninstall: pip
    Found existing installation: pip 20.1.1
    Uninstalling pip-20.1.1:
      Successfully uninstalled pip-20.1.1
  Attempting uninstall: setuptools
    Found existing installation: setuptools 47.1.0
    Uninstalling setuptools-47.1.0:
      Successfully uninstalled setuptools-47.1.0
Successfully installed pip-20.3.1 setuptools-51.0.0 wheel-0.36.2
Done "/layers/google.python.runtime/python/bin/python3 -m pip inst..." (6.427479028s)
=== Python - pip (google.python.missing-entrypoint@0.9.0) ===
Failure: (ID: 194879d1) Failed to run /bin/build: for Python, an entrypoint must be manually set, either with "GOOGLE_ENTRYPOINT" env var or by creating a "Procfile" file
--------------------------------------------------------------------------------
Sorry your project couldn't be built.
Our documentation explains ways to configure Buildpacks to better recognise your project:
 -> https://github.com/GoogleCloudPlatform/buildpacks/blob/main/README.md
If you think you've found an issue, please report it:
 -> https://github.com/GoogleCloudPlatform/buildpacks/issues/new
--------------------------------------------------------------------------------
ERROR: failed to build: exit status 1
ERROR: failed to build: executing lifecycle: failed with status code: 145

今度は上記のようなエラーが出力されます。 どうやらDockerfileのentrypointに相当する GOOGLE_ENTRYPOINTを設定する必要があるようです。 該当のオプションを追加して下記の通り再トライしてみます。

$ pack build sample-cnb:0.0.1 --builder gcr.io/buildpacks/builder:v1 --env GOOGLE_ENTRYPOINT="flask run --host 0.0.0.0 --port 5000"
〜省略〜
Adding cache layer 'google.python.pip:pip'
Adding cache layer 'google.python.pip:pipcache'
Successfully built image sample-cnb:0.0.1

上記のようにSuccessfullyと出力されれば無事にコンテナイメージのビルドは完了しています。 作成されたイメージを確認してみましょう。

REPOSITORY              TAG           IMAGE ID       CREATED         SIZE
sample-cnb              0.0.1         4c60a192da62   40 years ago    289MB

sample-cnbというイメージが作成されていることが確認できました。 ここで気になるのは作成日が40 years agoとなっていることです。 これについては公式サイトに記載がありましたが、 どうやら再現可能なビルドを目的とした意図的な設計のようです。

  • コンテナ起動
    ビルドしたコンテナを起動して正常に動作することを確認します。 下記コマンドでコンテナを起動して、
    $ docker run --rm -p 5000:5000 -e FLASK_ENV=development sample-cnb:0.0.1
    こちらにアクセスすると、下記の画面が表示されることが確認できました。

f:id:pma1013:20201214165419p:plain

  • Dockerfileを使ったビルド
    最後に比較のためにDockerfileを利用したビルドも行います。

    • Dockerfileの準備
FROM python:3.7

WORKDIR /app

COPY requirements.txt /app

RUN pip install -r requirements.txt

COPY src /app/

ENV FLASK_APP=/app/app.py

ENTRYPOINT ["flask", "run"]
CMD ["--host", "0.0.0.0", "--port", "5000"]
  • ビルド
    $ docker build -t sample-df:0.0.1 .

  • 比較
    Dockerfileベースでビルドしたイメージは下記の通りとなります。 CNBで作成したイメージの方が軽量なOSが利用されていることが分かります

REPOSITORY              TAG           IMAGE ID       CREATED          SIZE
sample-df              0.0.1           9a5c14fd1846   14 seconds ago   928MB

CNBのメリット

CNBのメリットをざっとまとめると下記のような感じになるかと思います。

  • 開発に注力できる
    • 開発者はDockerfileを作成、メンテすることから開放される
  • 持続可能な運用
    • スケーラブルなセキュリティ対応
      • 散在しがちなDockerfileすべてにおいて脆弱性対応などしていくのは現実的ではない

セキュリティについて

私のコンテナセキュリティに対する知識としては、下記のようなレベルのものでした。

  • コンテナにおけるセキュリティって何すればいいの?
  • そもそもコンテナに限らず何をすればセキュリティちゃんとしてますって言えるの?

という訳でコンテナにおけるセキュリティ基準やツールとしてはどういったものがあるのかを調査した結果をまとめます。

概要

コンテナにおけるセキュリティ基準

コンテナの脆弱性スキャン

  • コンテナ環境もオンプレ同様にOSのライブラリやパッケージなどから構成されるため、これまで通り脆弱性対策が必須である
  • それに加えてコンテナイメージ、ランタイム環境の脆弱性にも配慮する必要がある

ツールの活用

とりあえず手軽に上記のセキュリティ基準チェックと脆弱性スキャンを行いたいというモチベーションの元、以前から気になるツールをピックアップしました。

dockle

https://github.com/goodwithtech/dockle

trivy

https://github.com/aquasecurity/trivy

  • 概要
    • コンテナの脆弱性スキャンツール
  • 使い方
    • trivy [イメージ名]

デモ

ここで上記で作成したコンテナイメージ(Dockerfileから作成したイメージとCNBで作成したイメージ)をそれぞれのツールにかけた場合にどういった結果になるか確認してみたいと思います。

Dockerfileベース

まずはDockerfileからビルドしたイメージの方です。

  • dockle
    • WARNレベルが1件検知されました。
$ dockle sample-df:0.0.1
WARN    - CIS-DI-0001: Create a user for the container
    * Last user should not be root
INFO    - CIS-DI-0005: Enable Content trust for Docker
    * export DOCKER_CONTENT_TRUST=1 before docker pull/build
INFO    - CIS-DI-0006: Add HEALTHCHECK instruction to the container image
    * not found HEALTHCHECK statement
INFO    - CIS-DI-0008: Confirm safety of setuid/setgid files
    * setuid file: usr/bin/chfn urwxr-xr-x
    * setgid file: usr/bin/ssh-agent grwxr-xr-x
    * setuid file: usr/lib/openssh/ssh-keysign urwxr-xr-x
    * setuid file: bin/umount urwxr-xr-x
    * setgid file: usr/bin/wall grwxr-xr-x
    * setuid file: bin/mount urwxr-xr-x
    * setuid file: usr/bin/gpasswd urwxr-xr-x
    * setuid file: usr/bin/passwd urwxr-xr-x
    * setgid file: usr/bin/chage grwxr-xr-x
    * setuid file: bin/su urwxr-xr-x
    * setuid file: bin/ping urwxr-xr-x
    * setgid file: usr/bin/expiry grwxr-xr-x
    * setuid file: usr/bin/newgrp urwxr-xr-x
    * setuid file: usr/bin/chsh urwxr-xr-x
    * setgid file: sbin/unix_chkpwd grwxr-xr-x
  • trivy
    • こちらは大量の出力結果が表示されるためサマリのみ貼っておきます。 CRITICALなものが69件検知されていることが分かります。
$ trivy sample-df:0.0.1

sample-df:0.0.1 (debian 10.2)
=============================
Total: 2401 (UNKNOWN: 23, LOW: 1291, MEDIUM: 520, HIGH: 498, CRITICAL: 69)

CNBベース

続いてCNBでビルドしたイメージの方を確認してみます。

  • dockle
    • こちらはWARNレベルのものは1件もなく、INFOレベルのものだけが検知されました。
$ dockle sample-cnb:0.0.1
INFO    - CIS-DI-0005: Enable Content trust for Docker
    * export DOCKER_CONTENT_TRUST=1 before docker pull/build
INFO    - CIS-DI-0006: Add HEALTHCHECK instruction to the container image
    * not found HEALTHCHECK statement
INFO    - CIS-DI-0008: Confirm safety of setuid/setgid files
    * setgid file: usr/bin/expiry grwxr-xr-x
    * setuid file: bin/umount urwxr-xr-x
    * setgid file: usr/bin/chage grwxr-xr-x
    * setuid file: usr/bin/newgrp urwxr-xr-x
    * setgid file: usr/bin/wall grwxr-xr-x
    * setuid file: usr/bin/chsh urwxr-xr-x
    * setuid file: bin/su urwxr-xr-x
    * setuid file: usr/bin/passwd urwxr-xr-x
    * setuid file: usr/bin/gpasswd urwxr-xr-x
    * setuid file: usr/bin/chfn urwxr-xr-x
    * setuid file: bin/mount urwxr-xr-x
    * setgid file: sbin/unix_chkpwd grwxr-xr-x
    * setgid file: sbin/pam_extrausers_chkpwd grwxr-xr-x
  • trivy
    • こちらもサマリのみ貼りますが、CRITICALに関しては0件となっています
$ trivy sample-cnb:0.0.1
2020-12-14T19:22:18.244+0900    INFO    Detecting Ubuntu vulnerabilities...

sample-cnb:0.0.1 (ubuntu 18.04)
===============================
Total: 75 (UNKNOWN: 0, LOW: 53, MEDIUM: 20, HIGH: 2, CRITICAL: 0)

この結果からもGoogle Cloud Buidpackを利用してビルドしたイメージの方が軽量かつセキュアな環境であることが分かると思います。

CIへの組み込み

上で紹介したツールはいずれもCIに組み込んで使用することも想定して作られています。 下記のようにオプションを指定して使うことで、CIのタイミングで実行&確認がしやすくなっています。

  • dockle
    • dockle --exit-code 1 [イメージ名]
  • trivy
    • trivy --exit-code 1 --severity CRITICAL --no-progress [イメージ名]

まとめ

今回はDockerfileのベストプラクティスのおさらいと、CNBを利用したコンテナイメージのビルド方法、セキュリティに関してさらっとまとめてみました。

今後もコンテナベースのアプリケーション開発が進むと、 これまで個人、チームレベルで任せていたDockerfileの作成、管理が破綻するのではと感じました。 CNBには組織として統制のとれたコンテナ作成やセキュリティ基準を継続的に満たすことの手段が提供されているので、 その辺りをうまく活用していく必要性を感じでいます。

セキュリティについても検知の仕組みだけでなく、日々の運用の中でいかに対応していくかということが大事だと思うので、 今後も試行錯誤しながら少しずつ前進していければと思っています。

明日の記事の担当はインフラエンジニアの山口さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co


  1. CIS(Center for Internet Security)とは、米国のNSA(National Security Agency/国家安全保障局)、DISA(Difense Informaton Systems Agency/国防情報システム局)、NIST(National Institute of Standards and Technology/米国立標準技術研究所)などの米国政府機関と、企業、学術機関などが協力して、インターネット・セキュリティ標準化に取り組む団体の名称

Rails + マイクロサービスでイベント駆動アーキテクチャを導入した話

はじめに

こんにちは、サーバーサイドエンジニアの@hokitaです。

この記事は Enigmo Advent Calendar 2020 の 16 日目の記事です。

弊社が運営するBUYMAは現状モノレポで管理されており、10年以上も運営しているサービスなのでソースも肥大化していて、メンテナンスが難しくなってきました。 そこで現在、本体から少しずつマイクロサービスに切り離していこうとしています。

その取組の中で配送処理の一部をマイクロサービス化する作業に携わることができました。今回はBUYMA本体と配送サービスとの通信にイベント駆動アーキテクチャを導入した話をしていきます。

イベント駆動アーキテクチャ

マイクロサービスでサービスを切り分ける場合、それぞれ責務が分かれるように分割するかと思います。 しかしサービス間の通信手段によっては各サービスが密になる恐れがあります。

そこでイベント駆動アーキテクチャを利用します。

f:id:hokita:20201215115957p:plain
PublisherとSubscriber

メリットとして

  • PublisherはSubscriberのことを知らない
  • Subscriberはイベントのこと以外知らない
  • Publisher1つに対し、複数のSubscriberを設置できる

このようにPublisherとSubscriberはお互い知らないので、疎の関係を保つことができます。

非同期メッセージングサービス

イベント駆動アーキテクチャを実現するために、非同期メッセージングサービスを利用します。

代表的なものとして

などがあります。

今回は Amazon SNS / Amazon SQS を使用しました。

Amazon SNS

docs.aws.amazon.com

発行者からサブスクライバー (プロデューサーおよびコンシューマーとも呼ばれます) へのメッセージ配信を提供するマネージド型サービスです。

メッセージを発信するものと思って頂ければと思います。メッセージの受け取り先(サブスクライバー)として今回はAmazon SQSですが、他にもAWS Lambdaなどでも取得することが可能です。

なぜSNSを利用するのか

次の図を見て頂ければ分かる通り、SNSがないとBUYMA本体が配送サービスを知っていることになります。つまりになってしまいます。

f:id:hokita:20201215120053p:plain
SNSなしの場合

Amazon SQS

docs.aws.amazon.com

メッセージキューイングサービスです。同じ非同期処理としてsidekiqやResqueを使っているサービスも多いかと思います。

主な特徴としては

  • "at least once"(最低1回)が保証されている
    • 逆に2回以上同じメッセージを取得する可能性がある
    • Redisのようにジョブを失うことがない
  • 順不同(それなりに担保されるのかと思っていましたが全く順不同でした。)

FIFO(First-In-First-Out)キュー

通常キューとは別でFIFOキューというものも使用することができます。

  • 受信する順序が保持される
  • 必ず1回処理される
  • 1つずつ処理される
  • 通常キューより処理が遅くなる

処理順序が決まっている処理(例えば製品価格の変更処理など)で便利かと思います。

ロングポーリング

ショートポーリングとロングポーリングというものがありますが、基本ロングポーリングが良いです。(ショートポーリングはいつ使われるのだろうという感じです。)

ロングポーリングですが最大20秒キューにメッセージがないか待機をして、あれば即座に実行します。(筆者は最初20秒間に溜まったメッセージを処理するのかと思っていましたが、勘違いでした。メッセージを取得したら即座に実行です。)

Shoryuken

github.com

RailsでSQSをジョブキューとして利用するときは現状Shoryuken一択です。Shoryukenを起動することでSQSのキューをポーリングし、キューを取得して処理を実行してくれます。エンキューももちろんできます。

一通りwikiに必要な情報が書かれているので、そちらを読めば問題なく実装できるかと思います。

以下設定ファイルや実装例を記載します。

shoryuken.yml

対象のキューの情報を記載します。

Shoryuken options · phstc/shoryuken Wiki · GitHub

# config/shoryuken.yml

groups:
  purchase_completed:
    concurrency: 1
    queues:
      - ['purchase_completed_queue', 1]
pidfile: ./tmp/pids/shoryuken.pid

※groupsで分けている理由

groups:
  group1:
    concurrency: 1
    queues:
      - ['a_queue', 2] # 重さ2
      - ['b_queue', 1] # 重さ1

このように同じグループに複数のキューを設定している場合、a_queueがメッセージを取得してもb_queueにメッセージがない場合はポーリング時間が終わるまでa_queueの処理は実行されないので注意が必要です。

Processing Groups · phstc/shoryuken Wiki · GitHub

またa_queueb_queueで大量にメッセージがある場合、a_queueb_queueの重さの2倍なので、処理の優先度も2倍になります。

Polling strategies · phstc/shoryuken Wiki · GitHub

shoryuken.rb

SQSの情報を記載します。

# config/initializers/shoryuken.rb

# ロングポーリング
Shoryuken.sqs_client_receive_message_opts = { wait_time_seconds: 20 }

Shoryuken.sqs_client = Aws::SQS::Client.new(
  region: ENV['AWS_REGION'],
  access_key_id: ENV['AWS_ACCESS_KEY_ID'],
  secret_access_key: ENV['AWS_ACCESS_SECRET_KEY']
)

起動

$ bundle exec shoryuken -R -C config/shoryuken.yml

ジョブ

例)決済完了時に実行するジョブ

# app/jobs/purchase_complete_job.rb

# 決済完了時のジョブ
class PurchaseCompleteJob < ApplicationJob
  include Shoryuken::Worker

  shoryuken_options queue: '<キュー名>',
                    auto_delete: true,
                    body_parser: :json

  def perform(_sqs_msg, body)
    message = JSON.parse(body['Message'])

    # 保存処理
  end
end

エンキュー

エンキューも簡単にできます。

# 配送ステータスの通知ジョブにエンキュー
StatusNotificationJob.perform_async(
  status_data: 'some status data'
)

FIFOキュー

前述したFIFOキューですが、Shoryukenでも扱うことができます。 基本通常キューと記述は同じですが、メッセージグループIDを指定することができます。 同じメッセージグループIDだと厳密な順序で、常に1件ずつ処理をします。

例えば取引のステータスを変更したい場合、ステータス変更の順序は重要だが、他の取引同士の順序は気にしない場合はメッセージグループIDに取引IDを指定したら良さそうです。

SomeJob.perform_async(
  csv_id: csv.id,
  message_group_id: <MESSAGE_GROUP_ID>
)

イベント駆動は一筋縄ではいかない話

各技術の説明をしてきましたが、ここからはイベント駆動アーキテクチャ導入時に必ず考慮する点を書いていきます。

冪等性

同じ処理を何度実行しても同じ結果が得られる性質のことです。

冪等 - Wikipedia

Amazon SQSの通常キューでは2回以上同じメッセージを取得する可能性があるので、重複してメッセージを取得することを考慮する必要があります。

今回実装した決済完了イベントでは、取引ID(冪等キー)をUnique Keyにして、DB側で制御するようにしました。

# 配送サービスの配送情報保存処理
def save!
  Delivery.save!(params)
rescue ActiveRecord::RecordNotUnique => e
  # 念の為取引ID(冪等キー)で重複していることを確認
  raise e unless Delivery.exists?(order_id: params[:order_id])

  # ログだけ残して正常終了させる
  Rails.logger.info(
    "#{self.class}##{__method__} Message: #{e}"
  )
end

もしくはFIFOキューを導入することでも解決が可能です。

整合性

モノレポの場合はDBのtransaction機能を利用して、失敗したらロールバックすることが可能です。しかしマイクロサービスだとそう簡単にはできません。

一般的にはTCCパターンやSagaパターンを使用する必要がでてきます。 qiita.com

今回は下記理由で、ロールバック処理を行いませんでした。

  • 連携するサービス数が少ないので今のところ処理がシンプル
  • 外部からの入力事項は送信前にバリデーションされている
  • BUYMA本体から配送サービスに送られてくるメッセージは信頼する
  • BUYMA本体の取引データと配送データに不整合がないかを毎日チェックしている

現状は問題なく動作していますが、今後はもっと堅牢にするためにTCCパターンやSagaパターンを導入する必要がありそうです。

最後に

イベント駆動アーキテクチャに使用した技術と考慮するポイントを書いていきましたが、マイクロサービスは思っている以上に学習することが多く、また実際に作成して経験値を積んでいくことが大事だと実感しました。 今後も実践を経て、レガシーから脱却できるように精進していきたいです。

明日の記事の担当は検索エンジニアの伊藤さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co

Figma Pluginの作り方

この記事は Enigmo Advent Calendar 2020の15日目の記事です。

エニグモでは、2日目の記事「デザインツールをXd→Figmaへした話 / プロトタイプ作るようになった話」にもあるように、UIに関わるメンバーは、Figmaを使用してデザインすることが多くなりました。

私もそのうちの一人で、SketchやXDに比べて便利なことが多く、デザインの確認・共有がしやすくなったように感じています。 日頃の業務でFigmaを使っていて、作ってみたい機能が出てきたため、Figma Pluginの開発を始めてみました。

記事について

本記事では、Figma Pluginの開発を始めるまでの作業や内容について説明します。

Figma Pluginの公式ドキュメントは大変分かりやすく書かれていますが、少し内容が古くなっている箇所もあり、そのまま進めても動作しないこともありましたので記していければと思います。

ちなみに、本記事は公開日直前に書いており、後述する作りたいプラグインは本記事内で完成しません。すみません。

作りたいもの

日頃の業務で、デザインを作成したときにUIの設計意図や説明をコメントとして、デザイン上に添えることがよくあります。(以下イメージ)

f:id:enigmo7:20201215130834p:plain

コメントする度に、「frame」を用意して、テキストを置いて、どこの部分にコメントしているのかを分かるようにする必要があり、少し手間だと感じておりました。 どこか効率化できないかと思い、Figma Pluginの作成を試みました。

想定したプラグインの概要は、まず選択した「Layer」の名前を取得し、プラグインのUI側でコメントを書いて、ボタンを押すと、コメント用の「Frame」が生成されるというものです。デザイン上のコメントは、時には邪魔になることもあるので、一括で表示・非表示なども制御できるとよいなと考えております。

Figmaのコメント機能でやればいいじゃないか』と思われるかもしれませんが、コメントするためにモードを切り替える必要があり面倒で、また、デザインを見たメンバーが確実にコメントまで見てくれているとは限らないため、今のところはデザイン上に「frame」としてコメントを置きたいと考えております。

用意しておくもの

Visual StudioCode

  • エディタは基本的に何でも大丈夫だと思いますが、TypeScriptを使用するため公式ではVSCodeが推奨されています。

Node.js

  • まだの方は以下のサイトからダウンロードしておきます

TypeScript

  • JavaScriptでも開発はできますが、変数がどんなプロパティを持っているのかすぐに知ることができるため推奨されています。

Figmaデスクトップアプリ

  • ブラウザのものではなく、デスクトップアプリが必要です。ローカルファイルを読み込みながら開発できます。

プラグインを動かすまで

Figmaアプリを起動

アプリケーションメニューの「Plugins」>「Manage Plugins...」を選択します。

f:id:enigmo7:20201215131331p:plain

表示された画面の「In Development」のセクションから「+」ボタンをクリックします。

f:id:enigmo7:20201215131335p:plain

プラグインのテンプレートを作成、ローカルへ保存

「Create a plugin」のモーダルが出てくるので、作りたいプラグインの名前を入力し、「Continue」ボタンをクリックします。

f:id:enigmo7:20201215131338p:plain

「Choose a template」の画面に進むので、自分が作りたいテンプレートの形式を選択します。今回私はUIを伴ったプラグインを作成したいので、一番右の「With UI & browser APIs」を選択しました。

f:id:enigmo7:20201215131343p:plain

「Save as...」からローカルに保存します。保存すると、先程の「In Development」のセクションに連携されます。

f:id:enigmo7:20201215131347p:plain

VSCodeでファイルを開く

7つのファイルで構成されています。それぞれの説明は、後述します。

f:id:enigmo7:20201215132608p:plain

TypeScriptの自動コンパイルの設定

「ターミナル」>「ビルド タスクの実行...」から「tsc:ウォッチ - sample/tsconfig.json」を選択し、クリックします。

f:id:enigmo7:20201215132613p:plain

これで編集を加える度に自動でコンパイルが実行されますが、TS2304のエラーが出るので、型定義ファイルをインストールしてください。

f:id:enigmo7:20201215133233p:plain

npm install --save-dev @figma/plugin-typings

これでエラーがなくなったと思います。 先程保存したプラグインが実際にFigmaアプリ上で動くようになります。

Figmaプラグインを実行

「Plugins」> 「Development」 > 「sample」をクリックするとプラグインが実行すると、長方形作成者が現れます。ちなみに、「Plugins」> 「Development」 > 「Open console」で見慣れたコンソールを表示することができます。

f:id:enigmo7:20201215134913p:plain

構成ファイルについて

manifest.json

詳しい説明はこちらに記載されております。

code.ts

先程の長方形を作成するプラグインでいうと、Figmaのページに長方形を作成する処理が書かれています。

figma.showUI(__html__);

figma.ui.onmessage = msg => {
  if (msg.type === 'create-rectangles') {
    const nodes: SceneNode[] = [];
    for (let i = 0; i < msg.count; i++) {
      const rect = figma.createRectangle();
      rect.x = i * 150;
      rect.fills = [{type: 'SOLID', color: {r: 1, g: 0.5, b: 0}}];
      figma.currentPage.appendChild(rect);
      nodes.push(rect);
    }
    figma.currentPage.selection = nodes;
    figma.viewport.scrollAndZoomIntoView(nodes);
  }
  figma.closePlugin();
};

code.js

  • 上記 code.tsコンパイルされた後の姿
  • manifest.jsonで読み込まれ、実行されるファイル

ui.html

先程の長方形を作成するプラグインでいうと、長方形の数を入力するプラグイン側のUI部分になります。

<h2>Rectangle Creator</h2>
<p>Count: <input id="count" value="5"></p>
<button id="create">Create</button>
<button id="cancel">Cancel</button>
<script>

document.getElementById('create').onclick = () => {
  const textbox = document.getElementById('count');
  const count = parseInt(textbox.value, 10);
  parent.postMessage({ pluginMessage: { type: 'create-rectangles', count } }, '*')
}

document.getElementById('cancel').onclick = () => {
  parent.postMessage({ pluginMessage: { type: 'cancel' } }, '*')
}

</script>

HTMLファイルになっておりますが、<script> タグでJavaScriptのコードが埋め込まれています。

このファイル側、つまりUI側ではFigmaの操作を直接行うことができず、Web Messaging APIのpostMessageを介して、 code.ts とお互いにやり取りすることで連携しています。

f:id:enigmo7:20201215150946p:plain

※ その他にも構成ファイルには、 package.json tsconfig.json README.mdがあります。

Webpack + Reactの導入

せっかくなので、Reactを導入して開発していきたいと思います。 導入の仕方は自由ですが、今回は公式ドキュメントに沿って進めていきます。

インストール

公式のドキュメントには以下のインストール行うように書かれていますが、

npm install --save-dev css-loader html-webpack-inline-source-plugin@beta html-webpack-plugin style-loader ts-loader typescript url-loader webpack webpack-cli

この通りにインストールしてしまうと動かないため、Figmaが提供しているReactのサンプルの依存関係を参考にして、インストールします。

"dependencies": {
    "@types/react": "^16.8.23",
    "@types/react": "^16.8.23",
    "@types/react-dom": "^16.8.5",
    "css-loader": "^3.1.0",
    "html-webpack-inline-source-plugin": "0.0.10",
    "html-webpack-plugin": "^3.2.0",
    "react": "^16.8.6",
    "react-dom": "^16.8.6",
    "style-loader": "^0.23.1",
    "ts-loader": "^6.0.4",
    "typescript": "^4.0.3",
    "url-loader": "^2.1.0",
    "webpack": "^4.38.0",
    "webpack-cli": "^3.3.6"
  }

ファイル作成

ライブラリのインストールが終わったら、構成ファイルを変更します。新たに以下のファイルを作成し、中身はサンプルを参考にします。

  • src/code.ts
  • src/ui.html
  • src/ui.tsx
  • src/ui.css
  • src/logo.svg
  • webpack.config.js

ファイルの変更

manifest.jsonuimainを以下のように書き換えます。

{
  ...
  "main": "dist/code.js",
  "ui": "dist/ui.html",
  ...
}

tsconfig.jsonを以下のように書き換えます。

{
  "compilerOptions": {
    "target": "es6",
    "jsx": "react",
    "typeRoots": [
      "./node_modules/@types",
      "./node_modules/@figma"
    ]
  }
}

ファイルを削除

先程まで使用していた code.ts , ui.html ファイルは不要になるので削除します。

これで環境が整いました。 Figmaデスクトップアプリから実行すると、プラグインが実行されるようになりました。

プラグインのUIデザイン

最後になりましたが、プラグインのUIは、Figma Componentsが参考になりそうでした。 必ずしもFigmaに沿ったUIである必要はなさそうですが、Plugin Review GuidelinesでもFigmaに沿ったデザインが推奨されています。

We highly recommend matching your plugin to Figma's UI so we can create a seamless experience for our users.

終わりに

今回はFigma Plugin作成の準備についてまとめました。

デザイナーであっても自力で作成できる範囲だと思いましたので、これから冒頭で述べたプラグインを作っていければと思います。

明日の記事の担当は、サーバーサイドエンジニアの@hokitaさんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧 hrmos.co

SQL Server、BigQuery、Redshift 日付型の比較&リファレンス

この記事はEnigmo Advent Calendar 2020 の14日目の記事です。

はじめに

こんにちは、エニグモ 嘉松です。

簡単な自己紹介ですが、BUYMAのプロモーションやマーケティングを行っている事業部に所属して、その中のデータ活用推進室という部署で会社のデータ活用の推進やマーケティング・オートメーションツール(MAツール)を活用した販促支援、CRMなどを担当しています。(データ活用推進室、長らく私一人部署だったのですが、先月1名増えて2名体制になりました!)

目次

背景

エニグモのデータ活用の大きな特徴として、エンジニアに限らず、マーケターやマーチャンダイザー(MD)、カスタマーサポートや役員まで、多くの社員、ほとんどの社員と言っても過言では無いくらいの人が自分でSQLを叩いてデータを見る、分析するという文化、カルチャーが根付いているということが言えると思います。

また、データ活用基盤の整備も積極的に進めており、クラウドで提供されているビックデータ向けのデータベースをデータレイクやデータウェアハウス(DWH)として利用しています。

このように複数のデータベースを活用してく中で出てくる問題点が、SQLシンタックスの違いです。 特にエニグモではエンジニアでは無いユーザもたくさんいるので、SQLの作成に多くの時間がとられてしまうと、本来の業務へも影響がでてきてしまいますし、データ活用は停滞してしまいます。

そこで、この記事ではMicrosoftが提供しているSQLServerGoogleが提供しているフルマネージド型分析データウェアハウスであるBigQuery、Amazon Web ServicesAWS)のクラウド型データウェアハウスであるRedshiftの3製品を対象として、特に混乱するであろう日付および時刻関連のデータ型について整理することで、今後のリファレンスになればと思っています。

日付および時刻関連のデータ型

まず、ここでは各データベースの日付および時刻関連のデータ型(の代表的なもの)を列挙します。

※データ型の表記(大文字小文字)、説明の内容については各データベースのマニュアルにおおよそ準拠しています。

SQL Server

データ型 説明 タイムゾーン
date 日付型 なし
datetime 日時型(タイムゾーンなし) なし
datetimeoffset 日時型(タイムゾーンあり) あり

BigQuery

データ型 説明 タイムゾーン
DATE 日付型 なし
DATETIME 日時型 なし
TIMESTAMP タイムスタンプ型 あり

Redshift

データ型 説明 タイムゾーン
DATE カレンダー日付 (年、月、日) なし
TIMESTAMP 日付と時刻 (タイムゾーンなし) なし
TIMESTAMPTZ 日付と時刻 (タイムゾーンあり) あり

タイムゾーンとは?

タイムゾーンについて言及すると、それだけで1本の記事になるくらいなので、簡単に説明します。

データーベースにおけるタイムゾーンのあり・なしとは、標準時間をUTCとするか、それとも個々のデータベースで決めるか、ということです。

タイムゾーンありのデータ型を使う場合は、当然、データを格納する時にもタイムゾーンを指定してデータを格納する必要があります。

また、タイムゾーンなしのデータ型を使う場合は、そのデーターベースにはどのタイムゾーンでデータが格納されているかを、意識して使う必要があります。 例えば日本の時間で格納したデータを、ニューヨークの時間帯で表示させるには、時間を14時間戻してあげるといったことを意識的に行う必要があります。

いずれにしても、ひとつのデータベースで時差のある地域の時間を扱う場合は、時差を意識することからは逃れられません。基準となる時間をUTCにするのか、どうかの違いです。 逆に日本時間だけで良いシステムであれば、扱う時間は常に日本時間なので、タイムゾーンなしのデータ型を使うことで、タイムゾーンを意識する必要がなくなります。

データ型まとめ

日付型 日時型(タイムゾーン無し) 日時型(タイムゾーン有り)
SQLServer date datetime datetimeoffset
BigQuery DATE DATETIME TIMESTAMP
Redshift DATE TIMESTAMP TIMESTAMPTZ

日付型は3データベースともDATEで分かりやすいですね。

日時型(タイムゾーン無し)はSQLServerとBigQueryがDATETIMEなのに対して、RedshiftがTIMESTAMP

日時型(タイムゾーン有り)は全てのデーターベースで異なります。

更にTIMESTAMPはRedshiftでは日時型(タイムゾーン有り)なのに対して、BigQueryでは日時型(タイムゾーン無し)となっています。

この時点で既にややこしくなってますね。

現在日時(日付と時間)の取得方法

次に、それぞれのデータベースで現在の日時(日付と時間)を取得する関数を見ていきます。 ここに挙げた関数以外もありますが、よく使う(であろう)ものを列挙しています。

SQL Server

GETDATE関数

戻り値の型:datetime

SELECT
 GETDATE()
;
------------
2020-12-09 08:20:17.645

BigQuery

CURRENT_TIMESTAMP関数

戻り値の型:TIMESTAMP

SELECT
 CURRENT_TIMESTAMP()
;
------------
2020-12-10 08:07:47.222776 UTC

括弧は省略可能です。 UTCで表示されます。 日本時間(JST)で表示させたい場合は以降の「日付型 → 文字型」を参照ください。

Redshift

SYSDATE関数

戻り値の型:TIMESTAMP

select
 sysdate
;
------------
2020-12-09T08:20:17.645728

GETDATE関数

戻り値の型:TIMESTAMP

select
 getdate()
;
------------
2020-12-09T08:20:17.645728

どちらもデフォルトではUTCが表示されるので、日本時間を表示したい場合はセッションのタイム ゾーン(デフォルトではUTC)を設定してあげる必要があります。

set timezone = 'Asia/Tokyo';
select
 sysdate
;
------------
2020-12-09T17:20:17.645728

 

set timezone = 'Asia/Tokyo';
select
 getdate()
;
------------
2020-12-09T17:20:17.645728

現在日付の取得方法

SQL Server

SQL Serverには単体で日付を取得する関数が無いので、GETDATE()で現在の日にち時刻を取得した後に、CONVERTを使ってdate型に変換してあげる必要があります。

SELECT
 CONVERT(date, GETDATE())
;
------------
2020-12-09

BigQuery

CURRENT_DATE関数

戻り値の型:DATE

SELECT
 CURRENT_DATE()
;
------------
2020-12-09

引数に何もして指定しないとUTCの日にちが返ってくるので、日本時間での日にちを取得する場合は、引数にタイムゾーンを指定してあげます。ここ注意ですね。

SELECT
 CURRENT_DATE("Asia/Tokyo")
;
------------
2020-12-09

Redshift

CURRENT_DATE関数

戻り値の型:DATE

select
 current_date
;
------------
2020-12-09

現在日時の取得方法まとめ

SQL Server BigQuery Redshift
日時 GETDATE() CURRENT_DATETIME() sydate
getdate()
日付 なし CURRENT_DATE() current_date

日付型 → 文字型

日付型のデータを文字列に変換する方法について記載します。

SQL Server

年月日(YYYY/MM/DD形式)

SELECT
 CONVERT(nvarchar, getdate(), 111)
;
------------
2020/12/10

年月日(YYYYMMDD形式)

SELECT
 CONVERT(nvarchar, getdate(), 112)
;
------------
20201210

年月日時分秒(yyyy-mm-dd hh:mi:ss.mmm (24h))

SELECT
 CONVERT(nvarchar, getdate(), 21)
;
------------
2020-12-10 16:34:37.837

年月日時分秒(yyyy-mm-ddThh:mi:ss.mmm形式(ISO8601標準))

SELECT
 CONVERT(nvarchar, getdate(), 126)
;
------------
2020-12-10T16:30:05.690

BigQuery

DATE型

FORMAT_DATE(format_string, date_expr)

指定されたformat_string(形式設定要素)に従ってdate_exprをフォーマットします。

DATE型でサポートされる形式設定要素

形式設定要素 説明
%Y 10 進数として表示される、世紀を含む年。
%y 10 進数(00-99)として表示される年。世紀は含みません。
%m 0 進数として表示される月(01~12)。
%d 10 進数として表示される、月内の日付(01~31)。
%F %Y-%m-%d 形式の日付。

年月日(YYYYMMDD形式)

SELECT
 FORMAT_DATE("%Y%m%d", CURRENT_DATE())
;
------------
20201210

年月日(YYYY-MM-DD形式)

SELECT
 FORMAT_DATE("%F", CURRENT_DATE())
;
------------
2020-12-10

TIMESTAMP型

FORMAT_TIMESTAMP(format_string, timestamp[, timezone])

指定されたformat_string(形式設定要素)に従ってtimestampをフォーマットします。

タイムゾーンを指定すると指定したタイムゾーンに変換されて表示されます。

タイムゾーン名

TIMESTAMP型でサポートされる形式設定要素

形式設定要素 説明
%Y 10 進数として表示される、世紀を含む年。
%y 10 進数(00-99)として表示される年。世紀は含みません。
%m 10 進数として表示される月(01~12)。
%d 10 進数として表示される、月内の日付(01~31)。
%H 10 進数で表示される時間(24 時間制)(00~23)。
%M 10 進数として表示される分(00~59)。
%S 10 進数として表示される秒(00~60)。
%F %Y-%m-%d 形式の日付。
%T %H:%M:%S 形式の時刻。
%Z タイムゾーンの名前。
%z 必要に応じて +HHMM または -HHMM の形式で示されるグリニッジ子午線からのオフセット。

年月日時分秒(yyyy-mm-dd hh:mi:ss.mmm (24h))

SELECT
 FORMAT_TIMESTAMP("%Y-%m-%d %H:%M:%S", CURRENT_TIMESTAMP(), "Asia/Tokyo")
;
------------
2020-12-10 17:13:13

Redshift

TO_CHAR (timestamp_expression, 'format')

日時形式の文字列

形式設定要素 説明
YYYY 4 桁の年数
MM 月番号 (01~12)
DD 日にちを数字表示 (01–31)
HH24 時 (24 時間制、00–23)
MI 分 (00–59)
SS 秒 (00–59)

年月日時分秒(yyyy-mm-dd hh:mi:ss.mmm (24h)) UTC

select
 to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS') 
;
------------
2020-12-10 08:13:13

年月日時分秒(yyyy-mm-dd hh:mi:ss.mmm (24h)) JTC

select
,to_char(convert_timezone('Asia/Tokyo', sysdate), 'YYYY-MM-DD HH24:MI:SS') 
;
------------
2020-12-10 17:13:13

文字型 → 日付型

SQL Server

datetime型

日付だけ指定した場合は時分秒は0時0分0秒となります。

SELECT
 CONVERT(datetime, '2020/12/10')
;
------------
2020-12-10T00:00:00

日付のスラッシュ(/)は省略することもできます。

SELECT
 CONVERT(datetime, '20201210')
;
------------
2020-12-10T00:00:00

時分秒を指定したい場合は日付の後にスペースを開けて、時:分:秒を付けます。

SELECT
 CONVERT(datetime, '2020/12/10 12:15:30')
;
------------
2020-12-10T12:15:30

BigQuery

DATE型

CASTを使います。 年月日は-ハイフンで区切ります。 2020/12/10のように/スラッシュで区切ったり、20201210のように区切らない場合はエラーになります。

SELECT
 CAST('2020-12-10' AS DATE)
;
------------
2020-12-10

TIMESTAMP型

日付だけ指定した場合は時分秒は0時0分0秒となります。 また、タイムゾーンUTCになります。

SELECT
 CAST('2020-12-10' AS TIMESTAMP)
;
------------
2020-12-10 00:00:00 UTC

時分秒を指定したい場合は日付の後にスペースを開けて、時:分:秒を付けます。

SELECT
 CAST('2020-12-10 12:15:30' AS TIMESTAMP)
;
------------
2020-12-10 12:15:30 UTC

タイムゾーンを指定したい場合は+09のようにUTCからの時差を指定します。

SELECT
 CAST('2020-12-10 12:15:30+09' AS TIMESTAMP)
;
------------
2020-03-10 12:15:30 UTC

Redshift

DATE型

TO_DATE (string, format)

引数には、変換したい文字列とそのフォーマットを指定します。

SELECT
 TO_DATE('2020/12/10', 'YYYY/MM/DD')
;
------------
2020-12-10

フォーマットの方法によって変換したい文字列の形式を指定できます。

SELECT
 TO_DATE('2020-12-10', 'YYYY-MM-DD')
;
------------
2020-12-10

こんなことでも大丈夫です。

SELECT
 TO_DATE('2020###12$$$10', 'YYYY###MM$$$DD')
;
------------
2020-12-10

CASTを使うこともできます。

SELECT
 CAST('2020-12-10' AS DATE)
;
------------
2020-12-10

TIMESTAMP型

日付だけ指定した場合は時分秒は0時0分0秒となります。

SELECT
 CAST('2020-12-10' AS TIMESTAMP)
;
------------
2020-12-10 00:00:00

年月日の区切りは/でも大丈夫です。

SELECT
 CAST('2020/12/10' AS TIMESTAMP)
;
------------
2020-12-10 00:00:00

区切り文字がなくても大丈夫です。

SELECT
 CAST('20201210' AS TIMESTAMP)
;
------------
2020-12-10 00:00:00

時分秒を指定したい場合は日付の後にスペースを開けて、時:分:秒を付けます。

SELECT
 CAST('2020-12-10 12:15:30' AS TIMESTAMP)
;
------------
2020-12-10 12:15:30

最後に

この記事では、SQL Server、BigQuery、Redshiftの3つのデーターベースを対象に、日付および時刻関連のデータ型についてまとめました。 日頃、私は上記のデータベースを使い分けている、それもおおよそ均等に使っているような状況なので、特に日付型の関数についてはよく迷ったりしています。 何度bigquery 日付 文字列 変換でググったことか。 今回、このように整理することで、迷ったときはこの記事を参照することで、少しでも生産性を高められたらと思っています。 記載したSQLについては、実際に実行した上で確認していますが、データーベースのバージョンの違いなどによってエラーになったり、そもそも間違っていたりする可能性もあるので、その場合はコメントなどに記載いただければ、修正や補足など入れていきたいと思っています。


株式会社エニグモ 正社員の求人一覧

hrmos.co