Redashがバージョンアップ(v5)して便利になったこと&不便になったこと

エニグモ データ活用推進室 @kmt47 です。 この記事はEnigmo Advent Calendar 2018の14日目です。

概要

redashがv5にバージョンアップしました。(少し時間経ちますが) この記事では、ユーザ目線でredash v5の新機能を紹介します。 v3→v4へのバージョンアップと比べると、機能的な追加は少なめ&追加された機能の便利度も低めといった印象です。 また、v4で追加になった超便利機能「表示形式(format)」設定において、v5からパーセント表示の仕様が変更になっています。 それでは、上手に新機能を使って、redash作成を効率化していきましょう!

クエリ編(表、グラフ)

クエリを「お気に入り登録」できるようになった(便利度:★★)

クエリを「お気に入り登録」できるようになりました。

「お気に入り登録」したクエリは、redashのトップページに「Favorite Queries」として表示されます。 ※ redashのトップページとは、画面の最上段のリダッシュのマーク(💭)をクリックしたときの画面 また、クエリ一覧画面(Queries)では、右側の「☆Favorites」で絞り込むことができます。 小技として、最上段の「Queries」の右の下のボタンをクリックすると、「お気に入り登録」したクエリが選択できるようになっています。

「お気に入り登録」の方法は簡単で、クエリの画面でクエリ名の左の☆をクリックするだけです。 登録されたクエリは、★ が黄色になります。 もう一度 ★ をクリックすると、登録が解除されます。

良く利用するクエリは「お気に入り登録」しておくことで、トップページから簡単にアクセスできるようになります。

クエリに「タグ」を付けることができるようになった(便利度:★★)

クエリに任意の「タグ」を付けることができるようになりました。

「タグ」を付けることで、クエリ一覧画面(Queries)の右側のリストから選択したタグで絞り込むことができます。

「タグ」を付ける方法は、クエリの画面の最上段にマウスをナビゲーションすると「+Add tag」のボタンが表示されます。 そのボタンを押すとポップアップ画面が表示されるので、既に登録済みのタグを選択、または新たなタグを入力して付与することができます。 1つのクエリにタグは複数付けることができます。 付けたタグを削除したい場合は、タグの右に表示される鉛筆マークをクリックして、削除したいタグの「×」を押してください。

パラメータの作成ボタンが追加になった(便利度:★★)

これまでは、パラメータを作成するときは、クエリ入力画面に波括弧を二つ {{ }} 入力して、その間にパラメータ名を入れていたかと思います。 こんな感じです。→ {{ hoge }}

v5では、パラメータ追加ボタンが追加されました。 クエリ入力画面の下の {{ }} ボタンがそれです。

ボタンをクリックすると「Add Parameter」というポップアップが表示されます。 「Keyword」は、クエリに埋め込むパラメータの名前、「Title」はパラメータの入力エリアの名前です。 「Type」でパラメータのデータ型を選ぶことができます。

なお、これまで通りクエリ入力画面で直接追加することもできます。

パーセント表示の仕様が変更になった(便利度:マイナス☆☆☆☆)

この記事「Redashがバージョンアップ(v4)して便利になったこと」にも記載しましたが、v4で追加された最大の便利機能「数値の表示形式(フォーマット)をredashで設定できるようになった」のパーセント表示に関する仕様が変更になりました。

例えば、前年比を表示する場合、 「売上(今年) 120万円」÷「売上(去年) 100万円」の結果に対して、表示形式で「0.0%」を指定すれば、 120 ÷ 100 = 1.2 で 「120.0%」と表示されていました。

しかし、v5では、このままでは「1.2%」となってしまいます。 つまり、v4では自動的に100倍されましたが、v5でクエリで100倍する必要があります。 個人的な意見としては、v4の仕様の方が良い(正しい)と思います。 Excelでも1.2の値をパーセント表示にしたら120.0%になりますよね。それを敢えて100倍するなんて。

パーセント表示を使っていたクエリは、v5へのバージョンアップによって、全て修正が必要になりました。

ダッシュボード

ダッシュボードを「お気に入り登録」できるようになった(便利度:★★)

使い方はクエリと同じなので、詳細は省略します。

ダッシュボードに「タグ」を付けることができるようになった(便利度:★★)

使い方はクエリと同じなので、詳細は省略します。

ダッシュボードの変更時にグチャってならなくなった。(便利度:★★★★)

v5でもっとも良くなった点は、ここだと思います。 v4のユーザであれば「グチャって」いう意味が分かると思いますが、 v4では、ダッシュボードのクエリのサイズ、特に高さを変更していると、クエリのサイズが崩れて(「グチャって」)、取り返しのつかないことになった経験がある方も多いと思います。 その為、一度変更したら、保存して、また変更したら保存して、といったように崩れても元に戻れる対策を講じながら作業を行う必要がありました。クエリのサイズを自由に変更できる機能はv4で追加された便利な機能なのですが、ちょっと不具合があったようです。 それがv5では修正されています。ちょっと変更するごとに保存して、「グチャって」なったら修正をキャンセルして、といった作業から解放されたのは非常に助かりました。

まとめ

v5へのバージョンアップでは、便利機能の追加は少なめでした。 v4では、かなり便利な機能が多く追加されていたので、偶数バージョンのv6に期待します。

Org-modeを半年くらい使ってみた

Org-modeを半年くらい使ってみた

Enigmo Advent Calendar 2018の12日目の記事です。

こんにちは、エンジニアの@t4kuです。半年ほどorg-modeを使ってメモや、日々のタスク管理を行ってきたのでやってみた感想を共有しようと思います。

org-modeとは何か?

org-modeとはemacs上で動作するアウトライナーです。

アウトライナーは有名なところでいうとMacアプリではOmnioutliner や webアプリでもworkflowyなどがあります。

workflowlyについてはこちらの紹介記事がわかりやすいです(丸投げ) http://goryugo.com/20180412/dynalist-workflowy/

課題やタスクのブレークダウンなど考えをまとめたりするのに org-modeではこのようなツリー構造をプレーンテキストで書いておけば 鞍上いい感じに表示してくれます。

markdownでも同じようなことができますが、ノードを移動したりインデントを変えたりするのが面倒なのでそういう用途でmarkdownを使う人はいないと思います。

また、スケジュール機能やTODOやタグやクロック機能もあるのでこれだけで 見積もりや振り返りがプレーンテキストで完結します。

org-modeのここがいい

自分が使っていて特によいと思った機能です。

テーブル表記の入力が楽

勝手にフィールドの幅を調整してくれたりなかなか便利だなと思いました。

体験すると、qiitaのmarkdownでtableを書くことが苦行というかほとんど罰ゲームに感じるようになってきました。

画像や数式が差し込める

プレーンテキストでありながら画像も入れれるので、gitなどで履歴管理しつつ最低限わかりやすい ビジュアルをキープできるので、プログラマのメモとしてはいいバランスだと思います。

Latex記法で書いたものは数式が表示されます。

ソースコードが実行できる

org-babelという拡張があるのでソースコードブロックで書いたものを評価して、結果を表示できます。

※ob-ipythonというjupyterに繋ぐ拡張が必要ですが ob-ipython org-babel integration with Jupyter for evaluation of (Python by default) code blocks

