エッジコンピューティングでアクセス集中、パーソナライズ、プライバシー保護の課題を解決

エンジニアの木村です。最近は負荷対策のためのリファクタリングやリアーキテクティングのリードや、データ基盤・ML・検索基盤を担当するチームのマネージャーとしてデータ関連の案件に携わっております。

先週、webinar 形式で行われた Akamai TechWeek 2021 Japan にて、6/16 に「EdgeWorkersの導入について」というテーマで、昨年末にバイマに導入したEdgeWorkersというAkamaiのサービスについて講演しました。本ブログでもその内容を共有したいと思います。

スライドはこちらになります。

エッジコンピューティング導入の背景

導入背景としては、アクセス集中下でもサービス自身のドメインを通じて自らユーザーを追跡し、ファーストパーティデータを収集する必要性という事業課題がありました。

アクセス集中を伴うマーケティング施策への対応

事業課題の1つとして、モバイルへのプッシュ通知と、フラッシュセール施策という、いずれもアクセスの集中を引き起こすマーケティング施策へサービスとして対応していく必要がありました。

まず、モバイルへのプッシュ通知が増加しているのですが、その背景には時代やサービスの成長フェーズの移り変わりがあります。マスマーケティングからOne to Oneマーケティングへと、我々の注力ポイントが変わってきているからです。実際、以前は認知を広げて新規顧客を獲得するためにTVCMによるマスキャンペーンを展開し、当時はアクセス集中といえばTVCMの放映起因がほとんどでした。最近は既存顧客一人一人に対してパーソナライズ した通知を送ってリテンションを図る施策が増えています。モバイルアプリをインストールしているユーザーやLINEのバイマ公式アカウントと連携しているユーザーはサービスへのロイヤリティが高く、通知の開封率も高いので特に一斉に送信するような通知ではTVCM放映以上のアクセス集中が生まれています。

フラッシュセールとは、ブラックフライデーサイバーマンデーなどのセール施策のことです。期間や商品の在庫も限定した形で行われ、また先程のプッシュ通知により開始が通知されるのでアクセスの集中が発生します。毎年性能改善を重ねてはいたのですが、スケールが難しいオンプレのインフラを使っているサービスのところでは対策に限界があったのと、年々増加していくユーザー数に追いつかない状況で、毎年このセールの開始直後はサイトのパフォーマンスが不安定になっていました。

パーソナライズとプライバシー保護の両立

直面していた事業課題の2つ目は、プライバシー保護とパーソナライズの両立です。

パーソナライズ についてですが、最近はレコメンドなどに代表されるパーソナライズ されたUXへの期待が高まっています。北米やイギリスの消費者を対象にした2019年の調査*1では、63%の消費者がサービスの標準としてパーソナライズ を期待しているそうです。また、自分だけの特別なオファーが送られてきたり、サービスとの各接点で一人の同一の顧客として認識されることで一人の個人として扱われていると感じるそうです。したがって、サイトのUXだけでなく、マーケティング施策にもパーソナライズ が求められています。さらに、そういったUX改善やマーケティング施策の効果測定でも、施策による単純なKPIの動きだけでなく、ユーザーを一人一人区別して分析し、どのような属性の顧客に効いているのかといった高い解像度での分析が求められています。

また、ユーザーはパーソナライズ を期待している一方で、プライバシーの保護についての意識も高まっていて、個人データの収集に関して透明性が求められています。それに答えるように、SafariChromeといったメジャーなブラウザでブラウザフィンガープリントやサードパーティクッキー、クライアントサイドクッキーは何らかの制限や規制、あるいは廃止される予定になっています。これにより、ユーザーを追跡する手段としては、サービス自身のドメインを使ってのファーストパーティクッキーだけが残され、そのクッキーを通じて収集されるファーストパーティデータの重要性が増してきています*2

エッジコンピューティングの導入理由

そういった背景から、なぜエッジコンピューティングの導入に至ったのかをお話しします。

CDN導入のための必須要件

背景にあったアクセスの集中に対応するために、それまで画像・js・cssといった静的コンテンツのみで多なっていたCDNでの配信を、Webアプリから返しているHTMLやAPIという動的コンテンツの配信にも導入することになりました。また、これも背景で説明したパーソナライズ やデータ収集の必要性から、CDNを選定するにあたり以下の3つの必須要件を定めました。

  • レスポンスをパーソナライズ可能
  • エッジアクセスデータと自社DWHの連携が可能
  • ファーストパーティCookie操作をエッジへ移行可能

1つ目はレスポンスをパーソナライズ可能性、つまりキャッシュするコンテンツをユーザーによって分けたりが可能かどうかです。2つ目はエッジアクセスデータと自社DWHの連携が可能かという点です。それまではオリジンのWebサーバーのログをDWHへ連携することでユーザーのサイト内の行動を分析していたのですが、キャッシュされるコンテンツに関してはオリジン側へログが残らなくなるため、分析のためにエッジのアクセスデータを自社DWHへ連携する必要があります。3つ目ですが、バイマでは未ログインでセッションを張っていないユーザーも追跡可能にするためにファーストパーティcookieを利用していて、これによりUXのパーソナライズやマーケティング施策の効果計測を実現しています。そのCookieの操作処理をエッジへ移行する必要がありました。

ITP2.1 への対応

Cookie操作処理のエッジへ移行する必要性の経緯としては、ITP 2.1 への対応というのがあります。

f:id:enigmo7:20210617143016p:plainf:id:enigmo7:20210617143020p:plainf:id:enigmo7:20210617143027p:plain
Cookieの操作処理の変遷
バイマでは当初、そのCookieの操作をJavaScriptAPIであるDocument.cookieで行っていたのですが、2019年にApple によってブラウザのJavascriptで作成するCookieの有効期限は最大7日に制限するというITP 2.1のSafariへの導入 がアナウンスされました。バイマではiOSSafariのユーザーが大半を占めるので、我々はそれまでブラウザのJavascriptで行っていたCookieの操作をオリジンのアプリケーションサーバー側へ移行し、Set-Cookieヘッダーによりcookieを操作するように修正を行いました。今回のCDNの導入の際は、キャッシュヒット時にもSet-Cookieヘッダを返すことでCookieを操作する必要性がでてきます。したがって、ファーストパーティCookieの操作処理をエッジサーバー側へ移行する必要性がありました。

Akamai EdgeWorkers を採用

いくつかのCDNサービスからAkamaiを選んだ理由としては、上述の必須要件を満たしていたという点がありました。プロパティだけでもかなり柔軟なルール設定が可能で、それだけでもある程度パーソナライズは実現可能でした。また、DataStream 2 というサービスにいよりエッジのアクセスデータをリアルタイムに近い形で、GoogleのBigQuery上で構築している自社のDWHへデータ連携が可能でした。さらに、EdgeWorkersによりCookie操作をエッジサーバー上への移行が可能という点で必須要件を全て満たすことができました。

また、すでにスタティックコンテンツの配信では既に導入実績があったという点も採用への後押しになりました。実は、この3つの必須要件のパーソナライズ、データ連携、エッジコンピューティングの機能については、他の競合CDNサービスも提供されています。ただ我々としては利用に慣れていて信頼のあるAkamaiを使いたかったので、逆にこちらからエッジコンピューティングの機能について問い合わせたところ、EdgeWorkersを提案いただき採用に至りました。

