enigmoデザイナーの「最近気になるデザイントピック」- アプリUI編

こんにちは。デザイナーの別所です。
本日はenigmoデザイナーの、最近気になっているデザインに関するトピックをご紹介したいと思います。
デザイナーのメンバーで4回に渡って更新しようと思いますのでよろしくお願いいたします。

今回は「最近気になるデザイントピック アプリUI編」がテーマです!

心地よい使い勝手!映画レビューアプリ「Must」

「Must」は映画/TVファンのためのSNSアプリです。
作品のレビューを投稿したり、見たい映画や番組をコレクションとして保存できたり、友達と繋がって他の人がコレクションしている作品を見たりすることができます。
おすすめ作品をデイリーでレコメンドしてもらえたりもします。
2016年にリリースされたアメリカ生まれのアプリです。

f:id:enigmo7:20210603171725p:plain
引用:https://mustapp.com/

デザイナーのこだわりを感じるUI/UX

f:id:enigmo7:20210603171820p:plain Mustの一番の特徴は、画面の下半分の操作で完結するように設計されているUI/UXではないでしょうか。
レビューの入力や「観たい」「鑑賞済み」を選択するボタンなど、このアプリのメインアクションは全て画面の下部で操作できるため、「指が届かない」「両手に持ち替える」などのストレスがなくスムーズに操作できます。
昨今のスマホの様々な端末サイズに左右されないような設計がされているんですね!
ストアのレビューでも優れたUI/UXに対してのコメントが多く見られます。 (レビューでUIについて言及されるアプリって珍しい気がします!)

心地良いUIモーション

Mustは見た目の美しさだけでなく、モーションにもこだわりを感じます。
作品のサムネイルをタップすると、本棚から選んで取り出すような雰囲気のモーションがついてます。タップした時にサムネが少し小さくなるところも、ユーザーが押したことを認知させる表現で、細かい技が効いてます。
作品情報のモーダルは作品サムネイルにかぶさる様に画面下から表示されレイヤー感があるので、作品サムネイルと作品情報との優先順位を感じさせる見た目になっています。

テキスト要素をギリギリまで排除したシンプルさ

f:id:enigmo7:20210603173215p:plain
引用:https://mustapp.com/
海外アプリっぽいな〜と感じるのがテキスト要素の圧倒的少なさ。
基本的には作品名はテキストで表示させず作品画像のみとなっています。 このシンプルさが美しい作品画像をより際立たせてる気がします。 視認性担保のため小さい画像も無いところが、作品情報を認識しやすく操作しやすいです。

楽しい絵文字遣い

f:id:enigmo7:20210603173326p:plain

絵文字の使い方にシャレが効いていてオシャレです。
レビューを10段階で入力するときは、数字によって表情が変わります。 こういう小技で入力を楽しめる体験って良いですね。

海外のアプリなので日本語のレビューや、邦題作品はまだ少ないようです。
日本でももっと広まって欲しいです!

株式会社エニグモ 正社員の求人一覧

hrmos.co

コロナウィルスがやって来た!情シス奮闘記

こんにちは、Corporate IT/Business ITを担当している足立 です。

この記事は Enigmo Advent Calendar 2020 の 24日目の記事です。

エニグモでは1人目のコーポレートIT担当として未着手な社内IT環境をコツコツ整備してます。 世間的には1人情シスと呼ばれるポジションです。

今回は2020年に取り組んだ事を書きたいと思います。

2020年は何と言ってもコロナウィルス

1月後半から徐々に猛威が迫ってきて弊社も2月17日に対応・方針についてのMTGが予定されてました。
また、偶然にも同タイミングでSlackの全社チャンネルに 会社に対し何らかの方針、対応があるのかと言う内容の投稿がありました。
今思うと見えない恐怖・不安が爆発した日だと思います。

f:id:enigmo7:20201211162325p:plain
コロナウィルス

2月17日 緊急招集 MTG

2月17日 夕方 経営陣、監査役、部署長、情シスが招集されました。
そこで決まった方針は下記内容になります。

方針の大概要

①「在宅勤務」の推奨
②出社勤務の場合は「時差通勤」推奨と「社内感染防止ルール」の厳守
③本方針は翌営業日からのスタート
コロナウィルスの影響により、急遽全社的にリモートワークを一斉にスタートする事になりました。

情シスとしてのMISSIONは リモートワーク環境の構築

リモートワーク環境・・・

実はリモートワークを本導入してませんでした。
当初の予定だとオフィスが元々国立競技場が近いと言うこともありオリンピックに向けて
リモートワークを段階的に試験導入する予定でした。
また、コロナ禍前は情シスとしてリモートワーク開始について反対の立場をとっていました。

反対した理由

当時でもリモートワークで業務出来る環境でありましたが
いくつか課題があり中々、賛成する事は出来ませんでした。
代表的な理由としては・・・

  • 運用、緊急時のフローが無い
    リモートワークの運用フロー・ルールが構築されていない
    重大インシデント発生した際の緊急連絡フローが確立出来てない
  • ウィルス対策が脆弱
    コンシュマー版のウィルス対策ソフトを利用してましたが、リモートワークを実施するには強化すべき部分でした。 やはり管理コンソールが無い為、管理者として各端末の状況が把握しにくい部分が課題でした。
  • MDM(Mobile Device Management)導入して無い
    ウィルス対策の話と同様に端末の状況を一括で集中管理出来るソリューションが未導入

運用・緊急時のフロー等は社内で方針を決めるだけなので、直ぐに対応が出来る内容ですが
正直、セキュリティ周りをもう少し強化な環境に整えてから実施したいと言う思いでした。

真っ先にやった事 デスクトップ パソコンのリプレイス