スケジュール機能(アジェンダ)

ノードにスケジュールを設定しておくと、アジェンダコマンドを利用してその日にスケジュールされたタスク一覧(アジェンダビュー)を表示することができます。

※実際のファイルが出せないのでテキトーなタスクなのでわかりにくくてすいません

アジェンダビューはスケジュール日別に出したり、deadlineごとに出したり、タグごとに出したりいろいろカスタマイズできますが、自分の場合は、オペレーション系のタスクとプロジェクトごとのタスクごとに一覧化するようにして、一日毎の作業を管理する別のorgファイルにコピーします。

名称未設定2.png

一日のタスクを直列に並べると、あんまり余計なことを考えずにただこなしていけるような気がします。

クロック機能が便利

各ノードにTODOステータスやスケジュールを設定するだけでなく、実際に作業をする時にクロックインすると 時間を記録してくれます。また、任意の期間でレポートを作成できます。

f:id:enigmo777:20200415200414g:plain

活用法

何も考えずに単体のorgファイルをそのまま使っててもいいのですが、自分の場合は下記のように Dropbox配下のディレクトリを分けてメモと予定/振り返りを管理しています。

orgファイル間は簡単にリンクを貼って辿れるので、アジェンダファイル(Agenda/work.org)内のトピックから必要なファイルにリンクを張っておけば、だいたい事足ります。

どのようにファイルをオーガナイズするかやどういう単位で分割するかということについては深遠なテーマで、半年くらい使った素人ではまだキャッチアップできない(というか一生できる気がしない)のですが、下記のyoutubeシリーズはすごく勉強になりました。

org-mode tutorials

半年くらい使ってみた感想

何をどこに書くべきかが決まってきてキーバインドにも馴染んでくると、フローを壊さずに開発してるときもも打ち合わせしてるときも、アイデアをためておけるので、何かのインタラプションがあっても、安心して忘れられる他、ググる回数やブラウザで遷移する回数が減ったきがします。

また、テキストなのですべてgitで管理できるので、週次ごとにプルリクエストを作るようにすると diffを見れば振り返るのが一目瞭然です。

あと副産物ですが、普段プログラミングをする際はvimを使っているのですが、org-modeのためだけにemacsを使うようになり少しemacsの良さがわかってきました。そして両方の宗教を理解することで、世界平和に少し貢献できるような気がしてきました。

参考

React DnDでスマホでもドラッグアンドドロップ

Enigmo Advent Calendar 2018の12日目の記事です。

注意: この記事のサンプルコードで使われている各ライブラリのバージョンは下記になります。

react 16.4.0
react-dnd 4.0.2
react-dnd-html5-backend 4.0.2
react-dnd-touch-backend 0.5.1

React DnD

Reactでドラッグアンドドロップでの並び替えを実装する際によく使われるのがReact DnDというライブラリです。 このライブラリではHTML5Drag and Drop APIを利用してドラッグアンドドロップを実現していますが、このAPI自体がスマートフォンなどのタッチデバイスには対応しておらず、スマホでそのままドラッグアンドドロップを実装することができません。

TouchBackend

React DnDを使う際、ドラッグアンドドロップしたいコンポーネントDragDropContext という HOC(Higer Order Component) に渡します。 この DragDropContext の最初の引数に渡すのは通常、 HTML5Backend というバックエンドモジュールです。

import HTML5Backend from 'react-dnd-html5-backend'
import { DragDropContext } from 'react-dnd'

class YourApp {
    /* ... */
}

export default DragDropContext(HTML5Backend)(YourApp)

前述した通りタッチデバイスの場合はこの HTML5Backend は使えません。 しかしタッチデバイス対応した TouchBackendというものがあるのでそちらを使います。

import HTML5Backend from 'react-dnd-html5-backend'
import TouchBackend from 'react-dnd-touch-backend';
import { DragDropContext } from 'react-dnd'

const isTouchDevice = () => {
 /* タッチデバイス判定 */
}

class YourApp {
    /* ... */
}
export default DragDropContext(isTouchDevice() ? TouchBackend : HTML5Backend)(YourApp)

これだけでタッチデバイス対応ができました。 しかし、 HTML5Backend のようにいい感じにプレビューされません。

HTML5Backendではちゃんとプレビューされている


TouchBackendではプレビューされていない!

ChromeのDevToolsでスマートフォンをエミュレートして録画しているためマウスカーソルが表示されています。

DragLayer

React DnD にはDragLayerという、ドラッグ時のプレビュー表示をカスタマイズできるAPIがあります。 これを使うことでタッチデバイスでもいい感じのプレビューを表示することができます。

利用側のサンプルコードは以下です。

import React from 'react'
import DragLayer from 'react-dnd/lib/DragLayer'
import TouchBackend from 'react-dnd-touch-backend';
import { DragDropContext } from 'react-dnd'

function collect(monitor) {
  const item = monitor.getItem()
  return {
    currentOffset: monitor.getSourceClientOffset(),
    previewProps: item && item.previewProps,
    isDragging:
      monitor.isDragging() && monitor.getItemType() === 'IMAGE'
  }
}

function getItemStyles(currentOffset) {
  if (!currentOffset) {
    return {
      display: 'none'
    }
  }

  const x = currentOffset.x
  const y = currentOffset.y
  const transform = `translate(${x}px, ${y}px) scale(1.05)`

  return {
    WebkitTransform: transform,
    transform: transform,
  }
}

class PreviewComponent extends React.Component {
  render() {
    const { isDragging, previewProps, currentOffset } = this.props
    if (!isDragging) {
      return null
    }

    return (
      <div>
        {/*...*/}
      </div>
    )
  }
}

const DragPreview = DragLayer(collect)(PreviewComponent)


class YourApp {
  render() {
    return (
      <div>
        {/* ... */}
        
      </div>
    )
  }
}

export default DragDropContext(TouchBackend)(YourApp)

かんたんに解説

DragLayer の引数 collect 関数ではDragLayerMonitorのオブジェクトが渡されます。 monitor.getItem()DragSource にアクセスすることができ、 任意で渡した props(今回の場合は previewProps という名前で渡していますが、どんな名前でも渡すことができます) にアクセスできます。 また、 monitor.isDragging で実際にドラッグされているか判定することができます。 同一画面の他のコンポーネントでもドラッグアンドドロップするために、 DragDropContext が複数ある場合は monitor.getItemType() でどのコンテキストなのかを判定するとよいでしょう。 プレビューがタッチした部分に追従するように monitor.getSourceClientOffset() を使ってオフセット座標を返しておきます。 collect 関数の返り値のオブジェクトはそのままプレビュー用のコンポーネントprops として受け取ることができます。 getItemStyles 関数では受け取った props.currentOffset を使ってCSSを調整しています。

DragDropContext に渡したコンポーネントDragLayer を描画することで、ドラッグ時にプレビューを表示することができます。


スマホでもプレビューができた!

ChromeのDevToolsでスマートフォンをエミュレートして録画しているためマウスカーソルが表示されています。

最後に

スマートフォンなどのタッチデバイスHTML5のようなドラッグアンドドロップを実現する方法を解説しました。 実際に実装する際は、TouchBackendのリポジトリ に完全に動作するサンプルがあるのでそちらも参考にしてみてください。

