Apache Airflow で実現するSQL ServerからBigQueryへのデータ同期

はじめに

この記事はEnigmo Advent Calendar 2018の11日目です。

Enigmoでは、データウェアハウス(DWH)としてBigQueryを使っていて、サービスのアクセスログやサイト内の行動ログ、データベースのデータをBigQueryへ集約させています。

データベースからBigQueryへのデータ同期にはApache Airflowを使っていて、今日はその仕組みについて紹介します。

Apache Airflowとは

Airflowは、pythonでワークフロー(DAG)を定義すると、そのとおりにタスク(オペレーター) をスケジューリングして起動してくれるツールです。GCPでもGKE上でAirflowを動かすCloud Composerというサービスが提供されていてご存知の方も多いと思います。

データの処理の単位をオペレータで定義し、その処理の依存関係を反映したワークフローをDAGで定義してやればデータ処理のパイプラインを実現することが可能となります。

DBからBigQueryへのデータパイプライン

データの流れ

データの流れとしては、上の図の通り大きく2フェーズに分かれていて、まずはDB(SQL Server)からGoogle Cloud Storage(GCS)へデータをアップロードしています。その次にGCSからBigQueryへそのデータをロードしています。

それぞれのフェーズをAirflowのタスクの単位であるオペレーターで実現していて、さらに2つのオペレーターはそれぞれ同期するテーブルごと別のタスクとして存在し、それらをDAGという1つのワークフローの単位でまとめています。

SQL ServerからGCSへ

JdbcToGoogleCloudStorageOperator

SQL ServerからGCSへのデータの移動は JdbcToGoogleCloudStorageOperator というAirflowのオペレーターが担当します。

DBがMySQLの場合はMySqlToGoogleCloudStorageOperatorというAirflowに組み込みのオペレーターがあるんですが、バイマのデータベースはSQL Serverなので、JDBCのクライアントで同様の働きをするオペレーターを自前で作ったものが JdbcToGoogleCloudStorageOperator です。Airflowのプラグインとして公開しています。

github.com

このオペレータでの処理は、まずDBからSQLでデータを抽出し、一度JSONL形式のファイルとしてのオペレーターが動くサーバーのローカルに保存され、それがGCSへアップロードされるという流れです。BigQueryへロードするときにスキーマ定義が必要なので、データファイルとは別にスキーマ定義のファイルもJSON形式でGCSへアップロードされます。

スケジューリングと更新差分抽出の仕組み

DAGのスケジューリング間隔は1時間に設定しています。するとAirflowは時間を1時間ごとに期間を分けてDAGにその期間の開始時刻(execution_date)、終了時刻(next_execution_date)をテンプレートのパラメーターとして渡してくれます。それらを データ抽出SQLのWHERE句のところでレコードの更新日時を記録するカラム(下の例ではupdated_at)を基準に期間指定すると、その期間に更新があったレコードだけが抽出され、BigQuery側へ送られる仕組みです。

SELECT 
  * 
FROM 
  table1 
WHERE 
  "{{execution_date.strftime('%Y-%m-%d %H:%M:%S')" <= updated_at 
  AND updated_at < "{{next_execution_date).strftime('%Y-%m-%d %H:%M:%S')}}"

もし間隔を変えてもDAGを編集することなくSQLがその期間に合わせて変わってくれるので便利です。

GCSからBigQueryへ

GoogleCloudStorageToBigQueryOperator

GCSからBigQueryへはその名の通りAirflow組み込みのGoogleCloudStorageToBigQueryOperatorというオペレーターがやってくれます。

BigQuery側のデータセットは同期元DBのデータベース単位、テーブルは同期元DBのテーブル単位に分けています。BigQuery側のテーブルはDB側のレコードの更新日ごとに日付分割しています。

BigQueryの更新はDMLは使わずに、ファイルを読み込みジョブで更新されます。そうするとDB側のレコードが更新されるとBigQuery側には重複してレコードが溜まっていくのですが、それは後述の重複除外ビューで解決しています。

BigQuery側でレコードの重複を除外

BigQuery側のテーブルでは、次のようなSQLでビューテーブルを作ることで、同期元のDBでレコードが何度も更新されても常に最新のレコードしか現れない仕組みになっています。

この例は、主キーがidで更新日時のカラムが updated_at の場合のSQLです。同一idに対して常に最新のupdated_at をもつレコードしかこのビューには出てきません。

SELECT *
FROM (
    SELECT *, ROW_NUMBER() OVER (
      PARTITION BY id
      ORDER BY updated_at DESC)  etl_row_num
    FROM
        `db1.table1_*`)
WHERE etl_row_num = 1

Airflowで便利だった機能

Airflowの機能でこの仕組みをつくるのに助けられた機能がいくつかあったので紹介します。書ききれてないですが、ほかにもたくさんあります。

Catchup

DAGのスケジュールを過去の期間にさかのぼって実行してくれる機能なんですが、非常にありがたかったです。 過去のデータの移行でも差分同期の仕組みがそのまま使えましたし、一度に同期せずに、期間を区切って少しずつデータを持っていけたので、同期元のDBにも負荷をかけずにすみました。

Connection、Variable

Connectionは接続先となるDBやGCPへの認証情報を一元管理してくれ、一度設定すればどのDAGからアクセスできて便利でした。次のPoolも同じなんですが、設定はGUIでもCLIでも設定できるので、ansibleなどのプロビジョニングツールでも設定できたのもありがたかったです。

Variableも単なるキーと値を設定できるだけなんですが、DAGを汚すことなくdevやproductionなどリリースステージごとに値を切り替えられて便利でした。

Pool

タスクの同時実行数を制限する機能です。Poolはユーザーが定義でき、そのPoolにオペレーターを紐付けるとそのオペレーターはそのPoolのslot数を超えて同時実行されません。データ抽出のタスクが1つのDBに対して多数同時実行されてしまうとそのDBのコネクションも同時に消費され、枯渇しかねませんが、このPoolで上限数を設定できたので安心でした。

まとめ

最初は手っ取り早くcronとスクリプトで作ってしまおうと思ったのですが、すこしなれるまで時間はかかったもののAirflowで作って良かったです。開発が進むにつれ、特にプロダクション環境で動かすにあたっていろいろ考慮すべきことが出てくると思うのですが、作りながらほしいと思った機能が先回りされているかのように用意されていてとても助かりました。全て使いきれてないですが、ワークフロー運用のノウハウがたくさん詰まった良いプロダクトだと思いました。

OptunaとLightGBMを使って、Kaggle過去コンペにsubmitする

この記事はEnigmo Advent Calendar 2018の10日目です。

はじめに

OptunaはPFN社が公開したハイパーパラメータ自動最適化フレームワークです。

https://research.preferred.jp/2018/12/optuna-release/

目的関数さえ決めれば、直感的に最適化を走らせることが可能のようです。

今回、最適化自体の説明は割愛させていただきますが、機械学習の入門ということを考えるとハイパーパラメータの調整としては、gridsearchやRandomizedSearchCVで行う機会が多いと思います。 スキル、あるいはリソースでなんとかするということになるかと思いますが、特に、kaggleのような0.X%の精度が向上が重要になるような状況では、ハイパーパラメータのチューニングが大きなハードルの一つになります。 そこで、titanicでのsubmitはあるものの、Kaggleの経験がほぼゼロな筆者でも、Optunaで簡単にチューニングができるかどうかを試してみようと思います。

今回の対象コンペ

既にcloseしているコンペの中で、下記のPorto Seguro’s Safe Driver Predictionを選びました。 https://www.kaggle.com/c/porto-seguro-safe-driver-prediction 選定理由は以下の通りです。

  • データがそれほど大きくない
  • 手元(自宅)のラップトップのRAMは8GBと大きくないので、XGboostではなくメモリ消費が抑えられるLightGBMでやってみたい
  • 解法がシンプルかつ、LightGBMで上位のスコアを解法を公開しているカーネルがすぐに見つかった

公開解法の再現

https://www.kaggle.com/xiaozhouwang/2nd-place-lightgbm-solution