EdgeWorkers の開発・運用

EdgeWorkersでどのように開発をすすめたのかや、実際の運用状況について紹介します。

利用開始は簡単

最初のEdgeWorkersへのとっかかりは簡単でした。すでにAkamaiをお使いの方にはお馴染みのWebコンソール「Control Center」から利用可能です。実装も簡単で、開発者はイベントハンドラのファンクションを実装するだけになります。今回クライアントへのレスポンスするタイミングで呼ばれる onClientResponse を実装しました。Web UI上でコーディング、アクティベートも可能だったので、チュートリアルとしてのHello Worldを動かすところまでは1日で到達できました。

ライブラリも利用可能

利用できるライブラリについてですが、cookie操作やURLのパースなどは標準で組み込まれいているモジュールで十分可能でした。また、rollup.jsというJavaScriptのモジュールバンドラーにより標準組み込みでない、npmリポジトリで公開されているサードパーティ製モジュールもbundle可能でした。1点だけ注意というか、当然ではあるのですが、ブラウザJavaScriptのコードをそのまま移植しようとしても動かなかったです。元々はブラウザJavaScriptCookieを操作していたロジックはあったのですが、書き換える必要はありました。さらに細かいところで言うと、ブラウザだとwindowのオブジェクトにatobやbtoaというbase64エンコーディング・デコーディングをしてくれるfunctionが付いてるんですが、それは使えず、代わりにrollup.jsでbase64エンコーディングを行う3rdパーティモジュールをbundleして呼び出して使っています。

開発時のデバッグ方法

開発時のデバッグ方法ですが、ログ出力をレスポンスヘッダに埋め込んで確認できるenhancend debug headerを利用し、ステージングへアップロード&アクティベートを繰り返しながらのプリントデバッグで切り抜けました。本当はakamaiCLIでローカルにSandboxを構築して利用する方法もあるのですが、その構築が手順通りにやってもうまくいかず、ブラックフライデーが差し迫っていたこともあり、問い合わせる余裕すらなかったので、実環境のステージングでの開発となりました。ただステージングへのアクティベートは毎回許容範囲内の時間で終わってくれるので、このやり方でも開発は可能でした。手順通りにやってうまくいかないという点なのですが、ある程度経験のあるメンバーが二人がトライして、二人ともSandbox構築にハマっていたので、ドキュメントに関しては何らかの改善の余地はありそうです。昨年末の話なので、既に改善されているかもしれないですが、期待とフィードバックの意味も込めて、ここで苦労したエピソードとして紹介させていただきます。

開発時の参考リソース

開発時に参考にしたリソースとしては、ユーザーガイドもあるのですが、他にもGitHubのリポジトリにexampleが公開されていて、先行事例があまりない段階での利用だったので、開発時にはそれが大変助かりました。ブログにも参考になりそうな記事があります。ユースケースの紹介もありますし、Github ActionsでCICDを実現するような事例も載っていて、デベロッパーフレンドリーにできてるなと思いました。

本番運用してみて

実際に本番運用してみている状況についてですが、実行時間的なパフォーマンスは非常に良いです。エッジでなにか処理を行っていてもUX上はなにもレイテンシを感じないレベルを維持できています。

f:id:enigmo7:20210617153422p:plain
EdgeWorkersの実行時間の推移

ただし、本番環境でのトラブルシューティング時のログ調査に課題があるなと感じました。原因調査のため、ログ出力の解析をするためにログ取得をしようとすると、Log Delivery Serviceを使うのですが、実際に手元にログが届くのが次の日になるので、トラブルシューティングのサイクルを回すのが非常に時間を要する状態でした。EdgeWorkersをFaaS (Function As A Service) として見たときに、競合としてはAWSのLambdaやGCPのCloud Functionがそれに当たると思うのですが、これらの場合はリアルタイムに近い形でWebUI上で本番のログをtailしたり、検索、分析できたりします。当然Akamaiそういった機能はあるのだろうと、期待していたところもありましたが、そういた機能は未提供となっているところは残念なポイントでした。今後提供されることを期待したいと思います。

EdgeWorkersの今後の活用と期待

私たちのEdgeWorkersの今後の活用案や、EdgeWorkers自体に対してどのような進化を期待しているかというお話しをしたいと思います。

EdgeWorkersの今後の活用案

今後の活用案としては、これまではキャッシュすることでオリジン側の負荷をオフロードするだけでしたが、EdgeWorkersによりオリジンのロジック自体をエッジへマイグレーションしてよりアグレッシブなオフロードが可能となったと思うので、今回はCookie操作の処理だけでしたが、分散処理可能なものはEdgeWorkersへ移行してよりオリジン側の負荷を減らすことが考えられます。また、これまでオリジン側の負荷の制約で諦めていたサイトUXのパーソナライズ施策がたくさんあります。工夫次第ではEdgeWorkersを使うことで実現できそうなものもありそうです。

EdgeWorkersへの今後の期待

最後に、EdgeWorkersへの今後の期待としては、運用のところでも述べた競合並みの監視・ログ管理ツールの提供です。また、監視として使えるものとしてEdgeWorkers Report APIが提供されていて、各種メトリクスを取得できるのですが、自社の監視ツールに手動で設定する必要があります。最近は我々も導入しているDataDogやNew Relicといった3rd パーティの監視SaaSがあるので、SaaS側にEdgeWorkersのインテグレーションを用意してもらって簡単にメトリクスやログを収集し、常時監視できたらありがたいと思いました。率直に言って、現状のレポーティングの機能だけですと、Cookieの操作のようなライトな使い方が限界かなという印象です。これら監視・ログのサポートが揃えば、ある程度複雑な処理を書いても本番運用に耐えると思うので、今後の活用案のところで書いたような、マイクロサービス をエッジ上で構築するぐらいのヘビーな活用も目指せると思いました。

また、データ連携の強化も期待したいです。EdgeKVという、Key Valueストアとして使えるストレージの機能があるのですが、そこのデータがエッジ上にあるままでは活用は限定的なものになってしまうと思いました。そこのデータも自社のDWHなどに取り込み、他のデータと紐づけての活用が可能になれば理想的です。

まとめ

私たちは、アクセス集中による負荷の軽減・パーソナライズ・プライバシーの保護というお互いにトレードオフの関係にある課題を解決する必要がありました。エッジコンピューティングの導入により、それらをどれも犠牲にすることなくクリアすることができました。EdgeWorkersには、ドキュメントの整備や運用面ではまだ課題はあるもの、それらが改善されれば開発者体験としてはさらに向上し、よりディープな活用へと踏み出すことができると期待しています。

最後に、私たちは今回紹介したようなUXの向上やプライバシー保護についての課題など、純粋にユーザー目線でサービスの課題解決に日々取り組んでおります。CDNを始めクラウドサービスや、最新テクノロジーを駆使して世界を変える流れを作るサービスを提供する仕事に興味のある方は、下のリンクからご応募お待ちしております。

株式会社エニグモ 正社員の求人一覧