参考リンク

http://react-dnd.github.io/react-dnd/about https://github.com/yahoo/react-dnd-touch-backend

Apache Airflow で実現するSQL ServerからBigQueryへのデータ同期

はじめに

この記事はEnigmo Advent Calendar 2018の11日目です。

Enigmoでは、データウェアハウス(DWH)としてBigQueryを使っていて、サービスのアクセスログやサイト内の行動ログ、データベースのデータをBigQueryへ集約させています。

データベースからBigQueryへのデータ同期にはApache Airflowを使っていて、今日はその仕組みについて紹介します。

Apache Airflowとは

Airflowは、pythonでワークフロー(DAG)を定義すると、そのとおりにタスク(オペレーター) をスケジューリングして起動してくれるツールです。GCPでもGKE上でAirflowを動かすCloud Composerというサービスが提供されていてご存知の方も多いと思います。

データの処理の単位をオペレータで定義し、その処理の依存関係を反映したワークフローをDAGで定義してやればデータ処理のパイプラインを実現することが可能となります。

DBからBigQueryへのデータパイプライン

データの流れ

データの流れとしては、上の図の通り大きく2フェーズに分かれていて、まずはDB(SQL Server)からGoogle Cloud Storage(GCS)へデータをアップロードしています。その次にGCSからBigQueryへそのデータをロードしています。

それぞれのフェーズをAirflowのタスクの単位であるオペレーターで実現していて、さらに2つのオペレーターはそれぞれ同期するテーブルごと別のタスクとして存在し、それらをDAGという1つのワークフローの単位でまとめています。

SQL ServerからGCSへ

JdbcToGoogleCloudStorageOperator

SQL ServerからGCSへのデータの移動は JdbcToGoogleCloudStorageOperator というAirflowのオペレーターが担当します。

DBがMySQLの場合はMySqlToGoogleCloudStorageOperatorというAirflowに組み込みのオペレーターがあるんですが、バイマのデータベースはSQL Serverなので、JDBCのクライアントで同様の働きをするオペレーターを自前で作ったものが JdbcToGoogleCloudStorageOperator です。Airflowのプラグインとして公開しています。

github.com

このオペレータでの処理は、まずDBからSQLでデータを抽出し、一度JSONL形式のファイルとしてのオペレーターが動くサーバーのローカルに保存され、それがGCSへアップロードされるという流れです。BigQueryへロードするときにスキーマ定義が必要なので、データファイルとは別にスキーマ定義のファイルもJSON形式でGCSへアップロードされます。

スケジューリングと更新差分抽出の仕組み

DAGのスケジューリング間隔は1時間に設定しています。するとAirflowは時間を1時間ごとに期間を分けてDAGにその期間の開始時刻(execution_date)、終了時刻(next_execution_date)をテンプレートのパラメーターとして渡してくれます。それらを データ抽出SQLのWHERE句のところでレコードの更新日時を記録するカラム(下の例ではupdated_at)を基準に期間指定すると、その期間に更新があったレコードだけが抽出され、BigQuery側へ送られる仕組みです。

SELECT 
  * 
FROM 
  table1 