緊急事態に付き兎に角、業務が出来る環境にすべくデスクトップPCのリプレイスに着手しました。 当然、パソコン無ければ仕事が出来ません。
当時、アルバイト・派遣社員が使用していた約20台程のWindowsデスクトップPCが
稼働中な為、直ぐにメーカーに連絡し、12~13台ずつ購入し3~4週間程で入れ替え完了させました。
キッティングに関し初めはイメージを作成しクローニングで対応も考えたのですが
作成・検証する時間も無かったのと、エニグモでは予めインストールするソフトも少ないので
人力による力技で対応しました。
2019年より少しずつノートPC化をしていたので、何とかなった感じです。

f:id:enigmo7:20201211170328p:plain
デスクトップPC リプレイス

WEB会議・問い合わせ対応

Zoomを導入しました。
実は運良くコロナ禍前から導入に向けて動いていました。
導入した理由としてはGoogle Workspace(旧称 G Suite)があるので
Google Meet利用出来ましたが、やはり大人数でのWEB会議はZoomの方が安定していたからです。
特にエニグモでは4Mと呼ばれる全体集会が月1回あるので、そこは重視しました。
また、仮想背景機能もあるので従業員のプライバシー保護の観点や
リモートコントロール機能もあるのが決定打でした。

Zoom導入前にトラブルサポート対応で解決までに3時間かかった事例があったので
この機能にはだいぶ助けられました。

Zoom対応ハードウェア DTEN導入

元々社長室・会議室にAppleTVが設置されておりプロジェクターを使用して
会議資料を投影してました。
MacはAirPlayを使用して安定的に投影出来ますが、Windowsの場合はAirParrot3と言う
シェアウェアを利用してましたが非常に動作が不安定と言う課題がありました。

プロジェクターを利用する際に部屋を暗くする不便さ、動作が不安定な課題を解決する為
Zoom対応ハードウェア DTENを1番大きい会議室と社長室に導入しました。 導入後はWindowsでも安定した投影が可能になりZoomRoomsアカウントで 無償アカウントユーザーでも時間無制限でWEB会議を利用出来るようになりました。
デメリットな部分としてはお値段が高いところでしょうか
個人的には気に入っているハードです。
当初は全会議室に導入しようとしていたので2台で止めといて良かったです。

f:id:enigmo7:20201211173357p:plainf:id:enigmo7:20201211173336p:plain
社長室と会議室

電話対応 fondesk モバイルチョイス050導入

コロナ禍と言えども会社へ電話が鳴る日々でした。
その殆どは業務と関係の無い営業電話や人材紹介の電話で月間300件程着信があり
主にバックオフィス部門にて全ての電話を受電し業務負荷が増え
電話対応する度に集中力が途切れて業務がままならない状況でした。
また、お客様の問い合わせについても迅速に対応出来ない事例も発生し
折返し対応するにも会社支給の携帯電話が無いので個人の携帯電話を利用する人も居ました。

そこで電話受付代行サービス fondeskを導入
導入後は会社代表電話番号をfondeskの番号へ転送し 転送先のオペレーターの人が一時対応し全て折返し対応となります。 対応内容はSlackにて投稿されるので、総務が内容毎に関係部署や個人へ共有する運用になりました。
導入後はオフィスが非常に静かになり、こんなにも効果があるものかと感心しました。

また、個人の電話対応として楽天コミュニケーションズのモバイルチョイス050を導入しました。
導入後は個人スマホに050番号を付与する事が出来るので、折返し対応時にプライベートの番号が伝わる事が無い様になりました。 メリットとしては簡単にBYODを導入出来る、データ通信ではなく音声回線を利用しているので音質が安定しているのが良い部分だと思います。

FAX対応 メール送信&Slack連携

電話の次に対応したのでFAXです。 レガシーなシステムと言えども銀行やカード会社のやり取りで使用する為、 何かしら対応が必要になりました。

弊社のFAXは複合機で送受信を行っているのですが 仕様を確認したところ受信したFAXをメール送信出来る機能があり直ぐに設定 メール送信されたものはzapierを経由しGoogleドライブ(共有ドライブ)へ保存され受信した際にSlackへ通知されるようにしました。

ただ、この構成で欠点があります。 複合機のFAXメール送信機能は用紙切れになるとメールが送信出来ない仕様でした。 ここは盲点でした。

f:id:enigmo7:20201211181155p:plain
FAXの構成

冒頭でも触れたウィルス対策

リモートワークを始めるにあたり絶対にウィルス対策(エンドポイントセキュリティ)の強化は最低限譲れないと思ってました。 そもそもWindowsNortonMacはESETとOS毎に違っている。 しかも、コンシュマー版を利用している状況 Excelでシリアル番号とインストール端末を管理すると言う運用

この状況を打破する為、CrowdStrikeを導入 念願のEDRを導入する事が出来ました。 展開が終わった時は達成感が凄かった・・・・ 何と言っても管理コンソールの存在に感動、アラートが飛んでくる頼もしさ、勝手にアンインストールされない安心感はプライスレス。

ちなみに導入直後に管理コンソールにログイン出来ない事象がありました。 原因はセカンダリ環境にCrowdStrikeが構築されログインURLが通常と違っていたのが原因でした。
(後日談としてESETのアンインストールを社内にアナウンスしたら、みんな光の速さで削除してました・・・)

まとめ 2021年へ

その他にも実施した事もまだあるのですが、今回書くことが出来ないのでいつかタイミングあれば書いてみたいと思います。 情シス視点での福利厚生制度を作ったり社内イベントをやったりしました。

今後リモートワークを核としたワークスタイルへの移行にむけてオフィスをリニューアルする予定で、 そこに向けて色々と動いています

  • サーバールーム移設対応
  • 社内ネットワーク対応(有線・無線)
  • 電話回線周り(光収容化・Dialpad導入)
  • インターネットFAX導入
  • 受付システム導入
  • 入退出システム導入
  • 座席予約システム導入
  • AV機器周り対応

また、4月にokta導入が決定しました! 出来たら来年こそはMDMを構築したいと思ってます。

では、以上になります。 ありがとうございました。

明日の記事の担当は 出品審査担当 の 杉山 さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co