hrmos.co

enigmoデザイナーの「最近気になるデザイントピック」- アプリUI編

こんにちは。デザイナーの別所です。
本日はenigmoデザイナーの、最近気になっているデザインに関するトピックをご紹介したいと思います。

今回のテーマは「アプリUI編」です!

心地よい使い勝手!映画レビューアプリ「Must」

「Must」は映画/TVファンのためのSNSアプリです。
作品のレビューを投稿したり、見たい映画や番組をコレクションとして保存できたり、友達と繋がって他の人がコレクションしている作品を見たりすることができます。
おすすめ作品をデイリーでレコメンドしてもらえたりもします。
2016年にリリースされたアメリカ生まれのアプリです。

f:id:enigmo7:20210603171725p:plain
引用:https://mustapp.com/

デザイナーのこだわりを感じるUI/UX

f:id:enigmo7:20210603171820p:plain Mustの一番の特徴は、画面の下半分の操作で完結するように設計されているUI/UXではないでしょうか。
レビューの入力や「観たい」「鑑賞済み」を選択するボタンなど、このアプリのメインアクションは全て画面の下部で操作できるため、「指が届かない」「両手に持ち替える」などのストレスがなくスムーズに操作できます。
昨今のスマホの様々な端末サイズに左右されないような設計がされているんですね!
ストアのレビューでも優れたUI/UXに対してのコメントが多く見られます。 (レビューでUIについて言及されるアプリって珍しい気がします!)

心地良いUIモーション

Mustは見た目の美しさだけでなく、モーションにもこだわりを感じます。
作品のサムネイルをタップすると、本棚から選んで取り出すような雰囲気のモーションがついてます。タップした時にサムネが少し小さくなるところも、ユーザーが押したことを認知させる表現で、細かい技が効いてます。
作品情報のモーダルは作品サムネイルにかぶさる様に画面下から表示されレイヤー感があるので、作品サムネイルと作品情報との優先順位を感じさせる見た目になっています。

テキスト要素をギリギリまで排除したシンプルさ

f:id:enigmo7:20210603173215p:plain
引用:https://mustapp.com/
海外アプリっぽいな〜と感じるのがテキスト要素の圧倒的少なさ。
基本的には作品名はテキストで表示させず作品画像のみとなっています。 このシンプルさが美しい作品画像をより際立たせてる気がします。 視認性担保のため小さい画像も無いところが、作品情報を認識しやすく操作しやすいです。

楽しい絵文字遣い

f:id:enigmo7:20210603173326p:plain

絵文字の使い方にシャレが効いていてオシャレです。
レビューを10段階で入力するときは、数字によって表情が変わります。 こういう小技で入力を楽しめる体験って良いですね。

海外のアプリなので日本語のレビューや、邦題作品はまだ少ないようです。
日本でももっと広まって欲しいです!

株式会社エニグモ 正社員の求人一覧

hrmos.co

コロナウィルスがやって来た!情シス奮闘記

こんにちは、Corporate IT/Business ITを担当している足立 です。

この記事は Enigmo Advent Calendar 2020 の 24日目の記事です。

エニグモでは1人目のコーポレートIT担当として未着手な社内IT環境をコツコツ整備してます。 世間的には1人情シスと呼ばれるポジションです。

今回は2020年に取り組んだ事を書きたいと思います。

2020年は何と言ってもコロナウィルス

1月後半から徐々に猛威が迫ってきて弊社も2月17日に対応・方針についてのMTGが予定されてました。
また、偶然にも同タイミングでSlackの全社チャンネルに 会社に対し何らかの方針、対応があるのかと言う内容の投稿がありました。
今思うと見えない恐怖・不安が爆発した日だと思います。

f:id:enigmo7:20201211162325p:plain
コロナウィルス

2月17日 緊急招集 MTG

2月17日 夕方 経営陣、監査役、部署長、情シスが招集されました。
そこで決まった方針は下記内容になります。

方針の大概要

①「在宅勤務」の推奨
②出社勤務の場合は「時差通勤」推奨と「社内感染防止ルール」の厳守
③本方針は翌営業日からのスタート
コロナウィルスの影響により、急遽全社的にリモートワークを一斉にスタートする事になりました。

情シスとしてのMISSIONは リモートワーク環境の構築

リモートワーク環境・・・

実はリモートワークを本導入してませんでした。
当初の予定だとオフィスが元々国立競技場が近いと言うこともありオリンピックに向けて
リモートワークを段階的に試験導入する予定でした。
また、コロナ禍前は情シスとしてリモートワーク開始について反対の立場をとっていました。

反対した理由

当時でもリモートワークで業務出来る環境でありましたが
いくつか課題があり中々、賛成する事は出来ませんでした。
代表的な理由としては・・・

  • 運用、緊急時のフローが無い
    リモートワークの運用フロー・ルールが構築されていない
    重大インシデント発生した際の緊急連絡フローが確立出来てない
  • ウィルス対策が脆弱
    コンシュマー版のウィルス対策ソフトを利用してましたが、リモートワークを実施するには強化すべき部分でした。 やはり管理コンソールが無い為、管理者として各端末の状況が把握しにくい部分が課題でした。
  • MDM(Mobile Device Management)導入して無い
    ウィルス対策の話と同様に端末の状況を一括で集中管理出来るソリューションが未導入

運用・緊急時のフロー等は社内で方針を決めるだけなので、直ぐに対応が出来る内容ですが
正直、セキュリティ周りをもう少し強化な環境に整えてから実施したいと言う思いでした。

真っ先にやった事 デスクトップ パソコンのリプレイス

緊急事態に付き兎に角、業務が出来る環境にすべくデスクトップPCのリプレイスに着手しました。 当然、パソコン無ければ仕事が出来ません。
当時、アルバイト・派遣社員が使用していた約20台程のWindowsデスクトップPCが
稼働中な為、直ぐにメーカーに連絡し、12~13台ずつ購入し3~4週間程で入れ替え完了させました。
キッティングに関し初めはイメージを作成しクローニングで対応も考えたのですが
作成・検証する時間も無かったのと、エニグモでは予めインストールするソフトも少ないので
人力による力技で対応しました。
2019年より少しずつノートPC化をしていたので、何とかなった感じです。

f:id:enigmo7:20201211170328p:plain
デスクトップPC リプレイス

WEB会議・問い合わせ対応

Zoomを導入しました。
実は運良くコロナ禍前から導入に向けて動いていました。
導入した理由としてはGoogle Workspace(旧称 G Suite)があるので
Google Meet利用出来ましたが、やはり大人数でのWEB会議はZoomの方が安定していたからです。
特にエニグモでは4Mと呼ばれる全体集会が月1回あるので、そこは重視しました。
また、仮想背景機能もあるので従業員のプライバシー保護の観点や
リモートコントロール機能もあるのが決定打でした。

Zoom導入前にトラブルサポート対応で解決までに3時間かかった事例があったので
この機能にはだいぶ助けられました。

Zoom対応ハードウェア DTEN導入

元々社長室・会議室にAppleTVが設置されておりプロジェクターを使用して
会議資料を投影してました。
MacはAirPlayを使用して安定的に投影出来ますが、Windowsの場合はAirParrot3と言う
シェアウェアを利用してましたが非常に動作が不安定と言う課題がありました。