上記をそのままコピペして一回submitします。 Python2対応のようなので、下記のようにPython3で動くように修正しました。

# part of 2nd place solution: lightgbm model with private score 0.29124 and public lb score 0.28555

import lightgbm as lgbm
from scipy import sparse as ssp
from sklearn.model_selection import StratifiedKFold
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

def Gini(y_true, y_pred):
    # check and get number of samples
    assert y_true.shape == y_pred.shape
    n_samples = y_true.shape[0]

    # sort rows on prediction column
    # (from largest to smallest)
    arr = np.array([y_true, y_pred]).transpose()
    true_order = arr[arr[:, 0].argsort()][::-1, 0]
    pred_order = arr[arr[:, 1].argsort()][::-1, 0]

    # get Lorenz curves
    L_true = np.cumsum(true_order) * 1. / np.sum(true_order)
    L_pred = np.cumsum(pred_order) * 1. / np.sum(pred_order)
    L_ones = np.linspace(1 / n_samples, 1, n_samples)

    # get Gini coefficients (area between curves)
    G_true = np.sum(L_ones - L_true)
    G_pred = np.sum(L_ones - L_pred)

    # normalize to true Gini coefficient
    return G_pred * 1. / G_true


cv_only = True
save_cv = True
full_train = False


def evalerror(preds, dtrain):
    labels = dtrain.get_label()
    return 'gini', Gini(labels, preds), True


path = "input/"

train = pd.read_csv(path+'train.csv')
train_label = train['target']
train_id = train['id']
test = pd.read_csv(path+'test.csv')
test_id = test['id']

NFOLDS = 5
kfold = StratifiedKFold(n_splits=NFOLDS, shuffle=True, random_state=218)

y = train['target'].values
drop_feature = [
    'id',
    'target'
]

X = train.drop(drop_feature,axis=1)
feature_names = X.columns.tolist()
cat_features = [c for c in feature_names if ('cat' in c and 'count' not in c)]
num_features = [c for c in feature_names if ('cat' not in c and 'calc' not in c)]

train['missing'] = (train==-1).sum(axis=1).astype(float)
test['missing'] = (test==-1).sum(axis=1).astype(float)
num_features.append('missing')

for c in cat_features:
    le = LabelEncoder()
    le.fit(train[c])
    train[c] = le.transform(train[c])
    test[c] = le.transform(test[c])

enc = OneHotEncoder(categories='auto')
enc.fit(train[cat_features])
X_cat = enc.transform(train[cat_features])
X_t_cat = enc.transform(test[cat_features])

ind_features = [c for c in feature_names if 'ind' in c]
count=0
for c in ind_features:
    if count==0:
        train['new_ind'] = train[c].astype(str)+'_'
        test['new_ind'] = test[c].astype(str)+'_'
        count+=1
    else:
        train['new_ind'] += train[c].astype(str)+'_'
        test['new_ind'] += test[c].astype(str)+'_'

cat_count_features = []
for c in cat_features+['new_ind']:
    d = pd.concat([train[c],test[c]]).value_counts().to_dict()
    train['%s_count'%c] = train[c].apply(lambda x:d.get(x,0))
    test['%s_count'%c] = test[c].apply(lambda x:d.get(x,0))
    cat_count_features.append('%s_count'%c)

train_list = [train[num_features+cat_count_features].values,X_cat,]
test_list = [test[num_features+cat_count_features].values,X_t_cat,]

X = ssp.hstack(train_list).tocsr()
X_test = ssp.hstack(test_list).tocsr()

learning_rate = 0.1
num_leaves = 15
min_data_in_leaf = 2000
feature_fraction = 0.6
num_boost_round = 10000
params = {"objective": "binary",
          "boosting_type": "gbdt",
          "learning_rate": learning_rate,
          "num_leaves": num_leaves,
           "max_bin": 256,
          "feature_fraction": feature_fraction,
          "verbosity": 0,
          "drop_rate": 0.1,
          "is_unbalance": False,
          "max_drop": 50,
          "min_child_samples": 10,
          "min_child_weight": 150,
          "min_split_gain": 0,
          "subsample": 0.9
          }

x_score = []
final_cv_train = np.zeros(len(train_label))
final_cv_pred = np.zeros(len(test_id))
for s in range(16):
    cv_train = np.zeros(len(train_label))
    cv_pred = np.zeros(len(test_id))

    params['seed'] = s

    if cv_only:
        kf = kfold.split(X, train_label)

        best_trees = []
        fold_scores = []

        for i, (train_fold, validate) in enumerate(kf):
            X_train, X_validate, label_train, label_validate = \
                X[train_fold, :], X[validate, :], train_label[train_fold], train_label[validate]
            dtrain = lgbm.Dataset(X_train, label_train)
            dvalid = lgbm.Dataset(X_validate, label_validate, reference=dtrain)
            bst = lgbm.train(params, dtrain, num_boost_round, valid_sets=dvalid, feval=evalerror, verbose_eval=100,
                            early_stopping_rounds=100, )
            best_trees.append(bst.best_iteration)
            cv_pred += bst.predict(X_test, num_iteration=bst.best_iteration)
            cv_train[validate] += bst.predict(X_validate)

            score = Gini(label_validate, cv_train[validate])
            print(score)
            fold_scores.append(score)

        cv_pred /= NFOLDS
        final_cv_train += cv_train
        final_cv_pred += cv_pred

        print("cv score:")
        print(Gini(train_label, cv_train))
        print("current score:", Gini(train_label, final_cv_train / (s + 1.)), s+1)
        print(fold_scores)
        print(best_trees, np.mean(best_trees))

        x_score.append(Gini(train_label, cv_train))

print(x_score)
pd.DataFrame({'id': test_id, 'target': final_cv_pred / 16.}).to_csv('model/lgbm3_pred_avg.csv', index=False)
pd.DataFrame({'id': train_id, 'target': final_cv_train / 16.}).to_csv('model/lgbm3_cv_avg.csv', index=False)

公開解法でのsubmit

Private Scoreで0.29097。5169チーム中46位のスコアとなり、シルバーメダル圏内に入りました。 コンペは終了しているので、もちろんスコアボードの本体は更新はされません。

なお、実際のコンペでは、カーネルの著書から他のNeral Networkでの予測値の平均と記載があるので、2位のsubmitの再現というわけにならないようです。

しかし、このようなシンプルな方法でシルバーメダルのスコアを取れるのは、個人的にもKaggleに積極してみたいという励みになったと感じています。

ハイパーパラメータのチューニング

さて、ハイパーパラメータのチューニングをフレームワークの力を借りて、ハードルをぐっと下げようという、本題に移ります。

他のKaggleのコンペや、Stack over flowで雑に調査し、パラメータの範囲を決めました。 そうしてできた修正したソースコードが、以下のようになります。

import lightgbm as lgbm
import optuna
from scipy import sparse as ssp
from sklearn.model_selection import StratifiedKFold
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

def Gini(y_true, y_pred):
    # check and get number of samples
    assert y_true.shape == y_pred.shape
    n_samples = y_true.shape[0]

    # sort rows on prediction column
    # (from largest to smallest)
    arr = np.array([y_true, y_pred]).transpose()
    true_order = arr[arr[:, 0].argsort()][::-1, 0]
    pred_order = arr[arr[:, 1].argsort()][::-1, 0]

    # get Lorenz curves
    L_true = np.cumsum(true_order) * 1. / np.sum(true_order)
    L_pred = np.cumsum(pred_order) * 1. / np.sum(pred_order)
    L_ones = np.linspace(1 / n_samples, 1, n_samples)

    # get Gini coefficients (area between curves)
    G_true = np.sum(L_ones - L_true)
    G_pred = np.sum(L_ones - L_pred)

    # normalize to true Gini coefficient
    return G_pred * 1. / G_true

cv_only = True
save_cv = True
full_train = False

def evalerror(preds, dtrain):
    labels = dtrain.get_label()
    return 'gini', Gini(labels, preds), True

path = "input/"