リモートワーク文脈で超低コスト擬似DNSを社内で提供しました。

こんにちは、インフラチームの加藤です。

この記事は Enigmo Advent Calendar 2020の23日目の記事となります。
本記事では、リモートワーク環境のため、擬似DNSを社内提供したお話をします。

エニグモでは、今年の2月頃から全社的にリモートワークを開始しました。
それに伴いインフラチームでは、リモートワークのネットワーク周りの対応を行いました。


エニグモが運用しているサーバ群

エニグモの運用するサーバは、データセンター内に構築したものとAWSのものがあります。
情シスの足立さんが、SaaS導入を進めて下さったためオフィス内にサーバはほぼありません。

サーバへの疎通経路

オフィス・リモート環境共にVPN経由(+ファイアウォール)で、サーバ群へアクセス可能です。

f:id:enigmo7:20201221131920p:plain


リモートワーク開始後のサーバアクセスの問題

リモートワーク開始直後から、ネットワーク設定に関するお問い合わせと、インフラチームの対応が発生しました。VPNWifiの設定など、個々人のネットワーク設定の問題は、一度解決すれば再発することは滅多にありませんでしたが、名前解決は、何度もお問い合わせが頻発する厄介な課題でした。

なぜ、名前解決がリモートワークの課題だったのか

コロナ禍以前
エニグモでは、オフィス環境からサーバへのアクセスを楽にするため、ファイアウォールでオフィスのグローバルIPを許可し、非エンジニアスタッフの使用するサーバへはサーバのグローバルIPを指定してアクセス可能にしていました。

f:id:enigmo7:20201221132000p:plain

名前解決がリモートワークの課題となった原因
リモート環境からサーバへは、VPN経由でサーバのローカルIPを指定してアクセスする形でした。そのためオフィスとリモート環境では、サーバのアクセス情報が異なりました。

整理すると、以下の形になります。

職場 アクセス元のIP アクセス先のIP
リモート環境 スタッフのお家 サーバのローカルIPを指定
オフィス オフィスのGIP サーバのグローバルIPを指定

リモートワーク開始直後は、採用や営業活動、検品作業のため出社が必要なスタッフも居て、シフトで出社日を回していました。出社したスタッフは、Hostsを社内用に修正せねばならず、名前解決関係のお問い合わせが継続していました。DNSを導入すべきでしたが、工数がかかり難しいところでした。

f:id:enigmo7:20201221132042p:plain

名前解決の課題を解決

Hostsを一元管理できないかと思案していたところ、SwitchHosts!を発見し利用することにしました。

f:id:enigmo7:20201221132126p:plain

SwitchHosts!とは

Hostsファイルのサーバ管理が可能になる、端末のアプリです。
開発者は、サーバからHostsファイルをダウンロードして端末で使用する形となります。

f:id:enigmo7:20201221132146p:plain

SwitchHosts!のよかったところ

  • 無料公開されているアプリケーションだったこと。
  • 配布元にリモートサーバが使えて、Hostsファイルを集中管理できたこと。
  • Mac/Windowsでも、同じUIで使えてサポートし易かったこと。
  • HostsのパーツごとのON/OFFができ、localでHostsを修正できて組み合わせられること。
  • 端末のHostsのバックアップが取れること。 f:id:enigmo7:20201221132237p:plain

SwitchHosts!を使用した、擬似DNSの運用

ユースケースに合わせたHostsファイルを作成し、データセンターにNginxコンテナを立てて配布できるようにしました。 これにより、オフィスでもリモート環境でも、簡単にDNSの機能を提供できるようになりました。

f:id:enigmo7:20201221132221p:plain

SwitchHosts!導入後の課題と解決方法

SwitchHosts!の導入後、名前解決が原因となる問題はさっぱりとなくなりました。
しかし「AWS環境における開発に伴い、Hostsの修正が都度必要になる」という課題が残りました。

  • 原因は、以下2つでした。
    • スポットインスタンスのマシンが再作成された際、Hostsの修正が必要だった。
    • AWS環境に、日々サーバが増設されるため、Hostsに追記が必要だった。

一時は、ダウンロード元のHostsファイルを手動で修正していましたが、無駄な作業でした。
そこで、Hosts情報の更新スクリプトを作成し、Hostsファイルの自動更新を実現しました。


#AWSのマシンには、${Prefix}タグと${Name}タグに、
#マシンのAZやサイト環境を命名規則として持たせています。
#!/usr/bin/env ruby

require 'aws-sdk-ec2'

ec2_client = Aws::EC2::Client.new()

#ARGVに、${Prefix}タグの値(マシンのAZやサイト環境を命名規則としたもの)を持たせています。
Prefix_list = ARGV

f = { filters:[{ name: "tag:Prefix", values: Prefix_list }]}
reservations = ec2_client.describe_instances(f).inject([]) do |s,list|
  s += list.reservations
end
instances = reservations.each.inject([]) do |s,list|
  s += list.instances
end

instance_list = []
instances.each do |e|
  instance_list << {
    :instance_id => e.instance_id,
    :vpc_id => e.vpc_id,
    :public_ip => e.public_ip_address,
    :private_ip => e.private_ip_address,
    :name_tag => e.tags.find{|t| t.key == "Name"}.value,
    :prefix_tag => e.tags.find{|t| t.key == "Prefix"}.value.downcase,
  }
end

instance_list.each do |e|
  hostname = e[:name_tag].downcase

  #あとは、環境に合わせてよしなに加工してHostsファイルに出力します。

end

まとめ

DNSの機能を低コストで提供できてよかったです❗️
VPNへの依存度も減らしたいので、Google IAPを検証中です。
同じインフラチームの先輩社員 山口さんが、記事にされていました。
old schoolerなネットワークエンジニアがIAP Connectorを試してみた - エニグモ開発者ブログ

最後に

明日の記事の担当は、情シスの足立さんです。お楽しみに。