プロジェクターを利用する際に部屋を暗くする不便さ、動作が不安定な課題を解決する為
Zoom対応ハードウェア DTENを1番大きい会議室と社長室に導入しました。 導入後はWindowsでも安定した投影が可能になりZoomRoomsアカウントで 無償アカウントユーザーでも時間無制限でWEB会議を利用出来るようになりました。
デメリットな部分としてはお値段が高いところでしょうか
個人的には気に入っているハードです。
当初は全会議室に導入しようとしていたので2台で止めといて良かったです。

f:id:enigmo7:20201211173357p:plainf:id:enigmo7:20201211173336p:plain
社長室と会議室

電話対応 fondesk モバイルチョイス050導入

コロナ禍と言えども会社へ電話が鳴る日々でした。
その殆どは業務と関係の無い営業電話や人材紹介の電話で月間300件程着信があり
主にバックオフィス部門にて全ての電話を受電し業務負荷が増え
電話対応する度に集中力が途切れて業務がままならない状況でした。
また、お客様の問い合わせについても迅速に対応出来ない事例も発生し
折返し対応するにも会社支給の携帯電話が無いので個人の携帯電話を利用する人も居ました。

そこで電話受付代行サービス fondeskを導入
導入後は会社代表電話番号をfondeskの番号へ転送し 転送先のオペレーターの人が一時対応し全て折返し対応となります。 対応内容はSlackにて投稿されるので、総務が内容毎に関係部署や個人へ共有する運用になりました。
導入後はオフィスが非常に静かになり、こんなにも効果があるものかと感心しました。

また、個人の電話対応として楽天コミュニケーションズのモバイルチョイス050を導入しました。
導入後は個人スマホに050番号を付与する事が出来るので、折返し対応時にプライベートの番号が伝わる事が無い様になりました。 メリットとしては簡単にBYODを導入出来る、データ通信ではなく音声回線を利用しているので音質が安定しているのが良い部分だと思います。

FAX対応 メール送信&Slack連携

電話の次に対応したのでFAXです。 レガシーなシステムと言えども銀行やカード会社のやり取りで使用する為、 何かしら対応が必要になりました。

弊社のFAXは複合機で送受信を行っているのですが 仕様を確認したところ受信したFAXをメール送信出来る機能があり直ぐに設定 メール送信されたものはzapierを経由しGoogleドライブ(共有ドライブ)へ保存され受信した際にSlackへ通知されるようにしました。

ただ、この構成で欠点があります。 複合機のFAXメール送信機能は用紙切れになるとメールが送信出来ない仕様でした。 ここは盲点でした。

f:id:enigmo7:20201211181155p:plain
FAXの構成

冒頭でも触れたウィルス対策

リモートワークを始めるにあたり絶対にウィルス対策(エンドポイントセキュリティ)の強化は最低限譲れないと思ってました。 そもそもWindowsNortonMacはESETとOS毎に違っている。 しかも、コンシュマー版を利用している状況 Excelでシリアル番号とインストール端末を管理すると言う運用

この状況を打破する為、CrowdStrikeを導入 念願のEDRを導入する事が出来ました。 展開が終わった時は達成感が凄かった・・・・ 何と言っても管理コンソールの存在に感動、アラートが飛んでくる頼もしさ、勝手にアンインストールされない安心感はプライスレス。

ちなみに導入直後に管理コンソールにログイン出来ない事象がありました。 原因はセカンダリ環境にCrowdStrikeが構築されログインURLが通常と違っていたのが原因でした。
(後日談としてESETのアンインストールを社内にアナウンスしたら、みんな光の速さで削除してました・・・)

まとめ 2021年へ

その他にも実施した事もまだあるのですが、今回書くことが出来ないのでいつかタイミングあれば書いてみたいと思います。 情シス視点での福利厚生制度を作ったり社内イベントをやったりしました。

今後リモートワークを核としたワークスタイルへの移行にむけてオフィスをリニューアルする予定で、 そこに向けて色々と動いています

  • サーバールーム移設対応
  • 社内ネットワーク対応(有線・無線)
  • 電話回線周り(光収容化・Dialpad導入)
  • インターネットFAX導入
  • 受付システム導入
  • 入退出システム導入
  • 座席予約システム導入
  • AV機器周り対応

また、4月にokta導入が決定しました! 出来たら来年こそはMDMを構築したいと思ってます。

では、以上になります。 ありがとうございました。

明日の記事の担当は 出品審査担当 の 杉山 さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co

リモートワーク文脈で超低コスト擬似DNSを社内で提供しました。

こんにちは、インフラチームの加藤です。

この記事は Enigmo Advent Calendar 2020の23日目の記事となります。
本記事では、リモートワーク環境のため、擬似DNSを社内提供したお話をします。

エニグモでは、今年の2月頃から全社的にリモートワークを開始しました。
それに伴いインフラチームでは、リモートワークのネットワーク周りの対応を行いました。


エニグモが運用しているサーバ群

エニグモの運用するサーバは、データセンター内に構築したものとAWSのものがあります。
情シスの足立さんが、SaaS導入を進めて下さったためオフィス内にサーバはほぼありません。

サーバへの疎通経路

オフィス・リモート環境共にVPN経由(+ファイアウォール)で、サーバ群へアクセス可能です。

f:id:enigmo7:20201221131920p:plain


リモートワーク開始後のサーバアクセスの問題

リモートワーク開始直後から、ネットワーク設定に関するお問い合わせと、インフラチームの対応が発生しました。VPNWifiの設定など、個々人のネットワーク設定の問題は、一度解決すれば再発することは滅多にありませんでしたが、名前解決は、何度もお問い合わせが頻発する厄介な課題でした。

なぜ、名前解決がリモートワークの課題だったのか

コロナ禍以前
エニグモでは、オフィス環境からサーバへのアクセスを楽にするため、ファイアウォールでオフィスのグローバルIPを許可し、非エンジニアスタッフの使用するサーバへはサーバのグローバルIPを指定してアクセス可能にしていました。

f:id:enigmo7:20201221132000p:plain

名前解決がリモートワークの課題となった原因
リモート環境からサーバへは、VPN経由でサーバのローカルIPを指定してアクセスする形でした。そのためオフィスとリモート環境では、サーバのアクセス情報が異なりました。

整理すると、以下の形になります。

職場 アクセス元のIP アクセス先のIP
リモート環境 スタッフのお家 サーバのローカルIPを指定
オフィス オフィスのGIP サーバのグローバルIPを指定

リモートワーク開始直後は、採用や営業活動、検品作業のため出社が必要なスタッフも居て、シフトで出社日を回していました。出社したスタッフは、Hostsを社内用に修正せねばならず、名前解決関係のお問い合わせが継続していました。DNSを導入すべきでしたが、工数がかかり難しいところでした。

f:id:enigmo7:20201221132042p:plain

名前解決の課題を解決

Hostsを一元管理できないかと思案していたところ、SwitchHosts!を発見し利用することにしました。

f:id:enigmo7:20201221132126p:plain

SwitchHosts!とは

