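Hello, I'm Horibe, a data scientist at Enigmo. In this post I share what I've learned about dbt x BigQuery, which I started using at the end of last year. I have about two years of experience with BigQuery and about five with SQL. I'm fond of the QUALIFY clause.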
Why I started using dbt
I don't dislike doing aggregations in SQL, but two things kept getting in my way.
The first is a BigQuery-specific error, Resources exceeded during query execution: Not enough resources for query planning - too many subqueries or query is too complex (hereafter, the "too complex" error). I had been working around it by inserting temporary intermediate tables with create temp table, but rewriting the query each time was costly.
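The second is that even when I remembered having written a similar query before, my past queries*1 were so long that finding the relevant part took time.
dbt looked like it could solve both problems at once, so I started using it.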
What I liked after using it
The with clauses can be split up and managed separately
→ Reusing parts of a query became easier
The materialization of a data model (view, table, incremental, ephemeral) can be changed easily
→ Avoiding the too complex error became simple
Variables (vars) shared across multiple queries can be managed in YAML (dbt_project.yml) and overridden from the CLI at run time
→ I can write a general-purpose query and try various patterns by changing only the vars
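As a rough illustration of the last point, the sketch below builds a dbt run command line that overrides one project variable through the --vars flag. It only constructs and prints the command; it does not invoke dbt. The helper name build_dbt_run_command and the choice of override are assumptions made for this example, not part of the original project.

```python
import json
import shlex


def build_dbt_run_command(overrides):
    """Return an argv list for `dbt run` with project vars overridden."""
    # --vars takes a YAML string; JSON is valid YAML, so json.dumps is sufficient.
    return ["dbt", "run", "--vars", json.dumps(overrides)]


# Hypothetical experiment: drop stddev from the aggregations for a single run.
command = build_dbt_run_command({"list_agg": ["avg", "max", "min"]})

# shlex.join quotes the JSON so the line can be pasted into a shell.
print(shlex.join(command))
```

Values passed this way take precedence over the vars block in dbt_project.yml, so the project file can keep the defaults while each experiment changes only what it needs.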
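I also tried Jinja2 macros for preprocessing and feature generation and found them handy, so I'll introduce them here.
Example: from preprocessing to feature generation
The examples are based on files I actually used, applied to the BigQuery public dataset bigquery-public-data.ml_datasets.census_adult_income.*2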
dbt_project.yml
Only the models and vars sections have been changed.
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'techblog_202201'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'sample'

# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
source-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"  # directory which will store compiled SQL files
clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_modules"

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
  techblog_202201:
    temp_table:
      +materialized: table
      +hours_to_expiration: 1
    table:
      +materialized: table
    view:
      +materialized: view

vars:
  base_table: bigquery-public-data.ml_datasets.census_adult_income
  index_col: id
  target_col: income_bracket
  list_agg:
    - avg
    - max
    - min
    - stddev
macro
macros/get_columns_list.sql
I created a macro that uses BigQuery's INFORMATION_SCHEMA to fetch the list of columns in a table. Passing a list of types as types returns only the columns of those types.
{% macro get_columns_list(table_name, types=None) -%}

{% set columns_query %}
select
    column_name
from
    `{{table_name.dataset}}.INFORMATION_SCHEMA.COLUMNS`
where
    table_schema = "{{table_name.dataset}}"
    and table_name = "{{table_name.name}}"
{%- if types is not none %}
    and data_type in (
    {%- for type in types %}
        {%- if loop.last %}
        "{{type}}"
        {%- else %}
        "{{type}}",
        {%- endif %}
    {%- endfor %}
    )
{%- endif %}
{% endset %}

{% set results = run_query(columns_query) %}

{% if execute %}
{% set list_results = results.columns[0].values() %}
{% else %}
{% set list_results = [] %}
{% endif %}

{{ return(list_results) }}

{% endmacro %}
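To make the shape of the rendered query easier to see, here is a minimal Python sketch that builds the same kind of INFORMATION_SCHEMA query as a string. The function name build_columns_query is hypothetical, and the dataset and table names in the usage line are simply the ones that appear in the compiled queries in this post; the real macro gets them from the relation returned by ref().

```python
def build_columns_query(dataset, table, types=None):
    """Build a query string similar to the one get_columns_list sends to BigQuery."""
    lines = [
        "select column_name",
        f"from `{dataset}.INFORMATION_SCHEMA.COLUMNS`",
        f'where table_schema = "{dataset}"',
        f'  and table_name = "{table}"',
    ]
    if types is not None:
        # Joining with ", " plays the role of the loop.last check in the macro:
        # every type is quoted and no comma follows the last one.
        quoted = ", ".join(f'"{t}"' for t in types)
        lines.append(f"  and data_type in ({quoted})")
    return "\n".join(lines)


print(build_columns_query("techblog_202201_dev", "stg_census_adult_income", ["FLOAT64", "INT64"]))
```

When types is omitted, the data_type filter is left out and every column of the table is returned, which matches the default branch of the macro.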
models
Each model is shown as the query written in Jinja, followed (↓) by the query generated by dbt compile.
models/view/row_census_adult_income.sql
Adds an id column to the original table data to serve as an index.
select
    row_number() over (order by 1) as {{var("index_col")}},
    *
from
    `{{var("base_table")}}`
↓ compile
select
    row_number() over (order by 1) as id,
    *
from
    `bigquery-public-data.ml_datasets.census_adult_income`
models/temp_table/stg_census_adult_income.sql
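Categorical variables get the following preprocessing:
- Trim whitespace
- Lowercase
- Unicode normalization (NFKC)
Target variable (income_bracket):
- It is binary, so convert it to 0/1*3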
{%- set ref_table = "row_census_adult_income" %}
{%- set list_numeric_columns = get_columns_list(ref(ref_table),types=["FLOAT64","INT64"]) -%}
{%- set list_categorical_columns = get_columns_list(ref(ref_table),types=["STRING"]) -%}

select
    {%- for col in list_numeric_columns %}
    {{col}},
    {%- endfor %}
    {%- for col in list_categorical_columns %}
    {%- if col != var("target_col") %}
    normalize(lower(trim({{col}})), NFKC) as {{col}},
    {%- endif %}
    {%- endfor %}
    case
        when {{var("target_col")}} = "<=50K" then 1
        else 0
    end as {{var("target_col")}},
from
    {{ref(ref_table)}}
↓ compile
select
    id,
    age,
    functional_weight,
    education_num,
    capital_gain,
    capital_loss,
    hours_per_week,
    normalize(lower(trim(workclass)), NFKC) as workclass,
    normalize(lower(trim(education)), NFKC) as education,
    normalize(lower(trim(marital_status)), NFKC) as marital_status,
    normalize(lower(trim(occupation)), NFKC) as occupation,
    normalize(lower(trim(relationship)), NFKC) as relationship,
    normalize(lower(trim(race)), NFKC) as race,
    normalize(lower(trim(sex)), NFKC) as sex,
    normalize(lower(trim(native_country)), NFKC) as native_country,
    case
        when income_bracket = "<=50K" then 1
        else 0
    end as income_bracket,
from
    `buyma-analytics`.`techblog_202201_dev`.`row_census_adult_income`
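For readers who want to check what the per-column expressions do to a single value, the following Python sketch mirrors them with the standard library. It is an approximation under stated assumptions: str.strip and BigQuery's trim may not treat exactly the same set of whitespace characters, and the function names clean_category and encode_target are made up for this example.

```python
import unicodedata


def clean_category(value):
    """Mirror normalize(lower(trim(col)), NFKC): trim, lowercase, then NFKC-normalize."""
    return unicodedata.normalize("NFKC", value.strip().lower())


def encode_target(value):
    """Mirror the case expression: 1 when the value equals "<=50K", otherwise 0."""
    return 1 if value == "<=50K" else 0


# Full-width letters fold to their ASCII forms under NFKC.
print(clean_category("  Ｐｒｉｖａｔｅ "))
print(encode_target("<=50K"), encode_target(">50K"))
```

Note that the target comparison is an exact string match, so a value with surrounding whitespace would fall into the else branch and be encoded as 0.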
models/table/feature_census_adult_income.sql
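- Numeric variables
  - Kept as they are
- Categorical variables
  - Pseudo label encoding with dense_rank()
- Categorical variables x numeric variables
  - Statistics (mean, min, max, standard deviation) computed per category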
{%- set ref_table = "stg_census_adult_income" -%}
{%- set list_numeric_columns = get_columns_list(ref(ref_table),types=["FLOAT64","INT64"]) -%}
{%- set list_categorical_columns = get_columns_list(ref(ref_table),types=["STRING"]) -%}

select
    {%- for numeric_column in list_numeric_columns %}
    {%- if numeric_column != var("target_col") %}
    {{numeric_column}},
    {%- endif %}
    {%- endfor %}
    {%- for categorical_column in list_categorical_columns %}
    {%- set loop_index = loop.index0 + 1 %}
    dense_rank() over (order by {{categorical_column}}) as {{categorical_column}},
    {%- for numeric_column in list_numeric_columns %}
    {%- if numeric_column not in [var("target_col"), var("index_col")] %}
    {%- for agg in var("list_agg") %}
    {{agg}}({{numeric_column}}) over (partition by {{categorical_column}}) as {{agg}}_{{numeric_column}}_by_{{categorical_column}},
    {%- endfor %}
    {%- endif %}
    {%- endfor %}
    {%- endfor %}
    {{var("target_col")}}
from
    {{ref(ref_table)}}
order by
    1
↓ compile
select id, age, functional_weight, education_num, capital_gain, capital_loss, hours_per_week, dense_rank() over (order by workclass) as workclass, avg(age) over (partition by workclass) as avg_age_by_workclass, max(age) over (partition by workclass) as max_age_by_workclass, min(age) over (partition by workclass) as min_age_by_workclass, stddev(age) over (partition by workclass) as stddev_age_by_workclass, avg(functional_weight) over (partition by workclass) as avg_functional_weight_by_workclass, max(functional_weight) over (partition by workclass) as max_functional_weight_by_workclass, min(functional_weight) over (partition by workclass) as min_functional_weight_by_workclass, stddev(functional_weight) over (partition by workclass) as stddev_functional_weight_by_workclass, avg(education_num) over (partition by workclass) as avg_education_num_by_workclass, max(education_num) over (partition by workclass) as max_education_num_by_workclass, min(education_num) over (partition by workclass) as min_education_num_by_workclass, stddev(education_num) over (partition by workclass) as stddev_education_num_by_workclass, avg(capital_gain) over (partition by workclass) as avg_capital_gain_by_workclass, max(capital_gain) over (partition by workclass) as max_capital_gain_by_workclass, min(capital_gain) over (partition by workclass) as min_capital_gain_by_workclass, stddev(capital_gain) over (partition by workclass) as stddev_capital_gain_by_workclass, avg(capital_loss) over (partition by workclass) as avg_capital_loss_by_workclass, max(capital_loss) over (partition by workclass) as max_capital_loss_by_workclass, min(capital_loss) over (partition by workclass) as min_capital_loss_by_workclass, stddev(capital_loss) over (partition by workclass) as stddev_capital_loss_by_workclass, avg(hours_per_week) over (partition by workclass) as avg_hours_per_week_by_workclass, max(hours_per_week) over (partition by workclass) as max_hours_per_week_by_workclass, min(hours_per_week) over 
(partition by workclass) as min_hours_per_week_by_workclass, stddev(hours_per_week) over (partition by workclass) as stddev_hours_per_week_by_workclass, dense_rank() over (order by education) as education, avg(age) over (partition by education) as avg_age_by_education, max(age) over (partition by education) as max_age_by_education, min(age) over (partition by education) as min_age_by_education, stddev(age) over (partition by education) as stddev_age_by_education, avg(functional_weight) over (partition by education) as avg_functional_weight_by_education, max(functional_weight) over (partition by education) as max_functional_weight_by_education, min(functional_weight) over (partition by education) as min_functional_weight_by_education, stddev(functional_weight) over (partition by education) as stddev_functional_weight_by_education, avg(education_num) over (partition by education) as avg_education_num_by_education, max(education_num) over (partition by education) as max_education_num_by_education, min(education_num) over (partition by education) as min_education_num_by_education, stddev(education_num) over (partition by education) as stddev_education_num_by_education, avg(capital_gain) over (partition by education) as avg_capital_gain_by_education, max(capital_gain) over (partition by education) as max_capital_gain_by_education, min(capital_gain) over (partition by education) as min_capital_gain_by_education, stddev(capital_gain) over (partition by education) as stddev_capital_gain_by_education, avg(capital_loss) over (partition by education) as avg_capital_loss_by_education, max(capital_loss) over (partition by education) as max_capital_loss_by_education, min(capital_loss) over (partition by education) as min_capital_loss_by_education, stddev(capital_loss) over (partition by education) as stddev_capital_loss_by_education, avg(hours_per_week) over (partition by education) as avg_hours_per_week_by_education, max(hours_per_week) over (partition by education) as 
max_hours_per_week_by_education, min(hours_per_week) over (partition by education) as min_hours_per_week_by_education, stddev(hours_per_week) over (partition by education) as stddev_hours_per_week_by_education, dense_rank() over (order by marital_status) as marital_status, avg(age) over (partition by marital_status) as avg_age_by_marital_status, max(age) over (partition by marital_status) as max_age_by_marital_status, min(age) over (partition by marital_status) as min_age_by_marital_status, stddev(age) over (partition by marital_status) as stddev_age_by_marital_status, avg(functional_weight) over (partition by marital_status) as avg_functional_weight_by_marital_status, max(functional_weight) over (partition by marital_status) as max_functional_weight_by_marital_status, min(functional_weight) over (partition by marital_status) as min_functional_weight_by_marital_status, stddev(functional_weight) over (partition by marital_status) as stddev_functional_weight_by_marital_status, avg(education_num) over (partition by marital_status) as avg_education_num_by_marital_status, max(education_num) over (partition by marital_status) as max_education_num_by_marital_status, min(education_num) over (partition by marital_status) as min_education_num_by_marital_status, stddev(education_num) over (partition by marital_status) as stddev_education_num_by_marital_status, avg(capital_gain) over (partition by marital_status) as avg_capital_gain_by_marital_status, max(capital_gain) over (partition by marital_status) as max_capital_gain_by_marital_status, min(capital_gain) over (partition by marital_status) as min_capital_gain_by_marital_status, stddev(capital_gain) over (partition by marital_status) as stddev_capital_gain_by_marital_status, avg(capital_loss) over (partition by marital_status) as avg_capital_loss_by_marital_status, max(capital_loss) over (partition by marital_status) as max_capital_loss_by_marital_status, min(capital_loss) over (partition by marital_status) as 
min_capital_loss_by_marital_status, stddev(capital_loss) over (partition by marital_status) as stddev_capital_loss_by_marital_status, avg(hours_per_week) over (partition by marital_status) as avg_hours_per_week_by_marital_status, max(hours_per_week) over (partition by marital_status) as max_hours_per_week_by_marital_status, min(hours_per_week) over (partition by marital_status) as min_hours_per_week_by_marital_status, stddev(hours_per_week) over (partition by marital_status) as stddev_hours_per_week_by_marital_status, dense_rank() over (order by occupation) as occupation, avg(age) over (partition by occupation) as avg_age_by_occupation, max(age) over (partition by occupation) as max_age_by_occupation, min(age) over (partition by occupation) as min_age_by_occupation, stddev(age) over (partition by occupation) as stddev_age_by_occupation, avg(functional_weight) over (partition by occupation) as avg_functional_weight_by_occupation, max(functional_weight) over (partition by occupation) as max_functional_weight_by_occupation, min(functional_weight) over (partition by occupation) as min_functional_weight_by_occupation, stddev(functional_weight) over (partition by occupation) as stddev_functional_weight_by_occupation, avg(education_num) over (partition by occupation) as avg_education_num_by_occupation, max(education_num) over (partition by occupation) as max_education_num_by_occupation, min(education_num) over (partition by occupation) as min_education_num_by_occupation, stddev(education_num) over (partition by occupation) as stddev_education_num_by_occupation, avg(capital_gain) over (partition by occupation) as avg_capital_gain_by_occupation, max(capital_gain) over (partition by occupation) as max_capital_gain_by_occupation, min(capital_gain) over (partition by occupation) as min_capital_gain_by_occupation, stddev(capital_gain) over (partition by occupation) as stddev_capital_gain_by_occupation, avg(capital_loss) over (partition by occupation) as 
avg_capital_loss_by_occupation, max(capital_loss) over (partition by occupation) as max_capital_loss_by_occupation, min(capital_loss) over (partition by occupation) as min_capital_loss_by_occupation, stddev(capital_loss) over (partition by occupation) as stddev_capital_loss_by_occupation, avg(hours_per_week) over (partition by occupation) as avg_hours_per_week_by_occupation, max(hours_per_week) over (partition by occupation) as max_hours_per_week_by_occupation, min(hours_per_week) over (partition by occupation) as min_hours_per_week_by_occupation, stddev(hours_per_week) over (partition by occupation) as stddev_hours_per_week_by_occupation, dense_rank() over (order by relationship) as relationship, avg(age) over (partition by relationship) as avg_age_by_relationship, max(age) over (partition by relationship) as max_age_by_relationship, min(age) over (partition by relationship) as min_age_by_relationship, stddev(age) over (partition by relationship) as stddev_age_by_relationship, avg(functional_weight) over (partition by relationship) as avg_functional_weight_by_relationship, max(functional_weight) over (partition by relationship) as max_functional_weight_by_relationship, min(functional_weight) over (partition by relationship) as min_functional_weight_by_relationship, stddev(functional_weight) over (partition by relationship) as stddev_functional_weight_by_relationship, avg(education_num) over (partition by relationship) as avg_education_num_by_relationship, max(education_num) over (partition by relationship) as max_education_num_by_relationship, min(education_num) over (partition by relationship) as min_education_num_by_relationship, stddev(education_num) over (partition by relationship) as stddev_education_num_by_relationship, avg(capital_gain) over (partition by relationship) as avg_capital_gain_by_relationship, max(capital_gain) over (partition by relationship) as max_capital_gain_by_relationship, min(capital_gain) over (partition by relationship) as 
min_capital_gain_by_relationship, stddev(capital_gain) over (partition by relationship) as stddev_capital_gain_by_relationship, avg(capital_loss) over (partition by relationship) as avg_capital_loss_by_relationship, max(capital_loss) over (partition by relationship) as max_capital_loss_by_relationship, min(capital_loss) over (partition by relationship) as min_capital_loss_by_relationship, stddev(capital_loss) over (partition by relationship) as stddev_capital_loss_by_relationship, avg(hours_per_week) over (partition by relationship) as avg_hours_per_week_by_relationship, max(hours_per_week) over (partition by relationship) as max_hours_per_week_by_relationship, min(hours_per_week) over (partition by relationship) as min_hours_per_week_by_relationship, stddev(hours_per_week) over (partition by relationship) as stddev_hours_per_week_by_relationship, dense_rank() over (order by race) as race, avg(age) over (partition by race) as avg_age_by_race, max(age) over (partition by race) as max_age_by_race, min(age) over (partition by race) as min_age_by_race, stddev(age) over (partition by race) as stddev_age_by_race, avg(functional_weight) over (partition by race) as avg_functional_weight_by_race, max(functional_weight) over (partition by race) as max_functional_weight_by_race, min(functional_weight) over (partition by race) as min_functional_weight_by_race, stddev(functional_weight) over (partition by race) as stddev_functional_weight_by_race, avg(education_num) over (partition by race) as avg_education_num_by_race, max(education_num) over (partition by race) as max_education_num_by_race, min(education_num) over (partition by race) as min_education_num_by_race, stddev(education_num) over (partition by race) as stddev_education_num_by_race, avg(capital_gain) over (partition by race) as avg_capital_gain_by_race, max(capital_gain) over (partition by race) as max_capital_gain_by_race, min(capital_gain) over (partition by race) as min_capital_gain_by_race, stddev(capital_gain) 
over (partition by race) as stddev_capital_gain_by_race, avg(capital_loss) over (partition by race) as avg_capital_loss_by_race, max(capital_loss) over (partition by race) as max_capital_loss_by_race, min(capital_loss) over (partition by race) as min_capital_loss_by_race, stddev(capital_loss) over (partition by race) as stddev_capital_loss_by_race, avg(hours_per_week) over (partition by race) as avg_hours_per_week_by_race, max(hours_per_week) over (partition by race) as max_hours_per_week_by_race, min(hours_per_week) over (partition by race) as min_hours_per_week_by_race, stddev(hours_per_week) over (partition by race) as stddev_hours_per_week_by_race, dense_rank() over (order by sex) as sex, avg(age) over (partition by sex) as avg_age_by_sex, max(age) over (partition by sex) as max_age_by_sex, min(age) over (partition by sex) as min_age_by_sex, stddev(age) over (partition by sex) as stddev_age_by_sex, avg(functional_weight) over (partition by sex) as avg_functional_weight_by_sex, max(functional_weight) over (partition by sex) as max_functional_weight_by_sex, min(functional_weight) over (partition by sex) as min_functional_weight_by_sex, stddev(functional_weight) over (partition by sex) as stddev_functional_weight_by_sex, avg(education_num) over (partition by sex) as avg_education_num_by_sex, max(education_num) over (partition by sex) as max_education_num_by_sex, min(education_num) over (partition by sex) as min_education_num_by_sex, stddev(education_num) over (partition by sex) as stddev_education_num_by_sex, avg(capital_gain) over (partition by sex) as avg_capital_gain_by_sex, max(capital_gain) over (partition by sex) as max_capital_gain_by_sex, min(capital_gain) over (partition by sex) as min_capital_gain_by_sex, stddev(capital_gain) over (partition by sex) as stddev_capital_gain_by_sex, avg(capital_loss) over (partition by sex) as avg_capital_loss_by_sex, max(capital_loss) over (partition by sex) as max_capital_loss_by_sex, min(capital_loss) over (partition by 
sex) as min_capital_loss_by_sex, stddev(capital_loss) over (partition by sex) as stddev_capital_loss_by_sex, avg(hours_per_week) over (partition by sex) as avg_hours_per_week_by_sex, max(hours_per_week) over (partition by sex) as max_hours_per_week_by_sex, min(hours_per_week) over (partition by sex) as min_hours_per_week_by_sex, stddev(hours_per_week) over (partition by sex) as stddev_hours_per_week_by_sex, dense_rank() over (order by native_country) as native_country, avg(age) over (partition by native_country) as avg_age_by_native_country, max(age) over (partition by native_country) as max_age_by_native_country, min(age) over (partition by native_country) as min_age_by_native_country, stddev(age) over (partition by native_country) as stddev_age_by_native_country, avg(functional_weight) over (partition by native_country) as avg_functional_weight_by_native_country, max(functional_weight) over (partition by native_country) as max_functional_weight_by_native_country, min(functional_weight) over (partition by native_country) as min_functional_weight_by_native_country, stddev(functional_weight) over (partition by native_country) as stddev_functional_weight_by_native_country, avg(education_num) over (partition by native_country) as avg_education_num_by_native_country, max(education_num) over (partition by native_country) as max_education_num_by_native_country, min(education_num) over (partition by native_country) as min_education_num_by_native_country, stddev(education_num) over (partition by native_country) as stddev_education_num_by_native_country, avg(capital_gain) over (partition by native_country) as avg_capital_gain_by_native_country, max(capital_gain) over (partition by native_country) as max_capital_gain_by_native_country, min(capital_gain) over (partition by native_country) as min_capital_gain_by_native_country, stddev(capital_gain) over (partition by native_country) as stddev_capital_gain_by_native_country, avg(capital_loss) over (partition by native_country) 
as avg_capital_loss_by_native_country, max(capital_loss) over (partition by native_country) as max_capital_loss_by_native_country, min(capital_loss) over (partition by native_country) as min_capital_loss_by_native_country, stddev(capital_loss) over (partition by native_country) as stddev_capital_loss_by_native_country, avg(hours_per_week) over (partition by native_country) as avg_hours_per_week_by_native_country, max(hours_per_week) over (partition by native_country) as max_hours_per_week_by_native_country, min(hours_per_week) over (partition by native_country) as min_hours_per_week_by_native_country, stddev(hours_per_week) over (partition by native_country) as stddev_hours_per_week_by_native_country, income_bracket from `buyma-analytics`.`techblog_202201_dev`.`stg_census_adult_income` order by 1
In this way, I was able to generate 221 features.
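The two window-function patterns used in this model can be sketched in plain Python on a toy example. The rows below are made up for illustration and are not taken from the dataset. The helpers dense_rank and window_agg are hypothetical names, and the sketch ignores NULL handling, which BigQuery treats in its own way.

```python
from collections import defaultdict


def dense_rank(values):
    """Rank values by sorted order without gaps, like dense_rank() over (order by col)."""
    rank_of = {v: i for i, v in enumerate(sorted(set(values)), start=1)}
    return [rank_of[v] for v in values]


def window_agg(categories, numbers, agg):
    """Aggregate per category and repeat the result on every row,
    like agg(x) over (partition by category)."""
    groups = defaultdict(list)
    for cat, num in zip(categories, numbers):
        groups[cat].append(num)
    per_group = {cat: agg(nums) for cat, nums in groups.items()}
    return [per_group[cat] for cat in categories]


def mean(nums):
    return sum(nums) / len(nums)


# Toy rows (illustrative only).
sex = ["male", "female", "male", "female"]
age = [40, 30, 50, 20]

features = {
    "sex": dense_rank(sex),
    "avg_age_by_sex": window_agg(sex, age, mean),
    "max_age_by_sex": window_agg(sex, age, max),
    "min_age_by_sex": window_agg(sex, age, min),
}
for name, column in features.items():
    print(name, column)
```

Because the aggregates are window functions rather than a group by, the row count stays the same and each row simply carries the statistics of the category it belongs to.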
Appendix: using packages
dbt has packages, which are something like libraries.*4
For example, dbt-utils provides get_column_values, a macro that returns the list of unique values in a column. If you want to do one-hot encoding, you can write it as follows.*5
{%- set categorical_column = "sex" -%}
{%- set ref_table = "stg_census_adult_income" -%}
{%- set unique_values = dbt_utils.get_column_values(ref(ref_table), categorical_column) -%}

select
    {%- for value in unique_values %}
    case when {{categorical_column}} = "{{value}}" then 1 else 0 end as {{categorical_column}}_{{value}},
    {%- endfor %}
from
    {{ref(ref_table)}}
↓ compile
select
    case when sex = "male" then 1 else 0 end as sex_male,
    case when sex = "female" then 1 else 0 end as sex_female,
from
    `your-project`.`your_dataset`.`stg_census_adult_income`
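One thing to watch when applying this pattern to other columns is that the value becomes part of the column alias, so a value containing a hyphen or a space would give an invalid name. The Python sketch below generates the same kind of expressions while replacing such characters with underscores. The function name one_hot_columns and the sample values are assumptions for illustration. It does not escape double quotes inside values, and two values that differ only in replaced characters would collide.

```python
import re


def one_hot_columns(column, values):
    """Return one `case when ... end as alias` expression per unique value."""
    expressions = []
    for value in values:
        # Keep only letters, digits, and underscores so the alias stays a valid identifier.
        suffix = re.sub(r"[^0-9a-zA-Z_]", "_", value)
        expressions.append(
            f'case when {column} = "{value}" then 1 else 0 end as {column}_{suffix}'
        )
    return expressions


# Illustrative values; a hyphen in the value would break the alias if used as-is.
for line in one_hot_columns("marital_status", ["never-married", "divorced"]):
    print(line)
```

In a dbt model the same substitution could be done inside the Jinja loop before the value is appended to the alias.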
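Summary
I'm only using it at an individual level so far, but I can write queries more efficiently than when I was writing SQL in an editor, and I find it very convenient. At work I also use tests and docs, which I couldn't cover here. Going forward, I'd like to combine it with BigQuery ML so that everything from preprocessing to model training and inference is done entirely in dbt x BigQuery.*6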