これはなに?
私がDatabricksのpysparkを使ったデータ加工する際によく使っているコードをメモとして残します。
Databricks特有の部分と一般的なpysparkの部分を分けて書いています。
サンプルデータ
サンプルデータはAWSから提供されているウェブとソーシャルメディア分析のデータを利用しています。
https://docs.aws.amazon.com/ja_jp/quicksight/latest/user/samples/web-and-social-analytics.csv.zip
Azure Databricks 特有
BlobストレージをDatabricksにマウント
Databricksでは、Blobストレージをローカルストレージのようにマウントすることができます。
AzureでDatabricksを利用する場合は、まず最初に設定することになると思います。
手順書こうかと思ったのですが、ナレコムさんのブログがわかりやすいのでリンク貼っておきます。
Azure Databricks: 3-1. DBFSにBlob Storageをマウント – ナレコムAzureレシピ
データフレームからDELTA テーブル作成 [saveAsTable]
使用方法
データフレームを作ったらあとは、テーブル名を指定して、saveAsTable をするだけでDELTAテーブルが作成できます。
df.write.format("delta").saveAsTable('<table_name>')
テーブルのバッチ読み取りと書き込み-Azure Databricks - Workspace | Microsoft Docs
例
df.write.format("delta").saveAsTable('web_and_social_analytics')
テーブル確認
テーブルを作成するとDtabricksコンソールのDataからスキーマやサンプルデータを参照することができるようになっています。
DELTAテーブルをデータフレームにロード [table]
使用方法
テーブルのロードはテーブル名を指定して、spark.tableメソッドを呼び出すだけなので簡単です。
読み込んだデータはSparkのデータフレームになっています。
spark.table('<table_name>')
テーブルのバッチ読み取りと書き込み-Azure Databricks - Workspace | Microsoft Docs
例
df = spark.table('web_and_social_analytics') display(df)
DELTAテーブルにレコード追加 [saveAsTable]
使用方法
modeでappendを指定することでデータフレームの内容を既存のDELTAテーブルに追記することができます。
df.write.format("delta").mode("append").saveAsTable("events")
テーブルのバッチ読み取りと書き込み-Azure Databricks - Workspace | Microsoft Docs
例
df2というデータフレームをweb_and_social_analyticsテーブルに追記しています。
df2.write.format('delta').mode('append').saveAsTable('web_and_social_analytics')
別のノートブックを実行 [notebook.run]
データクレンジング処理とかで、引数を変えてノートブックを実行したいときによく使います。
使用方法
dbutils.notebook.run(<notebook_path>, <timeout>, {<argument>: <data>})
項目 | 値 | 備考 |
---|---|---|
notebook_path | 実行するノートブックのパス | |
timeout | ノートブックの実行タイムアウト | 0を指定するとタイムアウト無し |
argument | 引数 項目 | 任意 実行先のノートブックに渡す項目 |
data | 引数 値 | 任意 実行先のノートブックに渡す値 |
例
dbutils.notebook.run('./test_notebook', 0, {'test': 'abcdefg'})
ちなみに、呼びだれた側のノートブックでは、dbutils.widgets.textを使うことで、引数を取得できます。
dbutils.widgets.text("test", "0", "0") args = getArgument("test") print(args) # abcdefg
DELTAテーブル削除
使用方法
公式のドキュメントでpysparkでテーブル削除する方法が見当たらなかったので、Spark sqlでテーブル名を指定して、DELTAテーブルを削除しています。
spark.sql('DROP TABLE <table_name>')
テーブル Azure Databricks の削除 - Workspace | Microsoft Docs
例
spark.sql('DROP TABLE web_and_social_analytics')
pyspark 関連
新しいカラム作成 [withColumn]
使い方
df.withColumn('<新しいカラム名>','<値>')
例
'New_visitors_SEO' と 'New_visitors_CPC'の値を足して、'New_Col' という新しいカラムを追加しています。
df2 = df.withColumn("New_Col", df['New_visitors_SEO']+ df['New_visitors_CPC']) display(df2['New_visitors_SEO','New_visitors_CPC','New_Col'])
カラム名修正 [withColumnRenamed]
使い方
df.withColumnRenamed(<変換元カラム名>, <変換先カラム名>)
例
df = df.withColumnRenamed('New visitors SEO', 'New_visitors_SEO') display(df)
カラム名一括修正
例
DatabricksのDELTAテーブルのカラム名にスペースが入っているとエラーとなるため、スペースをアンダーバーに一括変換するサンプルスクリプトです。
df = spark.read.format("csv").load("web_and_social_analytics.csv",header='true') new_columns = [] for i in df.columns: new_columns.append(i.replace(' ','_')) df = df.toDF(*new_columns) display(df)
カラム削除 [drop]
使用方法
df.drop('<カラム名>')
例
カンマ区切りで複数のカラムを削除することができます。
df.drop('New_visitors_SEO','New_visitors_CPC')
重複排除 [drop_duplicates]
使用方法
カラム名を指定しない場合は全てのレコードでの重複排除となります。
df.drop_duplicates([<カラム名1>,<カラム名2>])
例
'Events'で重複排除した結果、1461件あったデータが9件まで減っています。
print(df.count()) df2 = df.drop_duplicates(['Events']) print(df2.count())
各列に一意のID付与 [monotonically_increasing_id]
使用方法
monotonically_increasing_id を使って各列に一意のIDを振っていきます。
注意点としては、基本的には連続しているのですが、並列で実行するとIDが飛んでしまうので、単純に一意の値が振りたい場合のみ利用してください。
from pyspark.sql.functions import monotonically_increasing_id df.withColumn("<カラム名>", monotonically_increasing_id())
例
from pyspark.sql.functions import monotonically_increasing_id df2 = df.withColumn("ID", monotonically_increasing_id()) display(df2)
テキストファイル名、コンテンツを1レコードで取得する[wholeTextFiles]
使用方法
'wholeTextFiles' を使って、ファイル名と値を2カラム、1レコードで取得します。
形式がRDDになるので、扱いやすいようにtoDF()でデータフレームに置き換えています。
rdd =spark.sparkContext.wholeTextFiles("csv").load("<csv_path>") tmp_df = rdd.toDF() display(tmp_df)
実行結果
1にファイル名、2にファイルの中身が入ります。
データクレンジングをやっていると結構フォーマットがバラバラで扱いにくく、ファイルを一件ずつ加工処理しているとすごい時間がかかります。そういう場合に、とりあえずSparkにwholeTextFilesでデータを持ってきて、後からクレンジング処理するようにしています。
最後に
Databricksがデータ加工、分析するのに必要なツールが全て揃ってて、本当に便利。14日間のトライアルがあるので、使ったことなければぜひ一度使ってみてください。