train = pd.read_csv(path+'train.csv')
#train = train.sample(frac=0.1, random_state=0).reset_index(drop=True)
train_label = train['target']
train_id = train['id']
test = pd.read_csv(path+'test.csv')
#test = test.sample(frac=0.1, random_state=0).reset_index(drop=True)
test_id = test['id']

NFOLDS = 4
kfold = StratifiedKFold(n_splits=NFOLDS, shuffle=True, random_state=218)

y = train['target'].values
drop_feature = [
    'id',
    'target'
]

X = train.drop(drop_feature,axis=1)
feature_names = X.columns.tolist()
cat_features = [c for c in feature_names if ('cat' in c and 'count' not in c)]
num_features = [c for c in feature_names if ('cat' not in c and 'calc' not in c)]

train['missing'] = (train==-1).sum(axis=1).astype(float)
test['missing'] = (test==-1).sum(axis=1).astype(float)
num_features.append('missing')
train.shape
for c in cat_features:
    le = LabelEncoder()
    le.fit(train[c])
    train[c] = le.transform(train[c])
    test[c] = le.transform(test[c])

# 事前にlabelEncoderを行っているから、この使い方でユニークな値で割り当てられる。引数categories = 'auto'で警告を消す
enc = OneHotEncoder(categories='auto')
enc.fit(train[cat_features])
X_cat = enc.transform(train[cat_features])
X_t_cat = enc.transform(test[cat_features])


ind_features = [c for c in feature_names if 'ind' in c]
count=0
for c in ind_features:
    if count == 0:
        train['new_ind'] = train[c].astype(str)+'_'
        test['new_ind'] = test[c].astype(str)+'_'
        count += 1
    else:
        train['new_ind'] += train[c].astype(str)+'_'
        test['new_ind'] += test[c].astype(str)+'_'

cat_count_features = []
for c in cat_features+['new_ind']:
    d = pd.concat([train[c],test[c]]).value_counts().to_dict()
    train['%s_count'%c] = train[c].apply(lambda x:d.get(x,0))
    test['%s_count'%c] = test[c].apply(lambda x:d.get(x,0))
    cat_count_features.append('%s_count'%c)

train_list = [train[num_features+cat_count_features].values, X_cat]
test_list = [test[num_features+cat_count_features].values, X_t_cat]

X = ssp.hstack(train_list).tocsr()
X_test = ssp.hstack(test_list).tocsr()


def objective(trial):
    drop_rate = trial.suggest_uniform('drop_rate', 0, 1.0)
    feature_fraction = trial.suggest_uniform('feature_fraction', 0, 1.0)
    learning_rate = trial.suggest_uniform('learning_rate', 0, 1.0)
    subsample = trial.suggest_uniform('subsample', 0.8, 1.0)
    num_leaves = trial.suggest_int('num_leaves', 5, 1000)
    verbosity = trial.suggest_int('verbosity', -1, 1)
    num_boost_round = trial.suggest_int('num_boost_round', 10, 100000)
    min_data_in_leaf = trial.suggest_int('min_data_in_leaf', 10, 100000)
    min_child_samples = trial.suggest_int('min_child_samples', 5, 500)
    min_child_weight = trial.suggest_int('min_child_weight', 5, 500)

    params = {"objective": "binary",
              "boosting_type": "gbdt",
              "learning_rate": learning_rate,
              "num_leaves": num_leaves,
              "max_bin": 256,
              "feature_fraction": feature_fraction,
              "verbosity": verbosity,
              "drop_rate": drop_rate,
              "is_unbalance": False,
              "max_drop": 50,
              "min_child_samples": min_child_samples,
              "min_child_weight": min_child_weight,
              "min_split_gain": 0,
              "min_data_in_leaf": min_data_in_leaf,
              "subsample": subsample
              }

    x_score = []
    final_cv_train = np.zeros(len(train_label))
    final_cv_pred = np.zeros(len(test_id))

    cv_train = np.zeros(len(train_label))
    cv_pred = np.zeros(len(test_id))

    params['seed'] = 0

    kf = kfold.split(X, train_label)

    best_trees = []
    fold_scores = []

    for i, (train_fold, validate) in enumerate(kf):
        print('kfold_index:', i)
        X_train, X_validate, label_train, label_validate = \
            X[train_fold, :], X[validate, :], train_label[train_fold], train_label[validate]
        dtrain = lgbm.Dataset(X_train, label_train)
        dvalid = lgbm.Dataset(X_validate, label_validate, reference=dtrain)
        bst = lgbm.train(params, dtrain, num_boost_round, valid_sets=dvalid, feval=evalerror, verbose_eval=100,
                        early_stopping_rounds=100)
        best_trees.append(bst.best_iteration)
        cv_pred += bst.predict(X_test, num_iteration=bst.best_iteration)
        cv_train[validate] += bst.predict(X_validate)

        score = Gini(label_validate, cv_train[validate])
        print(score)
        fold_scores.append(score)


    cv_pred /= NFOLDS
    final_cv_train += cv_train
    final_cv_pred += cv_pred

    print("cv score:")
    print(Gini(train_label, cv_train))
    print("current score:", Gini(train_label, final_cv_train / (s + 1.)), s+1)
    print(fold_scores)
    print(best_trees, np.mean(best_trees))

    x_score.append(Gini(train_label, cv_train))
    print(x_score)


    pd.DataFrame({'id': test_id, 'target': final_cv_pred / 16.}).to_csv('model/lgbm3_pred_avg_2.csv', index=False)
    pd.DataFrame({'id': train_id, 'target': final_cv_train / 16.}).to_csv('model/lgbm3_cv_avg_2.csv', index=False)

    return (1 - x_score[0])

study = optuna.create_study()
study.optimize(objective, n_trials=150)

パラメータの設定の範囲を抜粋すると以下のようになります。

drop_rate = trial.suggest_uniform('drop_rate', 0, 1.0)
feature_fraction = trial.suggest_uniform('feature_fraction', 0, 1.0)
learning_rate = trial.suggest_uniform('learning_rate', 0, 1.0)
subsample = trial.suggest_uniform('subsample', 0.8, 1.0)
num_leaves = trial.suggest_int('num_leaves', 5, 1000)
verbosity = trial.suggest_int('verbosity', -1, 1)
num_boost_round = trial.suggest_int('num_boost_round', 10, 100000)
min_data_in_leaf = trial.suggest_int('min_data_in_leaf', 10, 100000)
min_child_samples = trial.suggest_int('min_child_samples', 5, 500)
min_child_weight = trial.suggest_int('min_child_weight', 5, 500)

なお、Optuna自体の使用方法は、下記の記事と公式リファレンスを参考させていただきした。

https://qiita.com/ryota717/items/28e2167ea69bee7e250d https://optuna.readthedocs.io/en/stable/index.html

(18/12/11 19:41追記) コメントいただけた通り、'verbosity'は、警告レベルの表示を制御するパラメータであり、予測性能の最適化としては意味の無いパラメータでした。ですので、チューニングの対象にはすべきではありませんでした。

以下のように試行回数を定めていますが、

n_trials=150 

時間が足りなくなった関係で、その時点で計算されたパラメータで最適化を中断しております。 20時間ほど回し回しましたが、ハイパーパラメータによって検証の時間は1分から60分程度となり、 100回くらいの試行数だったようです。

そうしてできてパラメータが、以下のように、2位の解法と比較すると以下のようになります。

ハイパーパラメータ 今回のチューニング結果 2位の解法
drop_rate 0.3015600134599976 0.1
feature_fraction 0.46650703511665226 0.6
learning_rate 0.004772377676601769 0.1
subsample 0.8080720420805803 0.9
num_leaves 718 15
verbosity -1 0
num_boost_round 1942 10000
min_data_in_leaf 212 150
min_child_samples 68 10
min_child_weight 151 150

2位コンペとの解法とは、雰囲気が異なるセットとなり、公開解法の再現ということにはならないようです。 K_fold=4 でやっていることも異なる要因になると思います。

算出できたハイパーパラメータでsubmit

最初のpython3のスクリプトからパラメータを入れ替え、予測値を算出しました。 K_fold =4, また、ランダムシートの数を16から4に減らしております。

結果

スコアは下がってます。