Hostsファイルのサーバ管理が可能になる、端末のアプリです。
開発者は、サーバからHostsファイルをダウンロードして端末で使用する形となります。

f:id:enigmo7:20201221132146p:plain

SwitchHosts!のよかったところ

  • 無料公開されているアプリケーションだったこと。
  • 配布元にリモートサーバが使えて、Hostsファイルを集中管理できたこと。
  • Mac/Windowsでも、同じUIで使えてサポートし易かったこと。
  • HostsのパーツごとのON/OFFができ、localでHostsを修正できて組み合わせられること。
  • 端末のHostsのバックアップが取れること。 f:id:enigmo7:20201221132237p:plain

SwitchHosts!を使用した、擬似DNSの運用

ユースケースに合わせたHostsファイルを作成し、データセンターにNginxコンテナを立てて配布できるようにしました。 これにより、オフィスでもリモート環境でも、簡単にDNSの機能を提供できるようになりました。

f:id:enigmo7:20201221132221p:plain

SwitchHosts!導入後の課題と解決方法

SwitchHosts!の導入後、名前解決が原因となる問題はさっぱりとなくなりました。
しかし「AWS環境における開発に伴い、Hostsの修正が都度必要になる」という課題が残りました。

  • 原因は、以下2つでした。
    • スポットインスタンスのマシンが再作成された際、Hostsの修正が必要だった。
    • AWS環境に、日々サーバが増設されるため、Hostsに追記が必要だった。

一時は、ダウンロード元のHostsファイルを手動で修正していましたが、無駄な作業でした。
そこで、Hosts情報の更新スクリプトを作成し、Hostsファイルの自動更新を実現しました。


#AWSのマシンには、${Prefix}タグと${Name}タグに、
#マシンのAZやサイト環境を命名規則として持たせています。
#!/usr/bin/env ruby

require 'aws-sdk-ec2'

ec2_client = Aws::EC2::Client.new()

#ARGVに、${Prefix}タグの値(マシンのAZやサイト環境を命名規則としたもの)を持たせています。
Prefix_list = ARGV

f = { filters:[{ name: "tag:Prefix", values: Prefix_list }]}
reservations = ec2_client.describe_instances(f).inject([]) do |s,list|
  s += list.reservations
end
instances = reservations.each.inject([]) do |s,list|
  s += list.instances
end

instance_list = []
instances.each do |e|
  instance_list << {
    :instance_id => e.instance_id,
    :vpc_id => e.vpc_id,
    :public_ip => e.public_ip_address,
    :private_ip => e.private_ip_address,
    :name_tag => e.tags.find{|t| t.key == "Name"}.value,
    :prefix_tag => e.tags.find{|t| t.key == "Prefix"}.value.downcase,
  }
end

instance_list.each do |e|
  hostname = e[:name_tag].downcase

  #あとは、環境に合わせてよしなに加工してHostsファイルに出力します。

end

まとめ

DNSの機能を低コストで提供できてよかったです❗️
VPNへの依存度も減らしたいので、Google IAPを検証中です。
同じインフラチームの先輩社員 山口さんが、記事にされていました。
old schoolerなネットワークエンジニアがIAP Connectorを試してみた - エニグモ開発者ブログ

最後に

明日の記事の担当は、情シスの足立さんです。お楽しみに。

株式会社エニグモ 正社員の求人一覧

hrmos.co

今年組織作りに貢献するためにやったこと

今年組織作りに貢献するためにやったこと

こんにちは。アプリケーション開発グループの穴澤です。

この記事は Enigmo Advent Calendar 2020 の 22 日目の記事です。

今年の振り返りとともに主に自分が去年から今年にかけて中途で入社した方にやってきたことを書いておきます。 これからチームに新しい人を迎える人や、中途採用入社する方、チームの組織づくりに興味がある方に読んでいただければとおもいます。

実際に取り組んだこと

  1. オンボーディング資料
  2. 開発手法、フロー・チェックリストの読み合わせ
  3. 1on1

1.オンボーディング資料

f:id:enigmo7:20201209113806p:plain

エニグモでは部署問わず、今年から中途で入社した社員向けのzoomでのオンボーディングが開始されました。 簡単にいうと、「各事業部が具体的になにをやっているか」「どんな取り組みをしているか」を中途入社した方に説明します。 入社された人は部署ごとに数回に分けて参加しています。

思えば今年の早い段階からリモートワークが推奨された状況で、中途入社問わず、自分の所属する部署以外の方と顔を見合わせて話をする機会というのが極端に減りました。例えば「先月入社した方ですね、XX部署のXXです!」といったようなコミュニケーションも、廊下やエレベータで顔をあわせたり、ミーティングの終わりですれ違う、お手洗いで顔を合わせるなどのタイミングで行われてきました。それぞれを取るととても些細なことですが、これらを積み重ねた何かが会社の社風や文化を醸造する一部になっていたんだなと感じます。

社風や文化をオンラインでも実現するために、このオンボーディング時に自分が紹介する担当として実践していることは以下の点です。細かい箇所は割愛しますが、リモートワークならでは、でしょうか。

  • 説明資料に自部署の社員の氏名とSlack名の紐付け(顔とアイコンと名前とSlack名が一致しない問題)
  • アプリ、インフラ、データ基盤、それぞれについての相談相手としてまずは覚えておいてほしい人をエピソードを添えて紹介(そんなに色々紹介されても覚えられない問題)
  • BUYMAの仕組みで聞きたいこと、相談したいことがあったら、ここで相談してね、のチャットの紹介(何かを相談したくなってもどこだかわからない問題)
  • 参加してくれた方とサービスエンジニアリング本部との具体的な関わり(紹介してもらったけど具体的にどういう関わりがあるんだろう問題)

2.開発手法、フロー・チェックリストの読み合わせ

f:id:enigmo7:20201209112322p:plain

BUYMAというサービスが産声を上げて10年以上たちます。尖った技術を使っているところもあれば枯れているところ、 やや温かみのある仕組みのところもあり、中途で入社するエンジニアの方が躓くところもあるため、主に若手の方対象に週1回程度開発やリリースの際に 気をつけてほしい点をesaにまとめたフロー・チェックリストの読み合わせを行いました。

基本的に開発に関わる問題点は、所属しているチームのリーダーやメンターが答えてくれますが、彼らに一点集中よりも、誰にきいても答えてもらえる、誰にきいても大丈夫、という雰囲気作りに自分も貢献したかったからです。ましてや、自分の所属するチーム以外のエンジニアと、躓きや悩みを共有する場所はいくつあってもいいと考えています。 時には検索エンジンチームのエンジニアをゲストに迎えて検索周りの説明をしてもらったり、チェックリストについては実際トラブルに遭遇したエンジニアに「なぜこのチェック項目があるのか」というのを語ってもらいました。

3.1on1

1on1は前職でも取り組んできたことですが、今年特に自分が取り組んできたことをあげます。

アジェンダを用意する

序盤はなかなか用意できませんでしたが、後半は実施の1,2日前にいくつかアジェンダを用意するようにしました。 4,5つ用意してその中のトピックで話が広がれば盛り上がり、割愛してできるだけ「話が盛り上がる」「相手の指向性、興味関心がわかりそう」な所を意識してすすめていきました。