株式会社エニグモ 正社員の求人一覧

hrmos.co

今年組織作りに貢献するためにやったこと

今年組織作りに貢献するためにやったこと

こんにちは。アプリケーション開発グループの穴澤です。

この記事は Enigmo Advent Calendar 2020 の 22 日目の記事です。

今年の振り返りとともに主に自分が去年から今年にかけて中途で入社した方にやってきたことを書いておきます。 これからチームに新しい人を迎える人や、中途採用入社する方、チームの組織づくりに興味がある方に読んでいただければとおもいます。

実際に取り組んだこと

  1. オンボーディング資料
  2. 開発手法、フロー・チェックリストの読み合わせ
  3. 1on1

1.オンボーディング資料

f:id:enigmo7:20201209113806p:plain

エニグモでは部署問わず、今年から中途で入社した社員向けのzoomでのオンボーディングが開始されました。 簡単にいうと、「各事業部が具体的になにをやっているか」「どんな取り組みをしているか」を中途入社した方に説明します。 入社された人は部署ごとに数回に分けて参加しています。

思えば今年の早い段階からリモートワークが推奨された状況で、中途入社問わず、自分の所属する部署以外の方と顔を見合わせて話をする機会というのが極端に減りました。例えば「先月入社した方ですね、XX部署のXXです!」といったようなコミュニケーションも、廊下やエレベータで顔をあわせたり、ミーティングの終わりですれ違う、お手洗いで顔を合わせるなどのタイミングで行われてきました。それぞれを取るととても些細なことですが、これらを積み重ねた何かが会社の社風や文化を醸造する一部になっていたんだなと感じます。

社風や文化をオンラインでも実現するために、このオンボーディング時に自分が紹介する担当として実践していることは以下の点です。細かい箇所は割愛しますが、リモートワークならでは、でしょうか。

  • 説明資料に自部署の社員の氏名とSlack名の紐付け(顔とアイコンと名前とSlack名が一致しない問題)
  • アプリ、インフラ、データ基盤、それぞれについての相談相手としてまずは覚えておいてほしい人をエピソードを添えて紹介(そんなに色々紹介されても覚えられない問題)
  • BUYMAの仕組みで聞きたいこと、相談したいことがあったら、ここで相談してね、のチャットの紹介(何かを相談したくなってもどこだかわからない問題)
  • 参加してくれた方とサービスエンジニアリング本部との具体的な関わり(紹介してもらったけど具体的にどういう関わりがあるんだろう問題)

2.開発手法、フロー・チェックリストの読み合わせ

f:id:enigmo7:20201209112322p:plain

BUYMAというサービスが産声を上げて10年以上たちます。尖った技術を使っているところもあれば枯れているところ、 やや温かみのある仕組みのところもあり、中途で入社するエンジニアの方が躓くところもあるため、主に若手の方対象に週1回程度開発やリリースの際に 気をつけてほしい点をesaにまとめたフロー・チェックリストの読み合わせを行いました。

基本的に開発に関わる問題点は、所属しているチームのリーダーやメンターが答えてくれますが、彼らに一点集中よりも、誰にきいても答えてもらえる、誰にきいても大丈夫、という雰囲気作りに自分も貢献したかったからです。ましてや、自分の所属するチーム以外のエンジニアと、躓きや悩みを共有する場所はいくつあってもいいと考えています。 時には検索エンジンチームのエンジニアをゲストに迎えて検索周りの説明をしてもらったり、チェックリストについては実際トラブルに遭遇したエンジニアに「なぜこのチェック項目があるのか」というのを語ってもらいました。

3.1on1

1on1は前職でも取り組んできたことですが、今年特に自分が取り組んできたことをあげます。

アジェンダを用意する

序盤はなかなか用意できませんでしたが、後半は実施の1,2日前にいくつかアジェンダを用意するようにしました。 4,5つ用意してその中のトピックで話が広がれば盛り上がり、割愛してできるだけ「話が盛り上がる」「相手の指向性、興味関心がわかりそう」な所を意識してすすめていきました。

1on1は「・・・最近どうですか」というやり方もありますが、1on1を実施するメンバーの身になって考えてみると 「なにを話せばいいんだ」「なにを言われるのか」と身構えられてしまうこともあります。明日はこういう事を話そう。を用意する方が 時間も有意義ですし、もしネガティブな話をされるということが事前にわかっていれば、自分が言いたいことも事前に用意するなど、お互いに準備ができ、結果的に少しの準備で得られる物が大きいと感じました。

会社全体の動きがわかるトピックをアジェンダに混ぜて話す

メンバーの仕事や取り組みと直接関係のない、会社の動きや施策。最近の売上。他部署に入社した人のこんなところが凄いなど。 会社の中での私達の部署の動き、施策以外の取り組みや課題など、自分と会社をつないでいる環境にどんなものがあり、どんな動きをしているか。中途入社の序盤だと、見えにくい組織の動きを定期的にアジェンダに混ぜて話をするようにしています。

  f:id:enigmo7:20201209112739p:plain

まとめ

今年は、いつもやっていること+αの「工夫する」注力をしました。特に、コロナ禍のリモートワークで感じやすい「孤独感」を和らげるための「皆さんは一人ではないですよ」ということを伝えるために、様々なできることを模索した年だった様に思います。取り組みの結果を定量的にはかることはできないので難しいですが、フィードバックを受けながら、来年も少しずつ改善していきたいと思います。 よいお年を。

明日の記事の担当は インフラグループ の 加藤 さんです。お楽しみに。

株式会社エニグモ 正社員の求人一覧

hrmos.co

データアナリストが転職活動で求めたこと

こんにちは、データアナリストの田中里澄です。
エニグモではデータ活用推進室という部署に所属しており、主に他部署が行った施策の効果検証を担当しています。