1176位相当。。ハイパーパラメータ次第でシルバーメダル圏内ということを考えると、微妙な結果です。

所感

結果としては残念ですが、grid searchだけに頼らない、ハイパーパラメータの最適化方法の導入のきっかけになりました。 また、非常に手軽に使えたというのもあり、今後もチューニングの場面でOptunaを活用してみたいと思います。

反省としては、探索するハイパーパラメータの設定が悪く、計算の効率化が著しく悪くなった恐れがあります。 validationの際に、fold数の全て計算するのではなく、スコアが下がらなそうなら、そのハイパーパラメータの計算をやめるとか、一定時間以上かかってしまったらまた、次に試行に移るとかできれば効率化できたように思えます。 フレームワークブラックボックスでもある程度は動かすことができますが、やはり中身をある程度理解しないと遠回りしてしまうというのは、当然の結果と言えます。 もっと使いこなせるよう精進しなければと思いました。

公式リファレンスでも、OptunaでLightGBMをチューニングする例が出ており、そちらの例も参考にしながらリベンジしたいと思います。

github.com

最後にですが、この記事が何かの役に経てば幸いです。

Kotlin はじめてのコルーチン

0. はじめに

18年10月にKotlinのコルーチンがexperimentalからstableになりました。 遅ればせながら、コルーチンを触ってみました。

この記事は、これからコルーチンを学習する人向けの記事です。

*Kotlin1.3、 kotlinx-coroutines1.0.1の環境です。 *Kotlinが初めての方は、こちらで気軽に試せるので触ってみてください。先頭にimport kotlinx.coroutines.*を忘れずに。

1. コルーチンとは

Wikipediaから引用します。

コルーチン(英: co-routine)とはプログラミングの構造の一種。サブルーチンがエントリーからリターンまでを一つの処理単位とするのに対し、コルーチンはいったん処理を中断した後、続きから処理を再開できる。

どういうことなのか。簡単なプログラムを例にして説明をします。

fun main() {
    /* ここからコルーチン */
    println("start foo")
    時間のかかる処理
    println("end foo")
    /* ここまでコルーチン */

    println("bar")
}

例えば"start foo"から"end foo"をコルーチンとして実行することで、時間のかかる処理のタイミングでmainスレッドがその処理を中断し、中断中は別の処理をすることができます。 ここでは、中断中は"bar"を表示させることにします。 よって、出力結果をこのようなります。

start foo
bar
end foo

2.初めてのコルーチン

それでは実際にコルーチンを作成して、スレッドが中断して再開するところをみてみます。 作成するプログラムは、1.コルーチンとはのプログラムに、コルーチンを適用します。説明通りの結果になるか確認します。

コルーチンを作成するにはコルーチンビルダーというものを使います。 コルーチンビルダーには様々ありますが、ここではもっともシンプルなlaunch関数を使います。 使い方は簡単です。launch関数にコルーチンとして実行するラムダを渡します。

fun main() {
    GlobalScope.launch {
        println("start foo")
        delay(1000)
        println("end foo")
    }
    println("bar")
}

これで"start foo"から"end foo"まではコルーチンとして実行されます。

なお、GlobalScopedelay関数はあとで説明します。 delay関数Thread.sleepメソッドのようなものだと現時点では思っておいてください。 「時間のかかる処理」をdelay関数で代替しています。引数として中断したい時間をミリ秒単位で指定できます。

結果はこのようになります。("start foo"が表示されないこともあります。)

bar
start foo

想定した出力結果になりませんでした。 まず、"bar"が先に表示されてしまいました。 これはlaunch関数がコルーチンの実行をスケジュール化だけして、処理を先に進めてしまうからです。 また、"end foo"が表示されませんでした。原因は、"end foo"から処理を再開をする前にmain関数からリターンして、プログラム自体が終了してしまうからです。

launch関数では、mainスレッドの実行を止めることできないので、何か工夫が必要です。 launch関数の代わりにrunBlocking関数というコルーチンビルダーを使うことにします。 runBlocking関数は、コルーチンが完了するまで呼び出し出し元のスレッドを停止させるコルーチンビルダーです。

fun main() {
    runBlocking {
        println("start foo")
        delay(1000)
        println("end foo")
    }
    println("bar")
}

当然ですが、これでも期待した出力結果にはなりません。

start foo
end foo
bar

なぜならrunBlocking関数をコールした時点で、コルーチンの処理が終わるまで呼び出し元のスレッドがブロックされるからです。(出力結果として想定したものではありませんが、delay関数のポイントで中断および再開はしています。)

それでは先のlaunch関数と組み合わせたらどうなるでしょうか。

fun main() {
    runBlocking {
        launch {
            println("start foo")
            delay(1000)
            println("end foo")
        }
        delay(500)
        println("bar")
    }
}

先述したようにlaunch関数はコルーチンの実行をスケジュール化して処理を先に進めてしまうので、"start foo"が表示される前に"bar"が表示されてしまいます。 これを防ぐために"bar"の直前にdelay(500)を置きます。 (前回と違い、launch関数を呼び出す際にGlobalScopeがない理由はあとで説明します。)

結果はこのようになりました。

start foo
bar
end foo

想定した出力結果になりました。

どのスレッドで各々が実行されているか調べてみましょう。 また、少しだけKotlinっぽく書いてみます。

fun main() = runBlocking {
    launch {
        println("$threadName:start foo")
        delay(1000)
        println("$threadName:end foo")
    }
    delay(500)
    println("$threadName:bar")
}

val threadName: String
    get() = Thread.currentThread().name
main:start foo
main:bar
main:end foo

中断する前の処理、中断中の処理、中断から再開した処理、全てmainスレッドで実行されていることが確認できました。

なお、このプログラムは2回中断が発生しています。

launchコルーチンのスケジュール化 → delay(500)で中断(1回目) → launchの実行開始 → delay(1000)で中断(2回目) → delay(500) から再開 → delay(1000)から再開

3.中断はいつ発生するのか

コルーチンの実行が中断され、そして再開される様子を見ることができましたが、中断とはどういう時に発生するのでしょうか。 ドキュメントにこのような記載があります。

Suspending functions can be used inside coroutines just like regular functions, but their additional feature is that they can, in turn, use other suspending functions, like delay in this example, to suspend execution of a coroutine. サスペンド関数は、コルーチンの中で通常の関数のように使えます。通常の関数との違いは、サスペンド関数はコルーチンの実行を中断するために、他のサスペンド関数を使うことです。(この例のdelayのように)

サスペンド関数という新しい用語が出てきました。サスペンド関数とはこのように関数の先頭にsusupend修飾子がついた関数のことです。

suspend fun hoge()

このドキュメントによるとサスペンド関数をコールすることで中断が発生するようです。 確かにdelay関数の定義にもこのようにsuspend修飾子がついています。

public suspend fun delay(timeMillis: Long)

それでは、delay関数のように中断を起こすサスペンド関数を作成してみましょう。 せっかくなので、中断から再開するときに値を返すサスペンド関数を作成してみます。 今回は4096bitで表現可能な素数を返すgetPrimeNumber関数を作成します。

getPrimeNumber関数の利用側はこのようにします。

fun main() = runBlocking {
    println("$threadName:start runBlocking")
    launch {
        println("$threadName:start launch")
        val prime = getPrimeNumber()
        println("$threadName:prime number = $prime")
        println("$threadName:end launch")
    }
    delay(500)
    println("$threadName:end runBlocking")
}

大体の流れは、 getPrimeNumber関数をコールしたらmainスレッドはコルーチンを中断 → その間に"end Blocking"を表示 → 素数が求め終わったら、素数を表示させるところから再開 です。

次に、サスペンド関数であるgetPrimeNumber関数はどう作成すればいいのでしょうか。 まずは、素数を求めるコードを書く必要がありますが、BigInterger.probablePrimeという素数を求めるのに便利なメソッドがあります。 このメソッドの詳しい使い方は割愛しますが、BigInterger.probablePrime(4096, Random())素数(正確には「おそらく素数」)を返してくれます。私の手元のマシンでは呼び出してから返ってくるまでに10秒程度かかりました。