1on1は「・・・最近どうですか」というやり方もありますが、1on1を実施するメンバーの身になって考えてみると 「なにを話せばいいんだ」「なにを言われるのか」と身構えられてしまうこともあります。明日はこういう事を話そう。を用意する方が 時間も有意義ですし、もしネガティブな話をされるということが事前にわかっていれば、自分が言いたいことも事前に用意するなど、お互いに準備ができ、結果的に少しの準備で得られる物が大きいと感じました。

会社全体の動きがわかるトピックをアジェンダに混ぜて話す

メンバーの仕事や取り組みと直接関係のない、会社の動きや施策。最近の売上。他部署に入社した人のこんなところが凄いなど。 会社の中での私達の部署の動き、施策以外の取り組みや課題など、自分と会社をつないでいる環境にどんなものがあり、どんな動きをしているか。中途入社の序盤だと、見えにくい組織の動きを定期的にアジェンダに混ぜて話をするようにしています。

  f:id:enigmo7:20201209112739p:plain

まとめ

今年は、いつもやっていること+αの「工夫する」注力をしました。特に、コロナ禍のリモートワークで感じやすい「孤独感」を和らげるための「皆さんは一人ではないですよ」ということを伝えるために、様々なできることを模索した年だった様に思います。取り組みの結果を定量的にはかることはできないので難しいですが、フィードバックを受けながら、来年も少しずつ改善していきたいと思います。 よいお年を。

明日の記事の担当は インフラグループ の 加藤 さんです。お楽しみに。

株式会社エニグモ 正社員の求人一覧

hrmos.co

データアナリストが転職活動で求めたこと

こんにちは、データアナリストの田中里澄です。
エニグモではデータ活用推進室という部署に所属しており、主に他部署が行った施策の効果検証を担当しています。

私は2020年11月にエニグモに入社したので、今回はその転職活動の中でどうして今の職を選んだのか、また入社後どのような仕事をしているのかを紹介できればと思います。
前職はライブ配信サービスを運営している会社で、同じくデータアナリストとして働いていました。
転職理由について前職に対してネガティブなことは一切なく(むしろ今も大好きな会社です)、今の自分は別の会社で経験を積んだ方がいいと判断したためです。

なぜこの内容を書くのか?

理由は社外の方向けと社内の方向けでそれぞれあります。

  • 社外の方にはデータアナリストの転職活動の参考にしてもらいたいため

  • 社内の方にはまだ入社して日が浅い私の自己紹介になるためです。転職活動だけでなく、自分がどんな仕事をできるか/したいかについても書いています。

そもそもデータアナリストとは?

ここであくまで自分の考えにはなりますが、データアナリストとはどんな仕事をするのか?を説明します。
読んでくださっている方に知っていただきたいですし、私が転職活動をしたときにも実際に自分の中で整理しました。
その整理のおかげで、無数にある求人(特にデータ〇〇という職種は最近多い)の中でピックアップする際や、面接時に自分がしたいことを説明する際に役立ちました。

データアナリストがどんな仕事をするのかを一言で表すと、「意思決定に必要な情報/インサイトを提供する仕事」だと思っています。
データを分析する仕事じゃないの?という意見もあるかもしれませんが、分析はあくまで手段であり意思決定をすることが目的だと考えているのでこのような書き方をしました。
もしかするとコンサルという職種でイメージされる仕事に近いかもしれません。(余談ですが、思考法を勉強するためにコンサル向けの本を読むこともあります)
逆にいうと、こういった仕事をする人のことを「データアナリスト」と定義している会社が多いとも言えます。(本質的にはこちらが正しいかもしれません)

どんな会社を求めていたのか?

上記で書いたデータアナリストとしての仕事ができることはもちろんですが、さらにそれを具体化した自分のしたい仕事について2点、またそれに関連した仕事の環境について2点合わせて4点求めていることがありました。

どんな意思決定に携わりたいのか

意思決定にも長期的な経営に関するものや日々のオペレーションに関するものなど色々な種類があります。
その中でも私はプロダクトの改善に関する意思決定に携わりたいと考えていました。
なぜならユーザーに喜んでもらえることに直結して実感しやすく、また分析するなかで人がどんな考えで行動するのかを垣間見ることができるためです。

どんなデータを分析するのか

データは大きく分けると定量と定性の2種類に分けることができますが私はその両方を用いて分析したいと考えていました。
特に定量的なデータだけを扱うことが求められる場合もありますが、ユーザーの行動理由を知るためにはそれだけでは分からないことがたくさんあるためです。

他の職種(特に意思決定者)との連携が取りやすい

上の方で書いていたデータアナリストの仕事内容を遂行するためには他の職種の方との連携が大切です。
その中でも特に最終的に意思決定をする方との連携がとても大事だと考えていました。
分析をする前にはどんなことが分かれば意思決定ができるかを摺り合わせ、分析後にはそこで分かったことを共有し議論することで意思決定に付き添うためです。

分析をするためのデータ基盤が整っているか

整っているという言葉が曖昧ですが、例外はあれど日々の分析をする際の前処理やデータの準備に自分の持っているスキル以上のことを求められないことが基準でした。
もちろん後々はデータ基盤の構築などのスキルも伸ばしていきたいのですが、それをすぐに求められて本当にやりたいことに時間が使えないのはしんどいので求める要件に入っていました。

実際に入社してみて感じたこと

最終的にエニグモ社に決めた理由としては、上記の求めていることを全て満たせていると選考中や内定をいただいた後の面談で感じたからです。
ここでは入ってみて実際に感じたことを紹介します。

  • 分析結果を共有する際に、集計結果だけでなく考察やネクストアクションも求められまた歓迎されていい。
  • 求めていた意思決定者との連携について、非常に取りやすいので分析の方針を立てやすい。
  • データ分析基盤について、基本的に必要なデータがBQにあって扱いやすい。ただし、それぞれのテーブルの定義などがわからないところやGAのデータについての扱い方を調べる必要があったりと、データはあるが扱い方は聞いたり調べたりが必要なので、今後入ってくる方のためにまだまだ整える余地がある。

入ってからどんな仕事をしているか

主に2種類の施策の振り返りとアプリの分析方針を立てる仕事をしています。

施策についてはどちらも私が入社する少し前から始めていたもので、毎月改善を加えてやっていこうという前提のものでした。
なので振り返りでどんな数字を見るのか?というところから任せてもらい、翌月の改善点の提案を出すところまで行っています。

アプリの分析方針については、私が入社するまでは売上というKGIを追いつつ、KPIとして何を追うべきかについて試行錯誤をしていたという状況です。
そこで今後追うべきKPIについて決める仕事を担っており、今は私の提案を元に議論しています。

総じていうと、現在自分のしたいと思っていた仕事ができており今後もまだまだやりたいことが溢れています。
コロナ禍で入社後のコミュニケーションは以前と比較すると難しいなとは感じておりますが、より一層迅速で納得感のある意思決定に貢献できるように日々の業務に努めていければと思っています。


株式会社エニグモ 正社員の求人一覧

hrmos.co