私は2020年11月にエニグモに入社したので、今回はその転職活動の中でどうして今の職を選んだのか、また入社後どのような仕事をしているのかを紹介できればと思います。
前職はライブ配信サービスを運営している会社で、同じくデータアナリストとして働いていました。
転職理由について前職に対してネガティブなことは一切なく(むしろ今も大好きな会社です)、今の自分は別の会社で経験を積んだ方がいいと判断したためです。

なぜこの内容を書くのか?

理由は社外の方向けと社内の方向けでそれぞれあります。

  • 社外の方にはデータアナリストの転職活動の参考にしてもらいたいため

  • 社内の方にはまだ入社して日が浅い私の自己紹介になるためです。転職活動だけでなく、自分がどんな仕事をできるか/したいかについても書いています。

そもそもデータアナリストとは?

ここであくまで自分の考えにはなりますが、データアナリストとはどんな仕事をするのか?を説明します。
読んでくださっている方に知っていただきたいですし、私が転職活動をしたときにも実際に自分の中で整理しました。
その整理のおかげで、無数にある求人(特にデータ〇〇という職種は最近多い)の中でピックアップする際や、面接時に自分がしたいことを説明する際に役立ちました。

データアナリストがどんな仕事をするのかを一言で表すと、「意思決定に必要な情報/インサイトを提供する仕事」だと思っています。
データを分析する仕事じゃないの?という意見もあるかもしれませんが、分析はあくまで手段であり意思決定をすることが目的だと考えているのでこのような書き方をしました。
もしかするとコンサルという職種でイメージされる仕事に近いかもしれません。(余談ですが、思考法を勉強するためにコンサル向けの本を読むこともあります)
逆にいうと、こういった仕事をする人のことを「データアナリスト」と定義している会社が多いとも言えます。(本質的にはこちらが正しいかもしれません)

どんな会社を求めていたのか?

上記で書いたデータアナリストとしての仕事ができることはもちろんですが、さらにそれを具体化した自分のしたい仕事について2点、またそれに関連した仕事の環境について2点合わせて4点求めていることがありました。

どんな意思決定に携わりたいのか

意思決定にも長期的な経営に関するものや日々のオペレーションに関するものなど色々な種類があります。
その中でも私はプロダクトの改善に関する意思決定に携わりたいと考えていました。
なぜならユーザーに喜んでもらえることに直結して実感しやすく、また分析するなかで人がどんな考えで行動するのかを垣間見ることができるためです。

どんなデータを分析するのか

データは大きく分けると定量と定性の2種類に分けることができますが私はその両方を用いて分析したいと考えていました。
特に定量的なデータだけを扱うことが求められる場合もありますが、ユーザーの行動理由を知るためにはそれだけでは分からないことがたくさんあるためです。

他の職種(特に意思決定者)との連携が取りやすい

上の方で書いていたデータアナリストの仕事内容を遂行するためには他の職種の方との連携が大切です。
その中でも特に最終的に意思決定をする方との連携がとても大事だと考えていました。
分析をする前にはどんなことが分かれば意思決定ができるかを摺り合わせ、分析後にはそこで分かったことを共有し議論することで意思決定に付き添うためです。

分析をするためのデータ基盤が整っているか

整っているという言葉が曖昧ですが、例外はあれど日々の分析をする際の前処理やデータの準備に自分の持っているスキル以上のことを求められないことが基準でした。
もちろん後々はデータ基盤の構築などのスキルも伸ばしていきたいのですが、それをすぐに求められて本当にやりたいことに時間が使えないのはしんどいので求める要件に入っていました。

実際に入社してみて感じたこと

最終的にエニグモ社に決めた理由としては、上記の求めていることを全て満たせていると選考中や内定をいただいた後の面談で感じたからです。
ここでは入ってみて実際に感じたことを紹介します。

  • 分析結果を共有する際に、集計結果だけでなく考察やネクストアクションも求められまた歓迎されていい。
  • 求めていた意思決定者との連携について、非常に取りやすいので分析の方針を立てやすい。
  • データ分析基盤について、基本的に必要なデータがBQにあって扱いやすい。ただし、それぞれのテーブルの定義などがわからないところやGAのデータについての扱い方を調べる必要があったりと、データはあるが扱い方は聞いたり調べたりが必要なので、今後入ってくる方のためにまだまだ整える余地がある。

入ってからどんな仕事をしているか

主に2種類の施策の振り返りとアプリの分析方針を立てる仕事をしています。

施策についてはどちらも私が入社する少し前から始めていたもので、毎月改善を加えてやっていこうという前提のものでした。
なので振り返りでどんな数字を見るのか?というところから任せてもらい、翌月の改善点の提案を出すところまで行っています。

アプリの分析方針については、私が入社するまでは売上というKGIを追いつつ、KPIとして何を追うべきかについて試行錯誤をしていたという状況です。
そこで今後追うべきKPIについて決める仕事を担っており、今は私の提案を元に議論しています。

総じていうと、現在自分のしたいと思っていた仕事ができており今後もまだまだやりたいことが溢れています。
コロナ禍で入社後のコミュニケーションは以前と比較すると難しいなとは感じておりますが、より一層迅速で納得感のある意思決定に貢献できるように日々の業務に努めていければと思っています。


株式会社エニグモ 正社員の求人一覧

hrmos.co

Apache Airflowで実装するDAG間の実行タイミング同期処理

こんにちは。
今年4月にエニグモに入社したデータエンジニアの谷元です。
この記事は Enigmo Advent Calendar 202020日目の記事です。

目次

はじめに

コロナ禍の中、皆さんどのようにお過ごしでしょうか。 私はリモートワークを続けてますが、自宅のリモートデスクワーク環境をすぐに整えなかったため、薄いクッションで座りながらローテーブルで3ヶ月経過した頃に身体の節々で悲鳴をあげました。猫背も加速...

さて、エニグモでの仕事も半年以上経過し、データ分析基盤の開発運用保守やBI上でのデータ整備などを対応をさせていただいてますが、今回は社内で利用されているAirflowの同期処理の話をしたいと思います。