次に実際に中断を起こすコードを書いていきます。

suspend fun getPrimeNumber() = BigInterger.probablePrime(4096, Random())

このように書ければシンプルですが、このようにしてもgetPrimeNumber関数で中断されず、mainスレッドが素数を求めるために停止してしまいます。

スレッドを中断させるにはsuspendCoroutine関数をコールする必要があります。 suspendCroutine関数はこのように定義されています。この関数もサスペンド関数です。

inline suspend fun  suspendCoroutine(
    crossinline block: (Continuation) -> Unit
): T

ラムダが受け取るContinuationインターフェースにはこのような拡張関数が定義されています。

fun  Continuation.resume(value: T)

このresumeメソッドをコールすることで、コルーチンが再開します。

それでは中断はいつ発生するのでしょうか。 あえて、resumeメソッドをコールせず、このようにして実行してみてください。

suspend fun getPrimeNumber() {
    println("$threadName:hoge")
    suspendCoroutine {
        println("$threadName:fuga")
    }
    println("$threadName:piyo")
}

結果はこのようになります。

main:start runBlocking
main:start launch
main:hoge
main:fuga
main:end runBlocking

また、このプログラムは永遠に終了しません。なぜなら、コルーチンが再開しないためです。

この結果をみると、"fuga"の後に"end ranBlocking"が表示されているので、"fuga"を表示後、つまりsuspendCoroutine関数に渡したラムダの実行終了後に中断が発生していることがわかります。 これが中断が発生するタイミングです。

今度は、中断が発生後、約1秒経過してからresumeメソッドをコールして再開してみます。

suspend fun getPrimeNumber() {
    println("$threadName:hoge")
    suspendCoroutine { cont ->
        println("$threadName:fuga")
        Thread {
            Thread.sleep(1000)
            cont.resume(1234)
        }.start()
    }
    println("$threadName:piyo")
}

中断から再開しました。

main:start runBlocking
main:start launch
main:hoge
main:fuga
main:end runBlocking
main:piyo
main:prime number = kotlin.Unit
main:end launch

また、getPrimeNumber関数素数を返さないのでkotlin.Unitと表示されてしまっています。

それではgetPrimeNumber関数素数を返すように変更します。resumeメソッドに渡した値がsuspendCoroutine関数の戻り値になるので、このように書けます。

suspend fun getPrimeNumber(): BigInteger = suspendCoroutine { cont ->
    Thread {
        cont.resume(BigInteger.probablePrime(4096, Random()))
    }.start()
}

これで、先ほどの結果でkotlin.Unitとなっていた箇所に素数が表示されます。

目的である中断の発生タイミングについて、確認できました。

4.コルーチンビルダーについて少し詳しく

これまでで、launch関数runBlocking関数の2つのコルーチンビルダーを使いました。 この2つ以外にも様々なコルーチンビルダーが提供されています。 例えば、先ほど素数を求めるために作成したgetPrimeNumber関数ですが、withContext関数というコルーチンビルダーを使うとこのように書けます。

suspend fun getPrimeNumber() = withContext(Dispatchers.Default) {
        BigInteger.probablePrime(4096, Random())
    }

このコルーチンビルダーは値を返すことができます。 また、第一引数に値を指定することで、コルーチンを実行するスレッドを切り替えています。

実用的なコルーチンビルダーは他にもありますが、この記事ではそれらを紹介しません。 ここでは、この記事でまだ触れていない重要な2つの内容について説明します。

  1. コルーチンビルダーは、中断可能な世界へのエントリーポイントのようなもの
  2. コルーチンスコープが必要なコルーチンビルダー

中断可能な世界へのエントリーポイント

まずは1つ目です。 サスペンド関数としてgetPrimeNumber関数を作成し、コールすることでコルーチンが中断されることを見ましたが、このようなコードはコンパイルエラーになります。

fun main() {
    val primeNumber = getPrimeNumber()
}

理由は、サスペンド関数はサスペンド関数もしくはサスペンドラムダからしかコールできないというルールがあるからです。 通常のラムダとサスペンドラムダの違いは、関数と同様にsuspend修飾子の有無です。 例えば、launch関数の定義はこのようになっています。

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> Unit
): Job (source)

blockの型をみるとsuspend修飾子がついているのがわかります。これはサスペンドラムダを受け取ることを表しています。 このようにコルーチンビルダーはサスペンドラムダを受けることで中断可能な世界へのエントリーポイントを提供しています。

コルチーンスコープ

次に2つ目のコルチーンスコープについて。 launch関数の定義を見ていただくと、launch関数CoroutineScopeインターフェースの拡張関数として定義されているのがわかります。

fun CoroutineScope.launch(..)

よって、launch関数をコールするにはCoroutineScopeインスタンスが必要です。 最初の方のコードでGlobalScope.launchと書いていたのはそのためです。 GlobalScopeCoroutineScopeインターフェースを実装したインスタンスです。

object GlobalScope : CoroutineScope

launch関数をコールするにはCoroutineScopeインスタンスが必要ですが、 runBlocking関数に渡すラムダ内ではGlobalScope.launch{..}ではなく、シンプルにlaunch{..}と書けます。 この理由はrunBlocking関数の定義をみるとわかります。

fun  runBlocking(
    context: CoroutineContext = EmptyCoroutineContext, 
    block: suspend CoroutineScope.() -> T
): T (source)

blockのレシーバの型はCoroutineScopeとなっています。 これが理由で、runBlocking関数に渡すラムダ内では、GlobalScope.launchと書く必要がなかったのです。

レシーバ付きラムダに馴染みがない方のために、少し補足します。 あえてthisを使って書くとこのようになります。

runBlocking {
    this.launch {..}
}

このthisは、runBlocking関数が作成したCoroutineScopeインスタンスを参照しています。

補足

コルーチンスコープが導入されたのはkotlinx-coroutines0.26.0からです。 0.26.0がマークされたのが18年9月です。0.26.0より古いバージョンを前提に書かれた記事ではGlobalSopeがないコードを見ることがあるかもしれません。

// 0.26.0より前
fun main() {
    launch {..}
}

// 0.26.0以降
fun main() {
    GlobalScope.launch {..}
}

5.終わりに

予定ではCoroutineScopeCoroutineContextJobについても書くつもりでしたが、記事が長くなってしまったので、全く触れられませんでした。 コルーチンを使った実用的なコードも同様です。

コルーチンを勉強をしている身ではありますが、何かの機会があれば、それらについても書いてみたいと思います。

この記事が、これからコルーチンを初める方に少しでも役に立てば幸いです。

Amazon Dash Button をHackして「社内ビール制度」を活性化したい

こんにちは、Enigmo 新卒エンジニアの@sean0628_iです。 Enigmo Advent Calendar 2018 8日目の記事です。

ちなみに、Enigmo は、海外通販サイト BUYMA や、世界中のトレンドをお届けするファッションメディア STYLE HAUS を運営する会社です。

Enigmo では、 「社内ビール制度」が存在し、定時の18:30以降 ビールが飲み放題 です。

毎晩定時後は宴会が繰り広げられて、、、ということはなく、日によっては金曜日ですらも人が疎ら、なんていうこともあります。。。