Apache Airflowで実装するDAG間の実行タイミング同期処理

こんにちは。
今年4月にエニグモに入社したデータエンジニアの谷元です。
この記事は Enigmo Advent Calendar 202020日目の記事です。

目次

はじめに

コロナ禍の中、皆さんどのようにお過ごしでしょうか。 私はリモートワークを続けてますが、自宅のリモートデスクワーク環境をすぐに整えなかったため、薄いクッションで座りながらローテーブルで3ヶ月経過した頃に身体の節々で悲鳴をあげました。猫背も加速...

さて、エニグモでの仕事も半年以上経過し、データ分析基盤の開発運用保守やBI上でのデータ整備などを対応をさせていただいてますが、今回は社内で利用されているAirflowの同期処理の話をしたいと思います。

尚、こちらの記事を書くにあたり、同環境の過去記事 Enigmo Advent Calendar 2018 がありますので、良ければそちらもご覧ください。

そもそも同期処理とは?

例えば、以下のようなデータ分析基盤上でのETL処理があったとします。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
処理2. DWH上のデータを加工後、外部へCSV連携する

上記の処理は順番に実行されないと必要なデータを含めた抽出ができなくなってしまい、外部へデータ連携できなくなります。そのためには処理1の正常終了確認後に処理2を実施するという、同期処理を意識した実装をする必要があります。

例だとシンプルですが、業務上では徐々にETL処理も増えていき、複雑化していきます。 こうした同期処理は、初めはなくても問題にならないことが多いのですが、徐々に処理遅延が発生してタイミングが合わなくなったり、予期せぬ一部の処理エラーが原因で関連する後続処理が全て意図せぬ状態で動き出してしまったりします。そうならないためにも、設計時点で同期について意識することが大事だと思います。

Airflowによる同期処理

Airflowでは ExternalTaskSensor を使用することで実装可能となります。 Airflowのソース上ではdependという用語が使われているので、Airflowの世界では「同期」ではなく、「依存」と呼んだ方が良いのかもしれません。適弁読み替えていただければ...

では、Airflowでの同期検証用サンプルコードを作成してみましたので、実際に動かしながら検証したいと思います。尚、Airflowバージョンは1.10.10となります。

先ほどのETL処理を例にして、下記の内容で検証してみます。 実際のファイル出力やDBへのロード処理などはここでは割愛して、DummyOperatorに置き換えてます。

処理1. 連携元DBのあるテーブルをDataLake(GCS)へファイルとして格納後、DWH(BigQuery)へロードする
  dag_id: sample_db_to_dwh_daily
  schedule: 日次16:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

処理2. DWH上のテーブルを用いてデータ加工後、外部ツールへCSV連携する
  dag_id: sample_dwh_to_file_daily_for_sync_daily_16
  schedule: 日次17:00
  tables: TABLE_DAILY_1, TABLE_DAILY_2, TABLE_DAILY_3

検証時のコード

まずは処理1のサンプルコードになります。

from airflow.models import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='sample_db_to_dwh_daily',
    start_date=datetime(2020, 12, 1),
    schedule_interval='0 16 * * *'
)

tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

for table_name in tables:

    db_to_dwh_operator = DummyOperator(
        task_id='db_to_dwh_operator_%s' % table_name,
        dag=dag
    )

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed'
    )

    db_to_dwh_operator >> terminal_operator

次に処理2のサンプルコードです。

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State
from datetime import datetime, timedelta


tables = ['TABLE_DAILY_1',
          'TABLE_DAILY_2',
          'TABLE_DAILY_3'
          ]

def _dwh_to_file_operator(dag,
                         table_name,
                         depend_external_dag_id,
                         depend_external_dag_execution_delta):

    dwh_to_file_operator = DummyOperator(
        task_id='dwh_to_file_operator_%s' % table_name,
        dag=dag
    )

    sensors = []
    sensors.append(ExternalTaskSensor(
        task_id='wait_for_sync_db_to_dwh_%s' % table_name,
        external_dag_id=depend_external_dag_id,
        external_task_id='terminal_operator_%s' % table_name,
        execution_delta=depend_external_dag_execution_delta,
        allowed_states=[State.SUCCESS, State.SKIPPED],
        dag=dag))

    for sensor in sensors:
        sensor >> dwh_to_file_operator

    terminal_operator = DummyOperator(
        task_id='terminal_operator_%s' % table_name,
        dag=dag,
        trigger_rule='none_failed')

    dwh_to_file_operator >> terminal_operator


args = {
    'start_date': datetime(2020, 12, 3)
}

dag = DAG(
    dag_id='sample_dwh_to_file_daily_for_sync_daily',
    default_args=args,
    schedule_interval='0 17 * * *'
)


for table_name in tables:
    _dwh_to_file_operator(
        dag=dag,
        table_name=table_name,
        depend_external_dag_id='sample_db_to_dwh_daily',
        depend_external_dag_execution_delta=timedelta(hours=1)
    )

サンプルをAirflow画面で見ると?

上記2つのDAGをAirflowのWebUIで見ると以下のようになります。

f:id:enigmo7:20201211201546p:plain

同期処理をするoperatorは「wait_for_sync_db_to_dwh 」です。図でも ExternalTaskSensor のアイコンになってますね。 今回は3テーブルあり、それぞれ並列処理を想定しているため、3つあります。 そして、それぞれ、該当するテーブルのterminal_operator処理完了を確認後、後続処理が実行されます。

では上記の同期処理がAirflowのログでどのように表示されるか見ていきたいと思います。

同期遅延なし時のAirflowログ

まずは、既に sample_db_to_dwh_daily が完了していた場合です。 3テーブル毎のログを載せておきます。

TABLE_DAILY_1

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,833] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,840] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,841] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,841] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,841] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,848] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,851] {standard_task_runner.py:53} INFO - Started process 48063 to run task
[2020-12-09 02:00:09,905] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,921] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,924] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,834] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,834] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_2

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,831] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,839] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,840] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,840] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,846] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_2> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,848] {standard_task_runner.py:53} INFO - Started process 48062 to run task
[2020-12-09 02:00:09,902] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_2 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,916] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_2 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,918] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,921] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_2, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,832] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,832] {local_task_job.py:103} INFO - Task exited with return code 0

TABLE_DAILY_3

~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07 17:00:00
~~
[2020-12-09 02:00:09,829] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [queued]>
[2020-12-09 02:00:09,837] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,837] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-09 02:00:09,837] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-09 02:00:09,844] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_3> on 2020-12-07T17:00:00+00:00
[2020-12-09 02:00:09,847] {standard_task_runner.py:53} INFO - Started process 48061 to run task
[2020-12-09 02:00:09,901] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_3 2020-12-07T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-09 02:00:09,915] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_3 on 2020-12-07T16:00:00+00:00 ...
[2020-12-09 02:00:09,917] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-09 02:00:09,920] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_3, execution_date=20201207T170000, start_date=20201208T170009, end_date=20201208T170009
[2020-12-09 02:00:19,831] {logging_mixin.py:112} INFO - [2020-12-09 02:00:19,831] {local_task_job.py:103} INFO - Task exited with return code 0

注目して欲しい行は下記となります。
※ 他テーブルのログも同じですので、ここから先は記載を割愛します