尚、こちらの記事を書くにあたり、同環境の過去記事 Enigmo Advent Calendar 2018 がありますので、良ければそちらもご覧ください。

そもそも同期処理とは?

例えば、以下のようなデータ分析基盤上でのETL処理があったとします。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
処理2. DWH上のデータを加工後、外部へCSV連携する

上記の処理は順番に実行されないと必要なデータを含めた抽出ができなくなってしまい、外部へデータ連携できなくなります。そのためには処理1の正常終了確認後に処理2を実施するという、同期処理を意識した実装をする必要があります。

例だとシンプルですが、業務上では徐々にETL処理も増えていき、複雑化していきます。 こうした同期処理は、初めはなくても問題にならないことが多いのですが、徐々に処理遅延が発生してタイミングが合わなくなったり、予期せぬ一部の処理エラーが原因で関連する後続処理が全て意図せぬ状態で動き出してしまったりします。そうならないためにも、設計時点で同期について意識することが大事だと思います。

Airflowによる同期処理

Airflowでは ExternalTaskSensor を使用することで実装可能となります。 Airflowのソース上ではdependという用語が使われているので、Airflowの世界では「同期」ではなく、「依存」と呼んだ方が良いのかもしれません。適弁読み替えていただければ...

では、Airflowでの同期検証用サンプルコードを作成してみましたので、実際に動かしながら検証したいと思います。尚、Airflowバージョンは1.10.10となります。

先ほどのETL処理を例にして、下記の内容で検証してみます。 実際のファイル出力やDBへのロード処理などはここでは割愛して、DummyOperatorに置き換えてます。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
  dag_id: sample_db_to_dwh_daily
  schedule: 日次16:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

処理2. DWH上のテーブルを用いてデータ加工後、外部ツールへCSV連携する
  dag_id: sample_dwh_to_file_daily_for_sync_daily_16
  schedule: 日次17:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

検証時のコード

まずは処理1のサンプルコードになります。

from airflow.models import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='sample_db_to_dwh_daily',
    start_date=datetime(2020, 12, 1),
    schedule_interval='0 16 * * *'
)

tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

for table_name in tables:

    db_to_dwh_operator = DummyOperator(
        task_id='db_to_dwh_operator_%s' % table_name,
        dag=dag
    )

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed'
    )

    db_to_dwh_operator >> terminal_operator

次に処理2のサンプルコードです。

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State
from datetime import datetime, timedelta


tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

def _dwh_to_file_operator(dag,
                         table_name,
                         depend_external_dag_id,
                         depend_external_dag_execution_delta):

    dwh_to_file_operator = DummyOperator(
        task_id='dwh_to_file_operator_%s' % table_name,
        dag=dag
    )

    sensors = []
    sensors.append(ExternalTaskSensor(
        task_id='wait_for_sync_db_to_dwh_%s' % table_name,
        external_dag_id=depend_external_dag_id,
        external_task_id='terminal_operator_%s' % table_name,
        execution_delta=depend_external_dag_execution_delta,
        allowed_states=[State.SUCCESS, State.SKIPPED],
        dag=dag))

    for sensor in sensors:
        sensor >> dwh_to_file_operator

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed')

    dwh_to_file_operator >> terminal_operator


args = {
    'start_date': datetime(2020, 12, 3)
}

dag = DAG(
    dag_id='sample_dwh_to_file_daily_for_sync_daily',
    default_args=args,
    schedule_interval='0 17 * * *'
)


for table_name in tables:
    _dwh_to_file_operator(
        dag=dag,
        table_name=table_name,
        depend_external_dag_id='sample_db_to_dwh_daily',
        depend_external_dag_execution_delta=timedelta(hours=1)
    )

サンプルをAirflow画面で見ると?

上記2つのDAGをAirflowのWebUIで見ると以下のようになります。

f:id:enigmo7:20201211201546p:plain

同期処理をするoperatorは「wait_for_sync_db_to_dwh 」です。図でも ExternalTaskSensor のアイコンになってますね。 今回は3テーブルあり、それぞれ並列処理を想定しているため、3つあります。 そして、それぞれ、該当するテーブルのterminal_operator処理完了を確認後、後続処理が実行されます。

では上記の同期処理がAirflowのログでどのように表示されるか見ていきたいと思います。

同期遅延なし時のAirflowログ

まずは、既に sample_db_to_dwh_daily が完了していた場合です。 3テーブル毎のログを載せておきます。

TABLE_DAILY_1

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,833] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,840] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,841] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,841] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,841] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,848] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,851] {standard_task_runner.py:53} INFO - Started process 48063 to run task
[2020-12-09 02:00:09,905] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,921] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,924] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,834] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,834] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_2

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,831] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,840] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,840] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,846] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_2> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,848] {standard_task_runner.py:53} INFO - Started process 48062 to run task
[2020-12-09 02:00:09,902] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,916] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_2 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,918] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,921] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_2, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,832] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,832] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_3

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,829] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,837] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,837] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,844] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_3> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,847] {standard_task_runner.py:53} INFO - Started process 48061 to run task
[2020-12-09 02:00:09,901] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,915] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_3 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,917] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,920] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_3, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,831] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,831] {local_task_job.py:103} INFO - Task exited with return code 0

注目して欲しい行は下記となります。
※ 他テーブルのログも同じですので、ここから先は記載を割愛します

[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...

これが同期処理のログになります。 今回は既に正常終了していたため、1回の確認しか行われておらず、その後、後続処理も含めて正常終了してることが分かります。

同期遅延あり時のAirflowログ

次に前提のタスクが遅延して正常終了した場合のAirflowログもみていきたいと思います。

~~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08 17:00:00
~~~
[2020-12-10 02:00:04,174] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,182] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-10 02:00:04,182] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,189] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-08T17:00:00+00:00
[2020-12-10 02:00:04,192] {standard_task_runner.py:53} INFO - Started process 21683 to run task
[2020-12-10 02:00:04,245] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-10 02:00:04,259] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:01:04,322] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:02:04,385] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:03:04,448] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:04:04,511] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...