さてはEnigmo 社員はビールが嫌いなのか?とも思いましたが、実際に話を聞いてみると当然そんなことはありませんでした(安心

実際の声

みんなで楽しくビール飲みたいけど、みんな忙しそうだし。。。 声を掛けるのは気が引ける。。。 社員Tさん

さらに時々こんな声も、

呑んでたなら教えてよ。。。言ってくれれば参加したのに。。。 社員Yさん

ニーズはあるのに、多数障壁があるようです これは、「社内ビール制度」始まって以来の危機だと思い、新卒エンジニアは立ち上がりました。

そうだ、Amazon Dash Button を使ってこの危機を乗り越えよう。

内容

  • Amazon Dash Button(以下、Button)とは?
  • Buttonの仕組み
  • Hackの流れ
  • 気づき・学び
  • 今後の展望

Amazon Dash Button とは?

Amazonダッシュボタンは、注文商品を簡単かつ迅速に作成できるように設計された小型の電子デバイスである。 出典: Wikipedia - Amazon Dash

一般的な利用方法

  1. Amazon 公式アプリにWi-FI&Bluetooth 経由で連動させる
  2. アプリ上でButton に登録したい商品を選択
  3. Step2 で設定した商品がほしくなったらButton を押す
  4. 数日後、商品が手元に届く

Button の仕組み

  1. Buttonの押下により、電源オン
  2. アプリで設定しておいた、Wi-Fiに接続
  3. DHCPでIPを取得
  4. 商品注文リクエス

とここで文系新卒の私は DHCP とはなんぞやと思ったわけです。。。 で、調べました。

利用技術

DHCP とは?

  • コンピューターにIPアドレスを自動割り当てする仕組み
  • IPアドレスは、いわゆる住所のようなもの (IPは 8.8.8.8 こんな感じの数字のやつ)

と、DHCP はなんとなく理解したものの、次なる敵が現れました。

MAC addressARP です。。。

もう横文字嫌だー、と思いながらも開発に必要なため背に腹は代えられず調べました。

MAC address とは?

ARP とは?

  • IP からMAC を調べる仕組み

のことらしい。。。

image.png

出典: What is ARP and ARP spoofing?

Hack の流れ

  1. ARPを利用し、ButtonのMACアドレスを取得*
  2. ButtonのMACアドレスを監視
  3. ButtonからのARP Requestをキャッチ
  4. 任意の処理をする(今回はSlackにmessageを投下)

*: 初回のみ。実際Buttonが押されてからの流れは2~3

実際の実装

長くなるので割愛します。 ソースコード(Sean0628/dash_button): https://github.com/Sean0628/dash_button

テスト運用

↓こんな感じ :tada:

*: リゾート とは、Enigmo 社員の憩いの場所

気づき・学び

普段業務では主にRuby を使っていて、node を初めて使いました。 しかし、やってみると意外に簡単。メンタルブロックをいかに外すかが大切なのだと感じました。

業務では関わることのできないネットワークの分野も面白いなぁと思いました。 機会があれば深掘りしてみたいなぁと。

今後の展望

勉強も兼ねて不要なPCにCentOS を入れて、CentOS をサーバにしてButton を活用したいと思っています。

まだ、本格的なリリースまでは行けてないので、早く始動させてみんなで楽しくビールを飲みたいと思います。

最後まで読んでいただきありがとうございます。


参考: Dash Buttonの設定を管理する

参考: Enabling Interactions with Bots

Ruby の関数型プログラミングの特徴

Enigmo Advent Calendar 2018 の7日目の記事です。

概要

Enigmo の Steven です。 プログラミング言語に対して興味を持ってますので、今日は Ruby について話したいと思います。

Rubyオブジェクト指向だと言ったら、反対する人は多分いないと思いますが、関数型言語の特徴も持ってると言ったら、ピンとこない人はそれほど少なくはないかと思います。 それでも、Ruby プログラマーでしたら、関数型言語から受け継がれたそういう機能はおそらく毎日使っています。 そういう機能がなかったら、Ruby は世界中で使われてる現在の言語にならなかったかもしれません。

この記事ではその機能を説明して、Ruby の理解と関数型言語に関する知識が少し深まる機会になれればと思います。

Ruby の特徴について

Wikipedia によると、Ruby は 10個以上のプログラミング言語から影響を受けて作られました。 その中には PythonC++Smalltalk などのオブジェクト指向プログラミング言語は複数ありますが、その中には Lisp もあります。

Lisp というのは現在、一つの言語というより、言語のファミリーですが、そのファミリーの言語は特徴的な文法を用いてることで有名でしょう。 ただし、Ruby では括弧は Lisp と比べて極めて少ないですし、文法も全然違うので、最初は関係を疑うかと思いますが、言語の根本的なところで Lisp の一部の特徴を確認できます。

Lisp から引き継がれたもの

条件式の場合、Ruby の世界では nil と false は偽として解釈されます。それ以外の値はすべて真になります。 PHP などの言語と比べて、極めて簡単でわかりやすいルールですが、その特徴は Lisp からそのまま引き継がれました。 それに関しては Clojurejvm 上の Lisp)は Ruby と全く一緒です。Common Lisp では false という値はないですが、それ以外は一緒です。

もう一つの特徴としては、Lisp と同じく Ruby には文はなく、すべては式です。 C などの言語では if文、関数の定義などは文であって、値として扱えないのですが、Ruby ではどんなものも式であって、値として扱えます。 if then else endは該当のブランチの値を返しますし、多くの場合はそんなに役に立たないと思いますが、def foo; endはメソッド名をシンボルとして返します。

以上は Lisp 由来の特徴の一部ですが、引き継がれたものの中で一番影響が大きかったのは、関数型プログラミングだと言えます。

Ruby の関数の扱い

どんな段階で特定の言語が関数型言語になるかというのは定かではなくて、判断しにくい時がありますが、Ruby ではやはりオブジェクト指向の面が一番強いので、関数型言語だと言えません。 ただし、Ruby ではある程度関数型プログラミングが可能だと主張しても、誤りではないでしょう。

Ruby関数型プログラミングができるのは Ruby の関数の扱いのおかげです。 厳密に言うと、Ruby には関数はなく、すべてはメソッドですが、説明がより簡潔になるよう、以下の説明では両方が同じだと一旦みなしてください。

第一級関数(first-class functions)

第一級関数を提供する言語では関数を単なる値として扱えます。他の関数に引数として渡すこともできれば、関数から返すのはもちろん、変数に保存することもできます。 Ruby では既存のメソッドを値として扱うにはまずメソッドをMethodProcオブジェクトとして抽出する必要がありますが、それができれば、そのオブジェクトを他のオブジェクトと何の違いもなく自由に扱えます。

def add(a, b)
  a + b
end

def apply(fn, *args)
  fn.call(*args)
end

add = method(:add)

apply(add, 22, 44)
# => 66

無名関数(anonymous functions)

無名関数はその名前の通り、名前のない関数を表しています。それだけです。 名前を与える必要がなくなると、整数と文字列と同じく、関数はただのリテラルになります。 Ruby ではproclambdaで無名な関数オブジェクトをもちろん生成できますが、ブロックも無名関数の一種だと言えます。

my_proc = proc { puts 1 }
my_lambda = lambda { puts 2 }

def with_my_block
  yield 3
end

with_my_block { |x| puts x }

クロージャ(closures)

関数を値として使える言語では、クロージャは、関数の内側から、関数定義時に関数の外側にしか存在しなかった変数名(グローバル以外にも)を参照することを可能にします。 Ruby でもクロージャのサポートを確認できます。

def foo(n)
  o = n * 2
  lambda { |p| o + p } # o は lambda の引数から受けられず、foo のローカル変数を指しています
end

bar = foo(10)
bar.call(5)
# => 25

無名関数とクロージャは時々一緒くたにされることがありますが、違うものです。 無名関数がないが、クロージャがある言語と、その逆の言語を想像できます。

高階関数(higher-order functions)

高階関数は以上のものと違って、機能ではなく関数の一種類の名前です。 高階関数というのは、他の関数を引数として受けるか、関数を返り値として返すか、それともその両方を行う関数を表しています。 どちらかが欠けてると、多少不便になるので、以上で紹介した機能をベースに、高階関数を可能にするプログラミング言語は多いです。 Ruby では高階関数は多いですが、一番始めに頭に浮かぶのはEnumerableのメソッドです。

multiplier = 5
# map という関数はブロックを関数として受け取ってます
[1, 2, 3].map { |n| n * multiplier }
# => [5, 10, 15]

したがって、Ruby でプログラミングをしているのであれば、おそらく毎日、ある程度の関数型プログラミングをしているということになります。

関数型プログラミングのいいところ

Enumerableのメソッドを好む人は Ruby プログラマーのほぼ全員だと思いますが、そのようなプログラミングがなぜいいのかをもっと具体的に説明しましょう。

  • 関数を組み合わせることで量の少ないコードでもかなりな処理を行えます
  • 高階関数になると、再利用できる関数は多く、アルゴリズムをそれぞれの関数の組み合わせとして実装できます
  • 命令型プログラミングとオブジェクト指向プログラミングと比べて、ステートを扱うことが少ないので、ステートによるバグがより少ない

