これはなに?
Glueのデータカタログにカラムを追加した時に、Dynamicframeでカラムを認識できずにコケることがあったので、その対象方法と検証結果について書きました。
問題発生環境と現象
- ソースファイルは、Parquet。
- ソースファイルにカラムを追加した後に、Glue Clawlerでデータカタログ更新。
- 更新したデータカタログをAthenaでクエリを実行するとカラムが認識されている。
上記の環境でGlueのジョブ内で、Dynamic frameを利用した際に、追加したカラムが認識されずにカラムがないとジョブが実行エラーとなりました。
対処方法
調査したところ、Dynamicframeを作成する際に additional_options={"mergeSchema": "true"} を指定することで問題解決するとあり、オプション追加したところ無事カラムを認識するようになりました。
Sparkの仕様が問題の原因みたいです。
source = glueContext.create_dynamic_frame.from_catalog( database="db", table_name="table", additional_options={"mergeSchema": "true"} )
検証
発生条件について、もう少し調査してみてみました。
調査してわかったのは、ソースのParquetの構造が影響しているようです。
- 全て?のParquetファイルに新規のカラムが存在している場合は、 mergeSchema を指定しなくてもデータカタログを更新すれば認識される
- 特定のParquetに新規のカラムが存在しない場合、データカタログを更新しただけでは認識されず、mergeSchema のオプション指定が必要
- 全てのParquetファイルに新規のカラムが存在していない場合は、 mergeSchema のオプション指定を指定しても新規カラムを認識しない
検証条件
- 初回登録時から、new_column というカラムをソースのParquetに追加して、Clawlerでデータカタログ更新後のDynamicframe上でnew_columnを認識しているか確認します。
- ソースのParquetファイルは、2分割しています。
初回登録
root |-- ID: long |-- Name: string |-- Value: long
カラム追加
root |-- ID: long |-- Name: string |-- Value: long |-- new_column: string
全てのParquetファイルに新規のカラムが存在している場合 (mergeSchema: 指定なし)
Athena 実行結果
Dynamicframe 実行結果
mergeSchema: 指定なし
→新しいカラムを認識できます。
root |-- ID: long |-- Name: string |-- Value: long |-- new_column: string
+---+----+-----+----------+ | ID|Name|Value|new_column| +---+----+-----+----------+ | 1| aaa| 100| efj| | 2| bbb| 200| efj| | 3| ccc| 300| efj| | 6| eee| 100| abc| | 5| hhh| 500| abc| | 4| hhh| 700| abc| +---+----+-----+----------+
特定のParquetに新規のカラムが存在しない場合(mergeSchema: 指定なし)
Athena 実行結果
id1-3のParquetファイルからnew_column列を削除したので、NULLで表示されています。Athenaでは新規のカラムを認識できています。
Dynamicframe 実行結果
mergeSchema: 指定なし
→新しいカラムを認識できていません。Athenaの実行結果をみると新しいカラムを認識できていますが、Dynamicframeでは認識できていません。
root |-- ID: long |-- Name: string |-- Value: long
+---+----+-----+ | ID|Name|Value| +---+----+-----+ | 6| eee| 100| | 5| hhh| 500| | 4| hhh| 700| | 3| ccc| 300| | 1| aaa| 100| | 2| bbb| 200| +---+----+-----+
特定のParquetに新規のカラムが存在しない場合(mergeSchema: 有効化)
Athena 実行結果
id1-3のParquetファイルからnew_column列を削除したので、NULLで表示されています。
実行結果
mergeSchema: 有効化
→mergeSchemaオプションを有効化したので、新しいカラムを認識できます。
root |-- ID: long |-- Name: string |-- Value: long |-- new_column: string
| ID|Name|Value|new_column| +---+----+-----+----------+ | 6| eee| 100| abc| | 5| hhh| 500| abc| | 4| hhh| 700| abc| | 1| aaa| 100| null| | 3| ccc| 300| null| | 2| bbb| 200| null| +---+----+-----+----------+
全てのParquetに新規のカラムが存在しない場合(mergeSchema: 有効化)
Athena 実行結果
全てのParquetファイルからnew_column列を削除したので、NULLで表示されています。Athenaでは新規のカラムを認識できています。
Dynamicframe 実行結果
mergeSchema: 有効化
→mergeSchemaを有効にしていますが、新しいカラムのデータがすべてNULLの場合、Dynamicframeでは認識できないようです。
root |-- ID: long |-- Name: string |-- Value: long
+---+----+-----+ | ID|Name|Value| +---+----+-----+ | 6| eee| 100| | 5| hhh| 500| | 4| hhh| 700| | 3| ccc| 300| | 1| aaa| 100| | 2| bbb| 200| +---+----+-----+
stackoverflowを見ると同じ現象の人がいたので、Sparkの仕様なのかもしれない。 stackoverflow.com
ちなみに対処方法としては、ソースのParquetファイルに新しいカラムのヘッター行を入れるようにすれば、全てNULLでもDynamicframeでは認識できるようになりました。
検証結果まとめ
mergeSchemaで救えるのは、特定のParquetの新規カラムが存在しない場合と、 値はNULLでも新規カラムのヘッダー行が存在する場合ってことみたい。
mergeSchema: 指定なし | mergeSchema: 有効化 | |
---|---|---|
全てのParquetファイルに新規のカラムが存在している | ○ | ○ |
特定のParquetに新規のカラムが存在しない | ○ | ○ |
全てのParquetファイルに新規のカラムが存在しない | × | × ※1 |
※1 少なくとも1つのParquetファイルにヘッダー行があれば、値が全てNULLでも読み込み可能
mergeSchemaを有効化する場合の注意点
Stackoverflowに記載があるように実行時間が伸びる可能性があるので注意が必要です。
Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by
今回の検証環境では、ソースのParquetファイル数が少ないので、mergeSchemaを有効化しても実行時間は変わりませんでした。
最後に
思わぬ仕様でハマってしまったけど、検証してみて理解が深まってよかった。