雲のメモ帳

猫とクラウドと旅行が好きなインフラエンジニアです。 日々の調べたことや興味が持ったことをこのブログにアウトプットします。

Azure Databricksのpysparkでデータ加工する際によく使うコード集

これはなに?

私がDatabricksのpysparkを使ったデータ加工する際によく使っているコードをメモとして残します。
Databricks特有の部分と一般的なpysparkの部分を分けて書いています。

サンプルデータ

サンプルデータはAWSから提供されているウェブとソーシャルメディア分析のデータを利用しています。

https://docs.aws.amazon.com/ja_jp/quicksight/latest/user/samples/web-and-social-analytics.csv.zip

f:id:ykoomaru:20210403162613p:plain

Azure Databricks 特有

BlobストレージをDatabricksにマウント

Databricksでは、Blobストレージをローカルストレージのようにマウントすることができます。
AzureでDatabricksを利用する場合は、まず最初に設定することになると思います。

手順書こうかと思ったのですが、ナレコムさんのブログがわかりやすいのでリンク貼っておきます。

Azure Databricks: 3-1. DBFSにBlob Storageをマウント – ナレコムAzureレシピ

データフレームからDELTA テーブル作成 [saveAsTable]

使用方法

データフレームを作ったらあとは、テーブル名を指定して、saveAsTable をするだけでDELTAテーブルが作成できます。

df.write.format("delta").saveAsTable('<table_name>')

テーブルのバッチ読み取りと書き込み-Azure Databricks - Workspace | Microsoft Docs

df.write.format("delta").saveAsTable('web_and_social_analytics')

テーブル確認

テーブルを作成するとDtabricksコンソールのDataからスキーマやサンプルデータを参照することができるようになっています。

f:id:ykoomaru:20210403173121p:plain

DELTAテーブルをデータフレームにロード [table]

使用方法

テーブルのロードはテーブル名を指定して、spark.tableメソッドを呼び出すだけなので簡単です。
読み込んだデータはSparkのデータフレームになっています。

spark.table('<table_name>')

テーブルのバッチ読み取りと書き込み-Azure Databricks - Workspace | Microsoft Docs

df = spark.table('web_and_social_analytics')
display(df)

f:id:ykoomaru:20210403174046p:plain

DELTAテーブルにレコード追加 [saveAsTable]

使用方法

modeでappendを指定することでデータフレームの内容を既存のDELTAテーブルに追記することができます。

df.write.format("delta").mode("append").saveAsTable("events")

テーブルのバッチ読み取りと書き込み-Azure Databricks - Workspace | Microsoft Docs

df2というデータフレームをweb_and_social_analyticsテーブルに追記しています。

df2.write.format('delta').mode('append').saveAsTable('web_and_social_analytics')

別のノートブックを実行 [notebook.run]

データクレンジング処理とかで、引数を変えてノートブックを実行したいときによく使います。

使用方法

dbutils.notebook.run(<notebook_path>, <timeout>, {<argument>: <data>})
項目 備考
notebook_path 実行するノートブックのパス
timeout ノートブックの実行タイムアウト 0を指定するとタイムアウト無し
argument 引数 項目 任意
実行先のノートブックに渡す項目
data 引数 値 任意
実行先のノートブックに渡す値

dbutils.notebook.run('./test_notebook', 0, {'test': 'abcdefg'})

ちなみに、呼びだれた側のノートブックでは、dbutils.widgets.textを使うことで、引数を取得できます。

dbutils.widgets.text("test", "0", "0")
args = getArgument("test")
print(args)
# abcdefg

DELTAテーブル削除

使用方法

公式のドキュメントでpysparkでテーブル削除する方法が見当たらなかったので、Spark sqlでテーブル名を指定して、DELTAテーブルを削除しています。

spark.sql('DROP TABLE <table_name>')

テーブル Azure Databricks の削除 - Workspace | Microsoft Docs

spark.sql('DROP TABLE web_and_social_analytics')

pyspark 関連

新しいカラム作成 [withColumn]

使い方

df.withColumn('<新しいカラム名>','<値>')

'New_visitors_SEO' と 'New_visitors_CPC'の値を足して、'New_Col' という新しいカラムを追加しています。

df2 = df.withColumn("New_Col", df['New_visitors_SEO']+ df['New_visitors_CPC'])
display(df2['New_visitors_SEO','New_visitors_CPC','New_Col'])

f:id:ykoomaru:20210403181045p:plain

カラム名修正 [withColumnRenamed]

使い方

df.withColumnRenamed(<変換元カラム名>, <変換先カラム名>)

df = df.withColumnRenamed('New visitors SEO', 'New_visitors_SEO')
display(df)

f:id:ykoomaru:20210403171615p:plain

カラム名一括修正

DatabricksのDELTAテーブルのカラム名にスペースが入っているとエラーとなるため、スペースをアンダーバーに一括変換するサンプルスクリプトです。

df = spark.read.format("csv").load("web_and_social_analytics.csv",header='true')

new_columns = []
for i in df.columns:
  new_columns.append(i.replace(' ','_'))
df  = df.toDF(*new_columns)

display(df)

カラム削除 [drop]

使用方法

df.drop('<カラム名>')

カンマ区切りで複数のカラムを削除することができます。

df.drop('New_visitors_SEO','New_visitors_CPC')

重複排除 [drop_duplicates]

使用方法

カラム名を指定しない場合は全てのレコードでの重複排除となります。

df.drop_duplicates([<カラム名1>,<カラム名2>])

'Events'で重複排除した結果、1461件あったデータが9件まで減っています。

print(df.count())
df2 = df.drop_duplicates(['Events'])
print(df2.count())

f:id:ykoomaru:20210403184316p:plain

各列に一意のID付与 [monotonically_increasing_id]

使用方法

monotonically_increasing_id を使って各列に一意のIDを振っていきます。
注意点としては、基本的には連続しているのですが、並列で実行するとIDが飛んでしまうので、単純に一意の値が振りたい場合のみ利用してください。

from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("<カラム名>", monotonically_increasing_id())

from pyspark.sql.functions import monotonically_increasing_id
df2 = df.withColumn("ID", monotonically_increasing_id())
display(df2)

f:id:ykoomaru:20210403185052p:plain

テキストファイル名、コンテンツを1レコードで取得する[wholeTextFiles]

使用方法

'wholeTextFiles' を使って、ファイル名と値を2カラム、1レコードで取得します。
形式がRDDになるので、扱いやすいようにtoDF()でデータフレームに置き換えています。

rdd =spark.sparkContext.wholeTextFiles("csv").load("<csv_path>")
tmp_df = rdd.toDF()
display(tmp_df)

実行結果

1にファイル名、2にファイルの中身が入ります。 f:id:ykoomaru:20210403163246p:plain

データクレンジングをやっていると結構フォーマットがバラバラで扱いにくく、ファイルを一件ずつ加工処理しているとすごい時間がかかります。そういう場合に、とりあえずSparkにwholeTextFilesでデータを持ってきて、後からクレンジング処理するようにしています。

最後に

Databricksがデータ加工、分析するのに必要なツールが全て揃ってて、本当に便利。14日間のトライアルがあるので、使ったことなければぜひ一度使ってみてください。