特徴は他にありますが、まとめて言いますと、関数型言語では数学により近い形でプログラムを実装すると言えます。

関数型プログラミング向けの gem

Ruby で以上のように関数型プログラミングができますが、Ruby はあくまでもオブジェクト指向の言語なので、それなりの限界があります。 高階関数をライブラリーに追加することは簡単ですが、HaskellOCaml のような関数型言語に近づかせるにはかなりの努力が要ります。 それも度を越えると、RubyRuby じゃなくなって、デメリットの方が大きくなることがあるかと思います。

それでも場合によってはほかの関数型言語にある機能を Ruby に持ってくるメリットがありますので、以下ではいくつかの機能とそれを提供する gem を紹介します。

永続データ構造

永続データ構造では既存の値を変更することは不可能で、もとの値から新しい値を生成することしかできません。 新しい値の生成時に既存のデータ構造と一部の構造が共有されるので、HashArrayなどをまるごとコピーするより早いです。

一見では追加の制限しかかけられてないと見えますが、新しい値をしか生成できないというのは逆に保障でもあって、変更してはならないデータ構造を気づかずに変更してしまうというバグが完全になくなります。 並行計算ではスレッドの間でデータ交換を行う時に伴う心配もなくなるほか、たまに見かける#dup#freezeのメソッド呼び出しも不要になります。

関数型言語に限る機能ではなく、それぞれのオブジェクト指向言語にも普及しつつある機能であって、Ruby でもこれからStringが完全にイミュータブルになることから見て、Ruby では将来的にそのデータ構造が公式的に導入されるかもしれません。

パターンマッチング

Ruby では値を比べるには==はもちろん、case when endで使われる===もあります。 ただし、そのメソッドで値が同じであるかどうかを簡単に判断できても、データの一部だけ(いわゆるデータのパターン)を簡単に比べるのはもっと難しいです。 その問題を解決するため、データの比較をより汎用的に行えるように、パターンマッチングという機能が存在します。

パターンマッチングでは、比較したいパターンを定義することで、データがパターンにマッチした場合、もしくはマッチしなかった場合の処理を指定できます。 パターンがいくら複雑でも定義可能なので、複雑なデータ構造の比較時に特に役に立ちます。 例のパターンとして以下のはどうでしょうか?

  • 値が配列で、最初の要素がハッシュである必要があるが、ほかの要素は気にしません
  • ハッシュに:foo:barのキーが必須ですが、追加で他のキーがあって問題にしません
  • :fooに対する値は 10 の整数である必要があります

Ruby でパターンマッチングを可能にする gem の中には以下のがあります。

モナド

Arrayに対して使える#map#flat_mapメソッドは人気でしょう。 ただし、Arrayでの使い方が言語を問わず一番普及しているとはいえ、#map#flat_mapは配列限定のメソッドではありません。

#mapはコレクションの各要素に対して任意な関数を適用して、その結果をもとの要素に置き換える処理を行います。 Arrayの場合、ここでいうコレクションは配列ですが、#mapを提供できるコレクションは他にもいろいろあります。 配列は 0 以上の要素を含むとして、Maybeは 0 か一つの要素を含むコレクションです。 フューチャーでしたら、将来的に一つの要素を含むことになりますが、現在はまだないかもしれません。 全然違うものに見えるかもしれませんが、以上のものはすべて要素を含むコレクションとして一緒です。 そのコレクションでも#mapは含まれてる要素に対して任意の関数を適用する処理を行うことになります。

#flat_mapはコレクションの各要素に、その要素を引数にしながら、同じ種類のコレクションを返す関数を、適用する処理を行います。 関数から返された各コレクションを、コレクションの種類によって、連結したり、そのまま返したりします。 #flat_mapと一緒に使われる時、最終的の結果を変更しない関数も使用可能な場合は(以下は#pureと呼ぶ)、#map#pure#flat_mapだけを使って実装可能です(配列の場合はpure = ->(x) { [x] })。 #pure#flat_mapを提供するデータタイプ(コレクション)はモナドと呼ばれています。

#map#flat_mapのいいところの中には以下のがあります。

  • 高階関数なので、再利用性は高いですが、その中でも#map#flat_mapはかなり根本的な関数なので、それをベースに、何のコレクションにも対応する関数を、実装できます。
  • #map#flat_mapは通常の実装で全域写像(total function)なので、渡された関数も全域写像であれば、どんなコレクションに対して適用されても、エラーになりません(Maybeのコレクションで#map#flat_mapを使えば、nil のチェックは完全に不要になります)。

モナドを本格的に使うには協力な型を提供する静的型付けな言語が大体必要になりますが(Haskell など)、簡単なユースケースなら、Ruby でも以下のライブラリーなどでモナドを使えます。

言語とライブラリーによって#map#flat_mapは違う名前になったりします。他によく使われる名前としては fmap(map)と bind(flat_map)はあります。

終わりに

プログラミング言語の世界では、最近の一つの流れとして、今まで関数型言語にしかなかった機能が少しずつ他の言語でも採用されるようになってます。 Ruby ではかなり以前から高階関数を使えますが、もっと最近な例として Java 8 があります。新しい言語なら、Swift と Rust もあります。 当分はその流れはおそらく止まらないので、これからも関数型プログラミングの機能がどんどん普及して、もしかすると Ruby の方にも新しい機能が現れるかもしれません。 そうなれば、今以上にも Ruby関数型プログラミングのいいところを活かせるのでしょう。

Redashがバージョンアップ(v4)して便利になったこと

エニグモ データ活用推進室 @kmt47 です。 この記事はEnigmo Advent Calendar 2018の6日目です。

概要

redashがv4にバージョンしました。 redash v4の新機能を紹介します。 上手に新機能を使って、redash作成を効率化していきましょう!

クエリ(レポート(表、グラフ))

数値の表示形式(フォーマット)をredashで設定できるようになった(便利度:★★★★★(MAX))

これまでは、SQLで行っていた(面倒な)表示形式の変換を、redashの設定で可能となりました。

これまでは、文字列変換(CONVERT)や、文字置換(REPLACE)を駆使して金額の12,000.00の「.00」の除去や、 YoYなどにパーセンテージ(%)の付加を行って表示形式を整形していましたが、v4ではredashの設定で可能です。

手順

・「Table」画面の左下にある「Edit Visualization」をクリック   ※ 自分が編集できる(自分が作ったor編集権限のある)クエリの場合に「Edit Visualization」が表示されます。  ⇒ 「Visualization Editor」が表示されます

・表示形式を変更したい項目の「Number format」を変更(数値項目のデフォルト値は「0,0.00」)

「0.0」に変更することで、小数点以下が除去され「12,000」のように表示されます。 ※ 表示形式を変えると、右側の表示の形式も瞬時に変更されますので、確認ができます。

YoYのようなパーセントの項目の場合は、「0.0%」に変えることで、「123.4%」のように表示されます。 ※ パーセンテージの場合の元の値は、100を掛ける必要はない(100掛けちゃダメ)

表示形式は、以下のURLを参照してください。 http://numeraljs.com/

項目を非表示にできるようになった(便利度:★★★★)

v4以前はSQLから項目を削除する必要があったが、SQLには残したまま、redash上で非表示にできるようなった。

表(Table)では表示させたくないが、グラフでは使いたい、といった場合も、クエリを分ける必要がなく、 1つのクエリで実現できるようになった。(開発生産性、運用保守性が格段にあがりますね!)

手順

・「Table」画面の左下にある「Edit Visualization」をクリック

・非表示にしたい項目の項目名にあるチェックボックスからチェックを外す

項目の表示順を変更できるようになった(便利度:★★)

項目の表示順をSQLの変更なしに、redashで変更できるようになりました。

手順

・「Table」画面の左下にある「Edit Visualization」をクリック

・順番を変更したい項目をクリックして、移動したい場所に持っていきます。