(省略)

[2020-12-10 10:27:29,163] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:28:29,226] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:29:29,273] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:30:29,298] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,360] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,363] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-10 10:31:29,366] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201208T170000, start_date=20201209T170004, end_date=20201210T013129
[2020-12-10 10:31:33,698] {logging_mixin.py:112} INFO - [2020-12-10 10:31:33,698] {local_task_job.py:103} INFO - Task exited with return code 0

先ほどのPoking行がログに表示され続けていることが分かります。今回は途中でタスクを手動で動かして正常終了させました。

尚、Pokingの間隔はExternalTaskSensorの引数でpoke_interval を設定すると変更ができました。ソースを見るとデフォルトは60秒のようです。ログとも一致してますね。

同期タイムアウト時のAirflowログ

業務では非機能要件も大事だと思います。 Airlfowでは、ExternalTaskSensorの引数でtimeoutがあるようです。 こちらでタイムアウト(default: 1週間ですかね、長っ)を適切に設定して、エラー検知をしてみたいと思います。

sample_dwh_to_file_daily_for_sync_hourly/wait_for_sync_db_to_dwh_TABLE_HOURLY_1/2020-12-09T17:00:00+00:00/4.log.
[2020-12-11 18:21:14,539] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,551] {taskinstance.py:880} INFO - Starting attempt 4 of 9
[2020-12-11 18:21:14,552] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,563] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_HOURLY_1> on 2020-12-09T17:00:00+00:00
[2020-12-11 18:21:14,568] {standard_task_runner.py:53} INFO - Started process 84130 to run task
[2020-12-11 18:21:14,625] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-11 18:21:14,639] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:22:14,701] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:23:14,764] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:24:14,828] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,892] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,898] {taskinstance.py:1145} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
AirflowSensorTimeout: Snap. Time is OUT.
[2020-12-11 18:25:14,899] {taskinstance.py:1202} INFO - Marking task as FAILED.dag_id=sample_dwh_to_file_daily_for_sync_hourly, task_id=wait_for_sync_db_to_dwh_TABLE_HOURLY_1, execution_date=20201209T170000, start_date=20201211T092114, end_date=20201211T092514
[2020-12-11 18:25:15,070] {logging_mixin.py:112} INFO - [2020-12-11 18:25:15,070] {local_task_job.py:103} INFO - Task exited with return code 1

hourlyに変更して実施したログですが、「Snap. Time is OUT.」と良い感じ(?)にエラーとなってくれました。 業務上では、お目にかかりたくないログ内容ですが...

他にも色々あるようですが、この辺りを抑えておけば基本的な使い方は抑えたことになるかと思います。

所感

実際にAirflow同期処理をやってみて思ったのですが、

  • 同期を取る対象のoperatorをどれにするか
  • 同期を取るoperatorの時刻差はどれだけあるか
  • 非機能要件にあったタイムアウト設定で適切にエラーで落とそう
  • 複雑になるとDAG間の関係性がわかるグラフをWebUIでサクッとみたい

が、気になりました。

「同期を取る対象のoperator」は「terminal_operator_<テーブル名>」としました。
ダミー処理なので冗長にはなってしまいますが、各並列処理毎に加えておいたほうがDAG修正時の同期影響を意識しなくて良くなるのかなと思ったためです。

また、DummyOperator利用時に 「trigger_rule='none_failed'」の引数を付け加えないと、先に実行されたケースもありましたので注意が必要そうです。

「同期を取るoperatorの時刻差」ですが、何度も繰り返してると混乱してしまう可能性もあるので、テストでも十分に気をつけて対応しないといけないですよ(to自分)。実際の業務でもtimedeltaで指定する時刻に誤りをテストで見落としてしまい、その結果、リリース後に同期処理が想定時間通りに終わらず遅延してしまい、外部連携のタイミングに間に合わなくなり問題になってしまいました...。

今回の検証例は、お互いdaily実行だったのですが、頻度が異なると慎重な対応が求められそうです。 ただ、この辺の制御はAirflowのコアな制御なので、今後、利用者が意識しないで済むようにdag_idとオプション引数で渡したら、同期を取ってくれるような機能があると良いなとも思いました。利用頻度が多くなると、そういった実装も検討しないといけないのかもしれません。

あとは、WebUI上にてDAGの関係性がグラフで見れると、意図したoperatorで同期が取れているかの確認がやりやすいと思いました。 見方が分からないだけなのか、未実装なのか把握できてないのですが、WebUIをみる限り今回のAirflowのバージョン1.10.10ではなさそうです。

最後に

Airflowでの同期処理について少しでも伝わればと思い、このテーマで記事を書いてみました。 本記事を通して少しでもイメージを掴めて頂けますと幸いです。 他にも面白そうな機能はあると思いますので、また、機会があれば投稿したいと思います。

私からは以上となります。最後までお読み頂きありがとうございました。

明日の記事の担当はエンジニアの高山さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co

マニフェストファイルに思いを馳せる

こんにちは。BUYMAの検索やデータ基盤周りを担当している竹田です。
この記事は Enigmo Advent Calendar 2020 の19日目の記事です。

エニグモに入社してGCPAWSといったクラウドサービスを利用することが多くなり、日々刺激を受けながら業務に従事しております。
その中でもKubernetesのようにシステムを「宣言的」に定義するモデルに技術進化の恩恵を感じており、自分の体験も踏まえて、クラウドサービスでKubernetesを一般利用するに至るまでどういう歴史的経緯があるのかを辿ってみたくなりました。
(実際Kubernetesで編集するファイルも「マニフェスト(≒宣言)ファイル」といいますね)
なお、記事内容には主観を含む部分や、内容を簡素化するため端折っている部分もありますので、あらかじめご了承ください。

chroot