[2020-12-09 02:00:09,919] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-07T16:00:00+00:00 ...

これが同期処理のログになります。 今回は既に正常終了していたため、1回の確認しか行われておらず、その後、後続処理も含めて正常終了してることが分かります。

同期遅延あり時のAirflowログ

次に前提のタスクが遅延して正常終了した場合のAirflowログもみていきたいと思います。

~~~
DAG: sample_dwh_to_file_daily_for_sync_daily
Task Instance: wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08 17:00:00
~~~
[2020-12-10 02:00:04,174] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [queued]>
[2020-12-10 02:00:04,182] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,182] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-12-10 02:00:04,182] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-10 02:00:04,189] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_DAILY_1> on 2020-12-08T17:00:00+00:00
[2020-12-10 02:00:04,192] {standard_task_runner.py:53} INFO - Started process 21683 to run task
[2020-12-10 02:00:04,245] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_daily.wait_for_sync_db_to_dwh_TABLE_DAILY_1 2020-12-08T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-10 02:00:04,259] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:01:04,322] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:02:04,385] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:03:04,448] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 02:04:04,511] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...

(省略)

[2020-12-10 10:27:29,163] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:28:29,226] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:29:29,273] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:30:29,298] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,360] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_daily.terminal_operator_TABLE_DAILY_1 on 2020-12-08T16:00:00+00:00 ...
[2020-12-10 10:31:29,363] {base_sensor_operator.py:123} INFO - Success criteria met. Exiting.
[2020-12-10 10:31:29,366] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=sample_dwh_to_file_daily_for_sync_daily, task_id=wait_for_sync_db_to_dwh_TABLE_DAILY_1, execution_date=20201208T170000, start_date=20201209T170004, end_date=20201210T013129
[2020-12-10 10:31:33,698] {logging_mixin.py:112} INFO - [2020-12-10 10:31:33,698] {local_task_job.py:103} INFO - Task exited with return code 0

先ほどのPoking行がログに表示され続けていることが分かります。今回は途中でタスクを手動で動かして正常終了させました。

尚、Pokingの間隔はExternalTaskSensorの引数でpoke_interval を設定すると変更ができました。ソースを見るとデフォルトは60秒のようです。ログとも一致してますね。

同期タイムアウト時のAirflowログ

業務では非機能要件も大事だと思います。 Airlfowでは、ExternalTaskSensorの引数でtimeoutがあるようです。 こちらでタイムアウト(default: 1週間ですかね、長っ)を適切に設定して、エラー検知をしてみたいと思います。

sample_dwh_to_file_daily_for_sync_hourly/wait_for_sync_db_to_dwh_TABLE_HOURLY_1/2020-12-09T17:00:00+00:00/4.log.
[2020-12-11 18:21:14,539] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [queued]>
[2020-12-11 18:21:14,551] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,551] {taskinstance.py:880} INFO - Starting attempt 4 of 9
[2020-12-11 18:21:14,552] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-12-11 18:21:14,563] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): wait_for_sync_db_to_dwh_TABLE_HOURLY_1> on 2020-12-09T17:00:00+00:00
[2020-12-11 18:21:14,568] {standard_task_runner.py:53} INFO - Started process 84130 to run task
[2020-12-11 18:21:14,625] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sample_dwh_to_file_daily_for_sync_hourly.wait_for_sync_db_to_dwh_TABLE_HOURLY_1 2020-12-09T17:00:00+00:00 [running]> ip-123-456-78-910.dokoka_toku.compute.internal
[2020-12-11 18:21:14,639] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:22:14,701] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:23:14,764] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:24:14,828] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,892] {external_task_sensor.py:117} INFO - Poking for sample_db_to_dwh_hourly.terminal_operator_TABLE_HOURLY_1 on 2020-12-10T16:00:00+00:00 ...
[2020-12-11 18:25:14,898] {taskinstance.py:1145} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/venv/lib/python2.7/site-packages/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
AirflowSensorTimeout: Snap. Time is OUT.
[2020-12-11 18:25:14,899] {taskinstance.py:1202} INFO - Marking task as FAILED.dag_id=sample_dwh_to_file_daily_for_sync_hourly, task_id=wait_for_sync_db_to_dwh_TABLE_HOURLY_1, execution_date=20201209T170000, start_date=20201211T092114, end_date=20201211T092514
[2020-12-11 18:25:15,070] {logging_mixin.py:112} INFO - [2020-12-11 18:25:15,070] {local_task_job.py:103} INFO - Task exited with return code 1

hourlyに変更して実施したログですが、「Snap. Time is OUT.」と良い感じ(?)にエラーとなってくれました。 業務上では、お目にかかりたくないログ内容ですが...

他にも色々あるようですが、この辺りを抑えておけば基本的な使い方は抑えたことになるかと思います。

所感

実際にAirflow同期処理をやってみて思ったのですが、

  • 同期を取る対象のoperatorをどれにするか
  • 同期を取るoperatorの時刻差はどれだけあるか
  • 非機能要件にあったタイムアウト設定で適切にエラーで落とそう
  • 複雑になるとDAG間の関係性がわかるグラフをWebUIでサクッとみたい

が、気になりました。

「同期を取る対象のoperator」は「terminal_operator_<テーブル名>」としました。
ダミー処理なので冗長にはなってしまいますが、各並列処理毎に加えておいたほうがDAG修正時の同期影響を意識しなくて良くなるのかなと思ったためです。

また、DummyOperator利用時に 「trigger_rule='none_failed'」の引数を付け加えないと、先に実行されたケースもありましたので注意が必要そうです。

「同期を取るoperatorの時刻差」ですが、何度も繰り返してると混乱してしまう可能性もあるので、テストでも十分に気をつけて対応しないといけないですよ(to自分)。実際の業務でもtimedeltaで指定する時刻に誤りをテストで見落としてしまい、その結果、リリース後に同期処理が想定時間通りに終わらず遅延してしまい、外部連携のタイミングに間に合わなくなり問題になってしまいました...。

今回の検証例は、お互いdaily実行だったのですが、頻度が異なると慎重な対応が求められそうです。 ただ、この辺の制御はAirflowのコアな制御なので、今後、利用者が意識しないで済むようにdag_idとオプション引数で渡したら、同期を取ってくれるような機能があると良いなとも思いました。利用頻度が多くなると、そういった実装も検討しないといけないのかもしれません。

あとは、WebUI上にてDAGの関係性がグラフで見れると、意図したoperatorで同期が取れているかの確認がやりやすいと思いました。 見方が分からないだけなのか、未実装なのか把握できてないのですが、WebUIをみる限り今回のAirflowのバージョン1.10.10ではなさそうです。

最後に

Airflowでの同期処理について少しでも伝わればと思い、このテーマで記事を書いてみました。 本記事を通して少しでもイメージを掴めて頂けますと幸いです。 他にも面白そうな機能はあると思いますので、また、機会があれば投稿したいと思います。

私からは以上となります。最後までお読み頂きありがとうございました。

明日の記事の担当はエンジニアの高山さんです。お楽しみに。


株式会社エニグモ 正社員の求人一覧

hrmos.co