項目の表示を右寄せ、中央寄せ(?)、右寄せの設定が可能になった(便利度:★★)

v4以前は、数値でも、文字列でも、なんでも右寄せでしたが(なので特に数値は読みずらかった)、 項目ごとに変更できるようになりました。

手順

・「Table」画面の左下にある「Edit Visualization」をクリック

・項目名の下にあるボタンで切り替える(説明が雑ですが、見れば分かると思います。)

表に表示する件数を変更できるようになった(便利度:★★★)

これまでは15件(?)だった表示件数を変更できるようになりました。

手順

・「Table」画面の左下にある「Edit Visualization」をクリック

・「Visuralization Name」の下の「Grid」をクリック

・「items per page」で表示したい件数を選択  ※ いまいま5, 10, 15, 20, 25から選択可能   (1か月の日数分表示するために50とか欲しいなぁ)

ダッシュボード

クエリの追加・削除・レイアウト変更の方法が変更

ダッシュボードにクエリ(表やグラフ)を追加・削除・レイアウト変更する方法が変更になりました。

手順

ダッシュボードの右上の「・・・」をクリック

・「Edit」をクリック

クエリの追加

・画面右下の「Add Widget」をクリック

・追加したクエリを選択  ※ v4以前で選択できた、表示サイズ(幅)は、ここでは選択できなくなりました。    追加した後にサイズを調整できるように変更となりました。

クエリの削除

・削除したいクエリの右上の「×」をクリック

クエリのレイアウト変更

・移動したいクエリを選択して、マウスで移動したい場所まで移動

クエリのサイズのバリエーションが増えた(便利度:★★★)

v4以前は、通常(画面の幅の半分)と2倍(画面の幅の全部)のみ選択できました。 v4になって、新たに画面の1/3のサイズにすることができるようになりました。

手順

ダッシュボードの右上の「・・・」をクリック

・「Edit」をクリック

・サイズを変更したいクエリを選んで、クエリの枠にカーソルを置くと、カーソルが「⇔」に変わるので、クリックしてサイズを変更します。(説明が雑ですが、やれば分かります。)

クエリの高さが変更できるようになった(便利度:★★★★)

上記と似てますが、幅だけではなく、v4から新たに高さを自由に変更することができるようになりました。

手順

ダッシュボードの右上の「・・・」をクリック

・「Edit」をクリック

・高さを変更したいクエリを選んで、クエリの枠にカーソルを置くと、カーソルが「↕」に変わるので、クリックしてサイズを変更します。(説明が雑ですが、やれば分かります。)

GoogleスプレッドシートとGoogleAppsScript(GAS) はじめの一歩(非エンジニア向け)

エニグモ データ活用推進室 @kmt47 です。 この記事はEnigmo Advent Calendar 2018の5日目です。

自己紹介

2018年4月にエニグモに入社しました。 社会人経験、エンジニア経験は、かれこれ20年を超えました。 あえて分類するとデータベースエンジニアになるでしょうか。 SIer時代は、ご多分に漏れず、PG, PL, PMなど経験しましたが… その頃は「DBといえばOracle」という時代でしたので、Oracleは7から9iくらいまでかなりやりました。 そこからネット系の事業会社を転々と。

この記事の目的

タイトルに「はじめの一歩」とあるように、はじめてGoogle SpreadsheetでGoogle Apps Script(GAS)を利用する方向けの内容です。 どちらかというと非エンジニア向けの内容です。

新しいことを習得しようとしたときに、誰もが最初は初心者です。いろいろな壁にぶつかると思います。何時間も悩んだ挙句、気づいてみたらとても些細なことだった、なんてことも一度や二度ではないでしょう。 ただ、ある程度コツが掴めてくると、そこからは順調に事が進むようにもなりますよね。(エラーが出たときの対処の方法や、分からないことが出てきた時の調べ方のコツが身についてくるんだと思います。)

ただ、コツを掴む前に挫折してしまう人もいるでしょう。 そもそも、最初の一歩すら踏み出せない方もいるでしょう。

本記事は、そんな一歩を踏み出すための「何となくできそうかも」を感じてもらうことを目的としています。なので、最小限の手順で、最小限のプログラムとし、難しい解説は省いています。

まずは「はじめの一歩」を踏み出していただければ幸いです。

GASとは?

Google Apps Script(GAS)とは、Google社が提供するスクリプト言語です。以上。

GASで出来ること

GASでは、GoogleスプレッドシートGoogleカレンダーなどGoogle社が提供するサービスに対して操作(処理)することができます。まぁ、いろいろできますが、それは追々分かっていけばよいと思います。

本題

目次

  1. Google Spreadsheetを開く
  2. スクリプトエディタを開く
  3. GASを実行する
  4. スプレッドシートの値を取得する
  5. スプレッドシートに値を出力する

Google Spreadsheetを開く

Googleドライブから「新規」>「Googleスプレッドシート」>「空のスプレッドシート」を選択

空のスプレッドシートを開くことができました!!(GAS以前の話ですね)

スクリプトエディタを開く

「ツール」>「スクリプトエディタ」を選択

別のタブにスクリプトエディタが開きます。

プログラムを書くエディタらしき画面ができていましたね。(パチパチ)

GASを実行する

2行目に以下を記載して、「▶」ボタンを選択

Logger.log("Hello, World!");

Logger.logは、引数の値をログに出力する関数です。

「表示」>「ログ」を選択、ログ画面が表示

おめでとうございます。あなたの作成したGASが見事に実行されました!(パチパチパチ)

スプレッドシートの値を取得する

A1のセルの値を読み込んで、ログに出力してみましょう。

ここから少しプログラミングっぽくなってきますよ。

function myFunction() {
  //アクティブのシートを取得
  var sheet = SpreadsheetApp.getActiveSheet();
  
  //セルA1を変数rangeに取得する
  var range = sheet.getRange('A1');
  
  //rangeの値をログに出力する
  Logger.log(range.getValue());
}

SpreadsheetApp.getActiveSheet()で対象のシートを変数(sheet)に格納します。 getActiveSheet()では、アクティブ(いま選択されている)シートが選択されます。 getSheetByName(シート名)では、シート名からシートを選択できます。

次に対象のセルを変数(range)に格納します。 sheet.getRange('A1')のように、セルのアドレスを指定する方法と、 sheet.getRange(1, 1)のように、セルの行番号と列番号を指定する方法があります。 sheet.getRange(行番号, 列番号)の順で指定します。

Logger.log(range.getValue())では、指定したセルの値をログに出力しています。

A1のセルの値がログに出力されました。

スプレッドシートに値を出力する

次は、スプレッドシートに値を出力するプログラムです。

function myFunction() {
  //アクティブのシートを取得
  var sheet = SpreadsheetApp.getActiveSheet();
  
  //セルA3を変数rangeに取得する
  var range = sheet.getRange('A3');
  
  //A3に"Hello, GAS!"を入力
  range.setValue("Hello, GAS!");
}

スプレッドシートの値を取得する」と同様に SpreadsheetApp.getActiveSheet()でシートを変数(sheet)に格納し、 sheet.getRange('A3')で対象のセルを変数(range)に格納します。

そして、range.setValue("Hello, GAS!")で対象のセルに値を入力します。

A3のセルに値を出力することができました。

まとめ

データを読み込んで、書き出すというプログラミングの基本を通して、GASによるGoogleスプレッドシートの操作について見てきました。

「何となくできそうかも」と感じていただけたでしょうか?

本格的にGASを利用するとなると、関数、変数、データ型、オブジェクト、メソッド、配列などなど、知っておく必要のあることは、いろいろあります。

ただ、いっぺんに全て理解する必要はありません。必要なことから少しずつ理解を進めていけば大丈夫です。

私はその過程で以下のサイトを大変参考にさせていただきました。 例題をもとに分かりやすく解説されていますので、是非参考にしていただければと思います。

「いつも隣にITのお仕事」 https://tonari-it.com/

【保存版】初心者向け実務で使えるGoogle Apps Script完全マニュアル https://tonari-it.com/google-apps-script-manual/

それでは、GASへの「はじめの一歩」を踏み出していただければ幸いです。