まずはKubernetesで管理されるコンテナの歴史を探ってみたいと思います。
コンテナ技術の前身は1979年に登場した chroot と言われています。
chrootで特定階層以下をrootディレクトリとすることで、システムとの分離を実現することができます。
変更したディレクトリ配下には動作中のOSと同じディレクトリ構造を実体として持つ必要があり、利用シーンとしてはsshログインしたユーザにディレクトリ移動の制限をかけるような場合でしょうか。個人としてはあまり使った記憶はありません。
少なくともリソースの制限を設けることやディレクトリ階層をイメージのような形で持つことはできませんでした。

コンテナのベース技術

コンテナのベースとなる技術は2000年にFreeBSDから発表された FreeBSD jail という機構です。 カーネルに手を加えてOSレベルでの仮想化機構を実現しており、ファイルシステムやネットワークなどの分離ができるようです。
このFreeBSD jailは知らなかったのですが、当時は業務で Solaris を利用しており、すでに2005〜2006年頃にはSolarisコンテナという概念が出てきていたことを記憶しています。

また、この頃はまだあまりLinuxは表舞台には出てきていませんでした。10数年前当時は商用製品であることが重視されていたと思います。
当時のLinuxSolarisなどの商用UNIXと比較すると

  • OSとしての安定性が高くなかった
    • OOM Killerに悩まされたこともしばしば
  • 比較的スループットが重視されていた
    • とりあえずタスク間がフェアじゃなかった
  • リソース管理やデバッグ面が弱かった など

こういった背景もあり、ミッションクリティカルなシステムでは商用UNIXを選択するのが一般的でした。

ただ、2000年台初頭から、OSS(Open Source Software) が台頭してきました。
開発者のニーズにマッチしていたことや、 Red Hat社 のようなビジネスモデルを確立できたことなどが、OSSを後押しした背景にあると思います。

ちなみにコンテナが出てくるまでの仮想化技術の筆頭は VM(VirtualMachine) ですが、本記事では割愛します。

cgroups

Linuxにおけるコンテナは cgroups が根幹となっています。
カーネルの機構によりプロセスをグループ単位にして、そのグループ内でリソース配分や制限を行う技術です。
少しだけcgroupsには関わっていたこともあり、Linux商用UNIXに追いつきそうだなと感じていた頃でした(おそらく2008〜2009年頃)。
ただし、設定方法が特殊であり、一般利用するにはかなり難易度が高いものでした。

この後、cgroupsを管理できる LXC というソフトウェアが出ています。
ポータビリティ性が低かったのか、使い勝手が良くなかったのか、、なぜかあまり脚光は浴びていませんね。

Docker

言わずと知れたDocker社が開発したコンテナを管理するソフトウェアです。
自分の印象では利用者がcgroupsを意識することなく利用でき、それをコードベースで管理できる柔軟なラッパー、、という解釈です。
ざっくりと表現するとLinuxでは以下のイメージです。
f:id:enigmo7:20201214004444p:plain
近年のCPUパフォーマンスやSSD等によるI/Oパフォーマンスの向上、かつVMよりも遥かに軽量でポータビリティ性が高く、簡素に利用できるということもあり、主に開発者の間で一気に広まったように思います。

Kubernetes

Google社が2013年に発表した コンテナオーケストレーションツール です。
この当時すでに大量のシステムをコンテナ化していた事実に衝撃を受けたのを覚えています。
kubernetesは紆余曲折ありましたが、現在の標準になっているものと思います。
宣言的な記載により、ブルーグリーンデプロイメントやローリングアップデートが非常に簡素に実現できるようになりました。

  • 宣言的
    • システムがどういう状態にあるべきかを記述する
    • 問題があった場合もその状態になるよう再構成する
    • 内部アプリの修正やバージョンアップは、修正適用済みコンテナに置き換える

解釈は難しいです。今こうある状態が大事、というニュアンスでしょうか。
対義語としては命令的、ということなので、こうしてああしたらその状態になる、というニュアンスですかね。

思えば、トラブル発生の際はシステムを修復・復旧させることが一般的でした。
ステートレスシステム(状態維持が不要なシステム)では、もはや宣言的アプローチが一般的になっていると思います。

コンテナ、およびKubernetesのサービス利用

コンテナ技術は開発者にとっては非常に都合の良いものでしたが、実サービスでの利用では、となると敷居が高かったように思います。
自分の当時の立場で記載すると、以下のような理由からでした。

  • 枯れた技術ではない
  • リソース分割はVMで十分満足できている
  • Kubernetesをオンプレ環境で管理するのはハードルが高い
  • 新技術を使いたいという理由では上長(ひいては経営層)を納得させられない

クラウドサービスによるサポート

クラウドサービスでのサポート開始により、サービス利用の敷居が一気に下がったものと思います。
Google GKE、Amazon EKS、Microsoft AKS などが挙げられます。

コンテナオーケストレーションでは大量のシステムを管理・運用することに長けており、クラウドサービスの柔軟性と非常に相性が良いと思います。

  • マネージドなKubernetesにより管理を簡素化できる
  • サポートを受けられる
  • コスト面も使った分だけ

。。となると使わない手はないだろうな、という印象です。

終わりに

実際にはもっと複雑な背景があるとは存じますが、概要としてはこのような流れかなと思います。
「宣言的」概念はシステムのあるべき形だと感じつつも、まだステートレスなシステムでのベストプラクティスなのかなという感触です。
ステートフルシステム(状態維持が必要なシステム、例えばDBサーバなど)でも利用はできますが、まだちょっと厳しいので今後より良い概念・機能が出てくるのかな、という期待があります。

おじさんエンジニアとしては過去に思いを馳せつつも、これからも技術のビッグウェーブには乗っていきたいので日々勉強・キャッチアップが本当に大事ですね。

最後まで読んでいただきありがとうございました。
明日の記事担当はデータエンジニアの谷元さんです。よろしくお願いします!


株式会社エニグモ 正社員の求人一覧

hrmos.co