WHERE 
  "{{execution_date.strftime('%Y-%m-%d %H:%M:%S')" &lt;= updated_at 
  AND updated_at &lt; &quot;{{next_execution_date).strftime(&#039;%Y-%m-%d %H:%M:%S&#039;)}}&quot;

もし間隔を変えてもDAGを編集することなくSQLがその期間に合わせて変わってくれるので便利です。

GCSからBigQueryへ

GoogleCloudStorageToBigQueryOperator

GCSからBigQueryへはその名の通りAirflow組み込みのGoogleCloudStorageToBigQueryOperatorというオペレーターがやってくれます。

BigQuery側のデータセットは同期元DBのデータベース単位、テーブルは同期元DBのテーブル単位に分けています。BigQuery側のテーブルはDB側のレコードの更新日ごとに日付分割しています。

BigQueryの更新はDMLは使わずに、ファイルを読み込みジョブで更新されます。そうするとDB側のレコードが更新されるとBigQuery側には重複してレコードが溜まっていくのですが、それは後述の重複除外ビューで解決しています。

BigQuery側でレコードの重複を除外

BigQuery側のテーブルでは、次のようなSQLでビューテーブルを作ることで、同期元のDBでレコードが何度も更新されても常に最新のレコードしか現れない仕組みになっています。

この例は、主キーがidで更新日時のカラムが updated_at の場合のSQLです。同一idに対して常に最新のupdated_at をもつレコードしかこのビューには出てきません。

SELECT *
FROM (
    SELECT *, ROW_NUMBER() OVER (
      PARTITION BY id
      ORDER BY updated_at DESC)  etl_row_num
    FROM
        `db1.table1_*`)
WHERE etl_row_num = 1

Airflowで便利だった機能

Airflowの機能でこの仕組みをつくるのに助けられた機能がいくつかあったので紹介します。書ききれてないですが、ほかにもたくさんあります。

Catchup

DAGのスケジュールを過去の期間にさかのぼって実行してくれる機能なんですが、非常にありがたかったです。 過去のデータの移行でも差分同期の仕組みがそのまま使えましたし、一度に同期せずに、期間を区切って少しずつデータを持っていけたので、同期元のDBにも負荷をかけずにすみました。

Connection、Variable

Connectionは接続先となるDBやGCPへの認証情報を一元管理してくれ、一度設定すればどのDAGからアクセスできて便利でした。次のPoolも同じなんですが、設定はGUIでもCLIでも設定できるので、ansibleなどのプロビジョニングツールでも設定できたのもありがたかったです。

Variableも単なるキーと値を設定できるだけなんですが、DAGを汚すことなくdevやproductionなどリリースステージごとに値を切り替えられて便利でした。

Pool

タスクの同時実行数を制限する機能です。Poolはユーザーが定義でき、そのPoolにオペレーターを紐付けるとそのオペレーターはそのPoolのslot数を超えて同時実行されません。データ抽出のタスクが1つのDBに対して多数同時実行されてしまうとそのDBのコネクションも同時に消費され、枯渇しかねませんが、このPoolで上限数を設定できたので安心でした。

まとめ

最初は手っ取り早くcronとスクリプトで作ってしまおうと思ったのですが、すこしなれるまで時間はかかったもののAirflowで作って良かったです。開発が進むにつれ、特にプロダクション環境で動かすにあたっていろいろ考慮すべきことが出てくると思うのですが、作りながらほしいと思った機能が先回りされているかのように用意されていてとても助かりました。全て使いきれてないですが、ワークフロー運用のノウハウがたくさん詰まった良いプロダクトだと思いました。

OptunaとLightGBMを使って、Kaggle過去コンペにsubmitする

この記事はEnigmo Advent Calendar 2018の10日目です。

はじめに

OptunaはPFN社が公開したハイパーパラメータ自動最適化フレームワークです。

https://research.preferred.jp/2018/12/optuna-release/

目的関数さえ決めれば、直感的に最適化を走らせることが可能のようです。

今回、最適化自体の説明は割愛させていただきますが、機械学習の入門ということを考えるとハイパーパラメータの調整としては、gridsearchやRandomizedSearchCVで行う機会が多いと思います。 スキル、あるいはリソースでなんとかするということになるかと思いますが、特に、kaggleのような0.X%の精度が向上が重要になるような状況では、ハイパーパラメータのチューニングが大きなハードルの一つになります。 そこで、titanicでのsubmitはあるものの、Kaggleの経験がほぼゼロな筆者でも、Optunaで簡単にチューニングができるかどうかを試してみようと思います。

今回の対象コンペ

既にcloseしているコンペの中で、下記のPorto Seguro’s Safe Driver Predictionを選びました。 https://www.kaggle.com/c/porto-seguro-safe-driver-prediction 選定理由は以下の通りです。

  • データがそれほど大きくない
  • 手元(自宅)のラップトップのRAMは8GBと大きくないので、XGboostではなくメモリ消費が抑えられるLightGBMでやってみたい
  • 解法がシンプルかつ、LightGBMで上位のスコアを解法を公開しているカーネルがすぐに見つかった

公開解法の再現

https://www.kaggle.com/xiaozhouwang/2nd-place-lightgbm-solution

上記をそのままコピペして一回submitします。 Python2対応のようなので、下記のようにPython3で動くように修正しました。

# part of 2nd place solution: lightgbm model with private score 0.29124 and public lb score 0.28555

import lightgbm as lgbm
from scipy import sparse as ssp
from sklearn.model_selection import StratifiedKFold
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

def Gini(y_true, y_pred):
    # check and get number of samples
    assert y_true.shape == y_pred.shape
    n_samples = y_true.shape[0]

    # sort rows on prediction column
    # (from largest to smallest)
    arr = np.array([y_true, y_pred]).transpose()
    true_order = arr[arr[:, 0].argsort()][::-1, 0]
    pred_order = arr[arr[:, 1].argsort()][::-1, 0]

    # get Lorenz curves
    L_true = np.cumsum(true_order) * 1. / np.sum(true_order)
    L_pred = np.cumsum(pred_order) * 1. / np.sum(pred_order)
    L_ones = np.linspace(1 / n_samples, 1, n_samples)

    # get Gini coefficients (area between curves)
    G_true = np.sum(L_ones - L_true)
    G_pred = np.sum(L_ones - L_pred)

    # normalize to true Gini coefficient
    return G_pred * 1. / G_true


cv_only = True
save_cv = True
full_train = False


def evalerror(preds, dtrain):
    labels = dtrain.get_label()
    return 'gini', Gini(labels, preds), True


path = "input/"

train = pd.read_csv(path+'train.csv')
train_label = train['target']
train_id = train['id']
test = pd.read_csv(path+'test.csv')
test_id = test['id']

NFOLDS = 5
kfold = StratifiedKFold(n_splits=NFOLDS, shuffle=True, random_state=218)

y = train['target'].values
drop_feature = [
    'id',
    'target'
]

X = train.drop(drop_feature,axis=1)
feature_names = X.columns.tolist()
cat_features = [c for c in feature_names if ('cat' in c and 'count' not in c)]
num_features = [c for c in feature_names if ('cat' not in c and 'calc' not in c)]

train['missing'] = (train==-1).sum(axis=1).astype(float)
test['missing'] = (test==-1).sum(axis=1).astype(float)
num_features.append('missing')

for c in cat_features:
    le = LabelEncoder()
    le.fit(train[c])
    train[c] = le.transform(train[c])
    test[c] = le.transform(test[c])

enc = OneHotEncoder(categories='auto')
enc.fit(train[cat_features])
X_cat = enc.transform(train[cat_features])
X_t_cat = enc.transform(test[cat_features])

ind_features = [c for c in feature_names if 'ind' in c]
count=0
for c in ind_features:
    if count==0:
        train['new_ind'] = train[c].astype(str)+'_'
        test['new_ind'] = test[c].astype(str)+'_'
        count+=1
    else:
        train['new_ind'] += train[c].astype(str)+'_'
        test['new_ind'] += test[c].astype(str)+'_'

cat_count_features = []
for c in cat_features+['new_ind']:
    d = pd.concat([train[c],test[c]]).value_counts().to_dict()
    train['%s_count'%c] = train[c].apply(lambda x:d.get(x,0))
    test['%s_count'%c] = test[c].apply(lambda x:d.get(x,0))
    cat_count_features.append('%s_count'%c)

train_list = [train[num_features+cat_count_features].values,X_cat,]
test_list = [test[num_features+cat_count_features].values,X_t_cat,]

X = ssp.hstack(train_list).tocsr()
X_test = ssp.hstack(test_list).tocsr()

learning_rate = 0.1
num_leaves = 15
min_data_in_leaf = 2000
feature_fraction = 0.6
num_boost_round = 10000
params = {"objective": "binary",
          "boosting_type": "gbdt",
          "learning_rate": learning_rate,
          "num_leaves": num_leaves,
           "max_bin": 256,
          "feature_fraction": feature_fraction,
          "verbosity": 0,
          "drop_rate": 0.1,
          "is_unbalance": False,
          "max_drop": 50,
          "min_child_samples": 10,
          "min_child_weight": 150,
          "min_split_gain": 0,
          "subsample": 0.9
          }

x_score = []
final_cv_train = np.zeros(len(train_label))
final_cv_pred = np.zeros(len(test_id))
for s in range(16):
    cv_train = np.zeros(len(train_label))
    cv_pred = np.zeros(len(test_id))

    params['seed'] = s

    if cv_only:
        kf = kfold.split(X, train_label)

        best_trees = []
        fold_scores = []

        for i, (train_fold, validate) in enumerate(kf):
            X_train, X_validate, label_train, label_validate = \
                X[train_fold, :], X[validate, :], train_label[train_fold], train_label[validate]
            dtrain = lgbm.Dataset(X_train, label_train)
            dvalid = lgbm.Dataset(X_validate, label_validate, reference=dtrain)
            bst = lgbm.train(params, dtrain, num_boost_round, valid_sets=dvalid, feval=evalerror, verbose_eval=100,
                            early_stopping_rounds=100, )
            best_trees.append(bst.best_iteration)
            cv_pred += bst.predict(X_test, num_iteration=bst.best_iteration)
            cv_train[validate] += bst.predict(X_validate)

            score = Gini(label_validate, cv_train[validate])
            print(score)
            fold_scores.append(score)

        cv_pred /= NFOLDS
        final_cv_train += cv_train
        final_cv_pred += cv_pred

        print("cv score:")
        print(Gini(train_label, cv_train))
        print("current score:", Gini(train_label, final_cv_train / (s + 1.)), s+1)
        print(fold_scores)
        print(best_trees, np.mean(best_trees))

        x_score.append(Gini(train_label, cv_train))

print(x_score)
pd.DataFrame({'id': test_id, 'target': final_cv_pred / 16.}).to_csv('model/lgbm3_pred_avg.csv', index=False)
pd.DataFrame({'id': train_id, 'target': final_cv_train / 16.}).to_csv('model/lgbm3_cv_avg.csv', index=False)

公開解法でのsubmit

Private Scoreで0.29097。5169チーム中46位のスコアとなり、シルバーメダル圏内に入りました。 コンペは終了しているので、もちろんスコアボードの本体は更新はされません。

なお、実際のコンペでは、カーネルの著書から他のNeral Networkでの予測値の平均と記載があるので、2位のsubmitの再現というわけにならないようです。

しかし、このようなシンプルな方法でシルバーメダルのスコアを取れるのは、個人的にもKaggleに積極してみたいという励みになったと感じています。

ハイパーパラメータのチューニング

さて、ハイパーパラメータのチューニングをフレームワークの力を借りて、ハードルをぐっと下げようという、本題に移ります。

他のKaggleのコンペや、Stack over flowで雑に調査し、パラメータの範囲を決めました。 そうしてできた修正したソースコードが、以下のようになります。

import lightgbm as lgbm
import optuna
from scipy import sparse as ssp
from sklearn.model_selection import StratifiedKFold
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

def Gini(y_true, y_pred):
    # check and get number of samples
    assert y_true.shape == y_pred.shape
    n_samples = y_true.shape[0]

    # sort rows on prediction column
    # (from largest to smallest)
    arr = np.array([y_true, y_pred]).transpose()
    true_order = arr[arr[:, 0].argsort()][::-1, 0]
    pred_order = arr[arr[:, 1].argsort()][::-1, 0]

    # get Lorenz curves
    L_true = np.cumsum(true_order) * 1. / np.sum(true_order)
    L_pred = np.cumsum(pred_order) * 1. / np.sum(pred_order)
    L_ones = np.linspace(1 / n_samples, 1, n_samples)

    # get Gini coefficients (area between curves)
    G_true = np.sum(L_ones - L_true)
    G_pred = np.sum(L_ones - L_pred)

    # normalize to true Gini coefficient
    return G_pred * 1. / G_true

cv_only = True
save_cv = True
full_train = False

def evalerror(preds, dtrain):
    labels = dtrain.get_label()
    return 'gini', Gini(labels, preds), True

path = "input/"

train = pd.read_csv(path+'train.csv')
#train = train.sample(frac=0.1, random_state=0).reset_index(drop=True)
train_label = train['target']
train_id = train['id']
test = pd.read_csv(path+'test.csv')
#test = test.sample(frac=0.1, random_state=0).reset_index(drop=True)
test_id = test['id']

NFOLDS = 4
kfold = StratifiedKFold(n_splits=NFOLDS, shuffle=True, random_state=218)

y = train['target'].values
drop_feature = [
    'id',
    'target'
]

X = train.drop(drop_feature,axis=1)
feature_names = X.columns.tolist()
cat_features = [c for c in feature_names if ('cat' in c and 'count' not in c)]
num_features = [c for c in feature_names if ('cat' not in c and 'calc' not in c)]

train['missing'] = (train==-1).sum(axis=1).astype(float)
test['missing'] = (test==-1).sum(axis=1).astype(float)
num_features.append('missing')
train.shape
for c in cat_features:
    le = LabelEncoder()
    le.fit(train[c])
    train[c] = le.transform(train[c])
    test[c] = le.transform(test[c])

# 事前にlabelEncoderを行っているから、この使い方でユニークな値で割り当てられる。引数categories = 'auto'で警告を消す
enc = OneHotEncoder(categories='auto')
enc.fit(train[cat_features])
X_cat = enc.transform(train[cat_features])
X_t_cat = enc.transform(test[cat_features])


ind_features = [c for c in feature_names if 'ind' in c]
count=0
for c in ind_features:
    if count == 0:
        train['new_ind'] = train[c].astype(str)+'_'
        test['new_ind'] = test[c].astype(str)+'_'
        count += 1
    else:
        train['new_ind'] += train[c].astype(str)+'_'
        test['new_ind'] += test[c].astype(str)+'_'

cat_count_features = []
for c in cat_features+['new_ind']:
    d = pd.concat([train[c],test[c]]).value_counts().to_dict()
    train['%s_count'%c] = train[c].apply(lambda x:d.get(x,0))
    test['%s_count'%c] = test[c].apply(lambda x:d.get(x,0))
    cat_count_features.append('%s_count'%c)

train_list = [train[num_features+cat_count_features].values, X_cat]
test_list = [test[num_features+cat_count_features].values, X_t_cat]

X = ssp.hstack(train_list).tocsr()
X_test = ssp.hstack(test_list).tocsr()


def objective(trial):
    drop_rate = trial.suggest_uniform('drop_rate', 0, 1.0)
    feature_fraction = trial.suggest_uniform('feature_fraction', 0, 1.0)
    learning_rate = trial.suggest_uniform('learning_rate', 0, 1.0)
    subsample = trial.suggest_uniform('subsample', 0.8, 1.0)
    num_leaves = trial.suggest_int('num_leaves', 5, 1000)
    verbosity = trial.suggest_int('verbosity', -1, 1)
    num_boost_round = trial.suggest_int('num_boost_round', 10, 100000)
    min_data_in_leaf = trial.suggest_int('min_data_in_leaf', 10, 100000)
    min_child_samples = trial.suggest_int('min_child_samples', 5, 500)
    min_child_weight = trial.suggest_int('min_child_weight', 5, 500)

    params = {"objective": "binary",
              "boosting_type": "gbdt",
              "learning_rate": learning_rate,
              "num_leaves": num_leaves,
              "max_bin": 256,
              "feature_fraction": feature_fraction,
              "verbosity": verbosity,
              "drop_rate": drop_rate,
              "is_unbalance": False,
              "max_drop": 50,
              "min_child_samples": min_child_samples,
              "min_child_weight": min_child_weight,
              "min_split_gain": 0,
              "min_data_in_leaf": min_data_in_leaf,
              "subsample": subsample
              }

    x_score = []
    final_cv_train = np.zeros(len(train_label))
    final_cv_pred = np.zeros(len(test_id))

    cv_train = np.zeros(len(train_label))
    cv_pred = np.zeros(len(test_id))

    params['seed'] = 0

    kf = kfold.split(X, train_label)

    best_trees = []
    fold_scores = []

    for i, (train_fold, validate) in enumerate(kf):
        print('kfold_index:', i)
        X_train, X_validate, label_train, label_validate = \
            X[train_fold, :], X[validate, :], train_label[train_fold], train_label[validate]
        dtrain = lgbm.Dataset(X_train, label_train)
        dvalid = lgbm.Dataset(X_validate, label_validate, reference=dtrain)
        bst = lgbm.train(params, dtrain, num_boost_round, valid_sets=dvalid, feval=evalerror, verbose_eval=100,
                        early_stopping_rounds=100)
        best_trees.append(bst.best_iteration)
        cv_pred += bst.predict(X_test, num_iteration=bst.best_iteration)
        cv_train[validate] += bst.predict(X_validate)

        score = Gini(label_validate, cv_train[validate])
        print(score)
        fold_scores.append(score)


    cv_pred /= NFOLDS
    final_cv_train += cv_train
    final_cv_pred += cv_pred

    print("cv score:")
    print(Gini(train_label, cv_train))
    print("current score:", Gini(train_label, final_cv_train / (s + 1.)), s+1)
    print(fold_scores)
    print(best_trees, np.mean(best_trees))

    x_score.append(Gini(train_label, cv_train))
    print(x_score)


    pd.DataFrame({'id': test_id, 'target': final_cv_pred / 16.}).to_csv('model/lgbm3_pred_avg_2.csv', index=False)
    pd.DataFrame({'id': train_id, 'target': final_cv_train / 16.}).to_csv('model/lgbm3_cv_avg_2.csv', index=False)

    return (1 - x_score[0])

study = optuna.create_study()
study.optimize(objective, n_trials=150)

パラメータの設定の範囲を抜粋すると以下のようになります。

drop_rate = trial.suggest_uniform('drop_rate', 0, 1.0)
feature_fraction = trial.suggest_uniform('feature_fraction', 0, 1.0)
learning_rate = trial.suggest_uniform('learning_rate', 0, 1.0)
subsample = trial.suggest_uniform('subsample', 0.8, 1.0)
num_leaves = trial.suggest_int('num_leaves', 5, 1000)
verbosity = trial.suggest_int('verbosity', -1, 1)
num_boost_round = trial.suggest_int('num_boost_round', 10, 100000)
min_data_in_leaf = trial.suggest_int('min_data_in_leaf', 10, 100000)
min_child_samples = trial.suggest_int('min_child_samples', 5, 500)
min_child_weight = trial.suggest_int('min_child_weight', 5, 500)

なお、Optuna自体の使用方法は、下記の記事と公式リファレンスを参考させていただきした。

https://qiita.com/ryota717/items/28e2167ea69bee7e250d https://optuna.readthedocs.io/en/stable/index.html

(18/12/11 19:41追記) コメントいただけた通り、'verbosity'は、警告レベルの表示を制御するパラメータであり、予測性能の最適化としては意味の無いパラメータでした。ですので、チューニングの対象にはすべきではありませんでした。

以下のように試行回数を定めていますが、

n_trials=150 

時間が足りなくなった関係で、その時点で計算されたパラメータで最適化を中断しております。 20時間ほど回し回しましたが、ハイパーパラメータによって検証の時間は1分から60分程度となり、 100回くらいの試行数だったようです。

そうしてできてパラメータが、以下のように、2位の解法と比較すると以下のようになります。

ハイパーパラメータ 今回のチューニング結果 2位の解法
drop_rate 0.3015600134599976 0.1
feature_fraction 0.46650703511665226 0.6
learning_rate 0.004772377676601769 0.1
subsample 0.8080720420805803 0.9
num_leaves 718 15
verbosity -1 0
num_boost_round 1942 10000
min_data_in_leaf 212 150
min_child_samples 68 10
min_child_weight 151 150

2位コンペとの解法とは、雰囲気が異なるセットとなり、公開解法の再現ということにはならないようです。 K_fold=4 でやっていることも異なる要因になると思います。

算出できたハイパーパラメータでsubmit

最初のpython3のスクリプトからパラメータを入れ替え、予測値を算出しました。 K_fold =4, また、ランダムシートの数を16から4に減らしております。

結果

スコアは下がってます。

1176位相当。。ハイパーパラメータ次第でシルバーメダル圏内ということを考えると、微妙な結果です。

所感

結果としては残念ですが、grid searchだけに頼らない、ハイパーパラメータの最適化方法の導入のきっかけになりました。 また、非常に手軽に使えたというのもあり、今後もチューニングの場面でOptunaを活用してみたいと思います。

反省としては、探索するハイパーパラメータの設定が悪く、計算の効率化が著しく悪くなった恐れがあります。 validationの際に、fold数の全て計算するのではなく、スコアが下がらなそうなら、そのハイパーパラメータの計算をやめるとか、一定時間以上かかってしまったらまた、次に試行に移るとかできれば効率化できたように思えます。 フレームワークブラックボックスでもある程度は動かすことができますが、やはり中身をある程度理解しないと遠回りしてしまうというのは、当然の結果と言えます。 もっと使いこなせるよう精進しなければと思いました。

公式リファレンスでも、OptunaでLightGBMをチューニングする例が出ており、そちらの例も参考にしながらリベンジしたいと思います。

github.com

最後にですが、この記事が何かの役に経てば幸いです。

Kotlin はじめてのコルーチン

0. はじめに

18年10月にKotlinのコルーチンがexperimentalからstableになりました。 遅ればせながら、コルーチンを触ってみました。

この記事は、これからコルーチンを学習する人向けの記事です。

*Kotlin1.3、 kotlinx-coroutines1.0.1の環境です。 *Kotlinが初めての方は、こちらで気軽に試せるので触ってみてください。先頭にimport kotlinx.coroutines.*を忘れずに。

1. コルーチンとは

Wikipediaから引用します。

コルーチン(英: co-routine)とはプログラミングの構造の一種。サブルーチンがエントリーからリターンまでを一つの処理単位とするのに対し、コルーチンはいったん処理を中断した後、続きから処理を再開できる。

どういうことなのか。簡単なプログラムを例にして説明をします。

fun main() {
    /* ここからコルーチン */
    println("start foo")
    時間のかかる処理
    println("end foo")
    /* ここまでコルーチン */

    println("bar")
}

例えば"start foo"から"end foo"をコルーチンとして実行することで、時間のかかる処理のタイミングでmainスレッドがその処理を中断し、中断中は別の処理をすることができます。 ここでは、中断中は"bar"を表示させることにします。 よって、出力結果をこのようなります。

start foo
bar
end foo

2.初めてのコルーチン

それでは実際にコルーチンを作成して、スレッドが中断して再開するところをみてみます。 作成するプログラムは、1.コルーチンとはのプログラムに、コルーチンを適用します。説明通りの結果になるか確認します。

コルーチンを作成するにはコルーチンビルダーというものを使います。 コルーチンビルダーには様々ありますが、ここではもっともシンプルなlaunch関数を使います。 使い方は簡単です。launch関数にコルーチンとして実行するラムダを渡します。

fun main() {
    GlobalScope.launch {
        println("start foo")
        delay(1000)
        println("end foo")
    }
    println("bar")
}

これで"start foo"から"end foo"まではコルーチンとして実行されます。

なお、GlobalScopedelay関数はあとで説明します。 delay関数Thread.sleepメソッドのようなものだと現時点では思っておいてください。 「時間のかかる処理」をdelay関数で代替しています。引数として中断したい時間をミリ秒単位で指定できます。

結果はこのようになります。("start foo"が表示されないこともあります。)

bar
start foo

想定した出力結果になりませんでした。 まず、"bar"が先に表示されてしまいました。 これはlaunch関数がコルーチンの実行をスケジュール化だけして、処理を先に進めてしまうからです。 また、"end foo"が表示されませんでした。原因は、"end foo"から処理を再開をする前にmain関数からリターンして、プログラム自体が終了してしまうからです。

launch関数では、mainスレッドの実行を止めることできないので、何か工夫が必要です。 launch関数の代わりにrunBlocking関数というコルーチンビルダーを使うことにします。 runBlocking関数は、コルーチンが完了するまで呼び出し出し元のスレッドを停止させるコルーチンビルダーです。

fun main() {
    runBlocking {
        println("start foo")
        delay(1000)
        println("end foo")
    }
    println("bar")
}

当然ですが、これでも期待した出力結果にはなりません。

start foo
end foo
bar

なぜならrunBlocking関数をコールした時点で、コルーチンの処理が終わるまで呼び出し元のスレッドがブロックされるからです。(出力結果として想定したものではありませんが、delay関数のポイントで中断および再開はしています。)

それでは先のlaunch関数と組み合わせたらどうなるでしょうか。

fun main() {
    runBlocking {
        launch {
            println("start foo")
            delay(1000)
            println("end foo")
        }
        delay(500)
        println("bar")
    }
}

先述したようにlaunch関数はコルーチンの実行をスケジュール化して処理を先に進めてしまうので、"start foo"が表示される前に"bar"が表示されてしまいます。 これを防ぐために"bar"の直前にdelay(500)を置きます。 (前回と違い、launch関数を呼び出す際にGlobalScopeがない理由はあとで説明します。)

結果はこのようになりました。

start foo
bar
end foo

想定した出力結果になりました。

どのスレッドで各々が実行されているか調べてみましょう。 また、少しだけKotlinっぽく書いてみます。

fun main() = runBlocking {
    launch {
        println("$threadName:start foo")
        delay(1000)
        println("$threadName:end foo")
    }
    delay(500)
    println("$threadName:bar")
}

val threadName: String
    get() = Thread.currentThread().name
main:start foo
main:bar
main:end foo

中断する前の処理、中断中の処理、中断から再開した処理、全てmainスレッドで実行されていることが確認できました。

なお、このプログラムは2回中断が発生しています。

launchコルーチンのスケジュール化 → delay(500)で中断(1回目) → launchの実行開始 → delay(1000)で中断(2回目) → delay(500) から再開 → delay(1000)から再開

3.中断はいつ発生するのか

コルーチンの実行が中断され、そして再開される様子を見ることができましたが、中断とはどういう時に発生するのでしょうか。 ドキュメントにこのような記載があります。

Suspending functions can be used inside coroutines just like regular functions, but their additional feature is that they can, in turn, use other suspending functions, like delay in this example, to suspend execution of a coroutine. サスペンド関数は、コルーチンの中で通常の関数のように使えます。通常の関数との違いは、サスペンド関数はコルーチンの実行を中断するために、他のサスペンド関数を使うことです。(この例のdelayのように)

サスペンド関数という新しい用語が出てきました。サスペンド関数とはこのように関数の先頭にsusupend修飾子がついた関数のことです。

suspend fun hoge()

このドキュメントによるとサスペンド関数をコールすることで中断が発生するようです。 確かにdelay関数の定義にもこのようにsuspend修飾子がついています。

public suspend fun delay(timeMillis: Long)

それでは、delay関数のように中断を起こすサスペンド関数を作成してみましょう。 せっかくなので、中断から再開するときに値を返すサスペンド関数を作成してみます。 今回は4096bitで表現可能な素数を返すgetPrimeNumber関数を作成します。

getPrimeNumber関数の利用側はこのようにします。

fun main() = runBlocking {
    println("$threadName:start runBlocking")
    launch {
        println("$threadName:start launch")
        val prime = getPrimeNumber()
        println("$threadName:prime number = $prime")
        println("$threadName:end launch")
    }
    delay(500)
    println("$threadName:end runBlocking")
}

大体の流れは、 getPrimeNumber関数をコールしたらmainスレッドはコルーチンを中断 → その間に"end Blocking"を表示 → 素数が求め終わったら、素数を表示させるところから再開 です。

次に、サスペンド関数であるgetPrimeNumber関数はどう作成すればいいのでしょうか。 まずは、素数を求めるコードを書く必要がありますが、BigInterger.probablePrimeという素数を求めるのに便利なメソッドがあります。 このメソッドの詳しい使い方は割愛しますが、BigInterger.probablePrime(4096, Random())素数(正確には「おそらく素数」)を返してくれます。私の手元のマシンでは呼び出してから返ってくるまでに10秒程度かかりました。

次に実際に中断を起こすコードを書いていきます。

suspend fun getPrimeNumber() = BigInterger.probablePrime(4096, Random())

このように書ければシンプルですが、このようにしてもgetPrimeNumber関数で中断されず、mainスレッドが素数を求めるために停止してしまいます。

スレッドを中断させるにはsuspendCoroutine関数をコールする必要があります。 suspendCroutine関数はこのように定義されています。この関数もサスペンド関数です。

inline suspend fun  suspendCoroutine(
    crossinline block: (Continuation) -&gt; Unit
): T

ラムダが受け取るContinuationインターフェースにはこのような拡張関数が定義されています。

fun  Continuation.resume(value: T)

このresumeメソッドをコールすることで、コルーチンが再開します。

それでは中断はいつ発生するのでしょうか。 あえて、resumeメソッドをコールせず、このようにして実行してみてください。

suspend fun getPrimeNumber() {
    println("$threadName:hoge")
    suspendCoroutine {
        println("$threadName:fuga")
    }
    println("$threadName:piyo")
}

結果はこのようになります。

main:start runBlocking
main:start launch
main:hoge
main:fuga
main:end runBlocking

また、このプログラムは永遠に終了しません。なぜなら、コルーチンが再開しないためです。

この結果をみると、"fuga"の後に"end ranBlocking"が表示されているので、"fuga"を表示後、つまりsuspendCoroutine関数に渡したラムダの実行終了後に中断が発生していることがわかります。 これが中断が発生するタイミングです。

今度は、中断が発生後、約1秒経過してからresumeメソッドをコールして再開してみます。

suspend fun getPrimeNumber() {
    println("$threadName:hoge")
    suspendCoroutine { cont -&gt;
        println("$threadName:fuga")
        Thread {
            Thread.sleep(1000)
            cont.resume(1234)
        }.start()
    }
    println("$threadName:piyo")
}

中断から再開しました。

main:start runBlocking
main:start launch
main:hoge
main:fuga
main:end runBlocking
main:piyo
main:prime number = kotlin.Unit
main:end launch

また、getPrimeNumber関数素数を返さないのでkotlin.Unitと表示されてしまっています。

それではgetPrimeNumber関数素数を返すように変更します。resumeメソッドに渡した値がsuspendCoroutine関数の戻り値になるので、このように書けます。

suspend fun getPrimeNumber(): BigInteger = suspendCoroutine { cont -&gt;
    Thread {
        cont.resume(BigInteger.probablePrime(4096, Random()))
    }.start()
}

これで、先ほどの結果でkotlin.Unitとなっていた箇所に素数が表示されます。

目的である中断の発生タイミングについて、確認できました。

4.コルーチンビルダーについて少し詳しく

これまでで、launch関数runBlocking関数の2つのコルーチンビルダーを使いました。 この2つ以外にも様々なコルーチンビルダーが提供されています。 例えば、先ほど素数を求めるために作成したgetPrimeNumber関数ですが、withContext関数というコルーチンビルダーを使うとこのように書けます。

suspend fun getPrimeNumber() = withContext(Dispatchers.Default) {
        BigInteger.probablePrime(4096, Random())
    }

このコルーチンビルダーは値を返すことができます。 また、第一引数に値を指定することで、コルーチンを実行するスレッドを切り替えています。

実用的なコルーチンビルダーは他にもありますが、この記事ではそれらを紹介しません。 ここでは、この記事でまだ触れていない重要な2つの内容について説明します。

  1. コルーチンビルダーは、中断可能な世界へのエントリーポイントのようなもの
  2. コルーチンスコープが必要なコルーチンビルダー

中断可能な世界へのエントリーポイント

まずは1つ目です。 サスペンド関数としてgetPrimeNumber関数を作成し、コールすることでコルーチンが中断されることを見ましたが、このようなコードはコンパイルエラーになります。

fun main() {
    val primeNumber = getPrimeNumber()
}

理由は、サスペンド関数はサスペンド関数もしくはサスペンドラムダからしかコールできないというルールがあるからです。 通常のラムダとサスペンドラムダの違いは、関数と同様にsuspend修飾子の有無です。 例えば、launch関数の定義はこのようになっています。

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -&gt; Unit
): Job (source)

blockの型をみるとsuspend修飾子がついているのがわかります。これはサスペンドラムダを受け取ることを表しています。 このようにコルーチンビルダーはサスペンドラムダを受けることで中断可能な世界へのエントリーポイントを提供しています。

コルチーンスコープ

次に2つ目のコルチーンスコープについて。 launch関数の定義を見ていただくと、launch関数CoroutineScopeインターフェースの拡張関数として定義されているのがわかります。

fun CoroutineScope.launch(..)

よって、launch関数をコールするにはCoroutineScopeインスタンスが必要です。 最初の方のコードでGlobalScope.launchと書いていたのはそのためです。 GlobalScopeCoroutineScopeインターフェースを実装したインスタンスです。

object GlobalScope : CoroutineScope

launch関数をコールするにはCoroutineScopeインスタンスが必要ですが、 runBlocking関数に渡すラムダ内ではGlobalScope.launch{..}ではなく、シンプルにlaunch{..}と書けます。 この理由はrunBlocking関数の定義をみるとわかります。

fun  runBlocking(
    context: CoroutineContext = EmptyCoroutineContext, 
    block: suspend CoroutineScope.() -&gt; T
): T (source)

blockのレシーバの型はCoroutineScopeとなっています。 これが理由で、runBlocking関数に渡すラムダ内では、GlobalScope.launchと書く必要がなかったのです。

レシーバ付きラムダに馴染みがない方のために、少し補足します。 あえてthisを使って書くとこのようになります。

runBlocking {
    this.launch {..}
}

このthisは、runBlocking関数が作成したCoroutineScopeインスタンスを参照しています。

補足

コルーチンスコープが導入されたのはkotlinx-coroutines0.26.0からです。 0.26.0がマークされたのが18年9月です。0.26.0より古いバージョンを前提に書かれた記事ではGlobalSopeがないコードを見ることがあるかもしれません。

// 0.26.0より前
fun main() {
    launch {..}
}

// 0.26.0以降
fun main() {
    GlobalScope.launch {..}
}

5.終わりに

予定ではCoroutineScopeCoroutineContextJobについても書くつもりでしたが、記事が長くなってしまったので、全く触れられませんでした。 コルーチンを使った実用的なコードも同様です。

コルーチンを勉強をしている身ではありますが、何かの機会があれば、それらについても書いてみたいと思います。

この記事が、これからコルーチンを初める方に少しでも役に立てば幸いです。

Amazon Dash Button をHackして「社内ビール制度」を活性化したい

こんにちは、Enigmo 新卒エンジニアの@sean0628_iです。 Enigmo Advent Calendar 2018 8日目の記事です。

ちなみに、Enigmo は、海外通販サイト BUYMA や、世界中のトレンドをお届けするファッションメディア STYLE HAUS を運営する会社です。

Enigmo では、 「社内ビール制度」が存在し、定時の18:30以降 ビールが飲み放題 です。

毎晩定時後は宴会が繰り広げられて、、、ということはなく、日によっては金曜日ですらも人が疎ら、なんていうこともあります。。。

さてはEnigmo 社員はビールが嫌いなのか?とも思いましたが、実際に話を聞いてみると当然そんなことはありませんでした(安心

実際の声

みんなで楽しくビール飲みたいけど、みんな忙しそうだし。。。 声を掛けるのは気が引ける。。。 社員Tさん

さらに時々こんな声も、

呑んでたなら教えてよ。。。言ってくれれば参加したのに。。。 社員Yさん

ニーズはあるのに、多数障壁があるようです これは、「社内ビール制度」始まって以来の危機だと思い、新卒エンジニアは立ち上がりました。

そうだ、Amazon Dash Button を使ってこの危機を乗り越えよう。

内容

  • Amazon Dash Button(以下、Button)とは?
  • Buttonの仕組み
  • Hackの流れ
  • 気づき・学び
  • 今後の展望

Amazon Dash Button とは?

Amazonダッシュボタンは、注文商品を簡単かつ迅速に作成できるように設計された小型の電子デバイスである。 出典: Wikipedia - Amazon Dash

一般的な利用方法

  1. Amazon 公式アプリにWi-FI&Bluetooth 経由で連動させる
  2. アプリ上でButton に登録したい商品を選択
  3. Step2 で設定した商品がほしくなったらButton を押す
  4. 数日後、商品が手元に届く

Button の仕組み

  1. Buttonの押下により、電源オン
  2. アプリで設定しておいた、Wi-Fiに接続
  3. DHCPでIPを取得
  4. 商品注文リクエス

とここで文系新卒の私は DHCP とはなんぞやと思ったわけです。。。 で、調べました。

利用技術

DHCP とは?

  • コンピューターにIPアドレスを自動割り当てする仕組み
  • IPアドレスは、いわゆる住所のようなもの (IPは 8.8.8.8 こんな感じの数字のやつ)

と、DHCP はなんとなく理解したものの、次なる敵が現れました。

MAC addressARP です。。。

もう横文字嫌だー、と思いながらも開発に必要なため背に腹は代えられず調べました。

MAC address とは?

ARP とは?

  • IP からMAC を調べる仕組み

のことらしい。。。

image.png

出典: What is ARP and ARP spoofing?

Hack の流れ

  1. ARPを利用し、ButtonのMACアドレスを取得*
  2. ButtonのMACアドレスを監視
  3. ButtonからのARP Requestをキャッチ
  4. 任意の処理をする(今回はSlackにmessageを投下)

*: 初回のみ。実際Buttonが押されてからの流れは2~3

実際の実装

長くなるので割愛します。 ソースコード(Sean0628/dash_button): https://github.com/Sean0628/dash_button

テスト運用

↓こんな感じ :tada:

*: リゾート とは、Enigmo 社員の憩いの場所

気づき・学び

普段業務では主にRuby を使っていて、node を初めて使いました。 しかし、やってみると意外に簡単。メンタルブロックをいかに外すかが大切なのだと感じました。

業務では関わることのできないネットワークの分野も面白いなぁと思いました。 機会があれば深掘りしてみたいなぁと。

今後の展望

勉強も兼ねて不要なPCにCentOS を入れて、CentOS をサーバにしてButton を活用したいと思っています。

まだ、本格的なリリースまでは行けてないので、早く始動させてみんなで楽しくビールを飲みたいと思います。

最後まで読んでいただきありがとうございます。


参考: Dash Buttonの設定を管理する

参考: Enabling Interactions with Bots