Glue Job実行時に発生する ”File already Exists” の原因を調査する

これはなに?

Glue JobでデータフレームからファイルをS3に出力する際に度々おきる ”java.io.IOException: File already exists” の原因調査について、かなりまとまっている記事があったので日本語訳を載せます。

元記事 confusedcoders.com

Code

dataDF.write.partitionBy("year", "month", "date").mode(SaveMode.Append).text("s3://data/test2/events/")

スクリプトの内容は、DataDrameの内容をパーティションに分割して、S3にTextファイルとして出力。

<SaveModeの仕様>

f:id:ykoomaru:20200808113348p:plain

エラー

複数のExecuter がS3にファイルを書き込みに行く際に、”java.io.IOException: File already exists” エラーが発生する。

16/07/06 02:15:05 ERROR datasources.DynamicPartitionWriterContainer: Aborting task.
java.io.IOException: File already exists:s3://path/1839dd1ed38a.gz
 at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:614)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
 at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:177)
 at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
 at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(DefaultSource.scala:156)
 at org.apache.spark.sql.execution.datasources.text.TextRelation$$anon$1.newInstance(DefaultSource.scala:125)
 at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
 at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.newOutputWriter$1(WriterContainer.scala:424)
 at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:356)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
 at org.apache.spark.scheduler.Task.run(Task.scala:89)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
16/07/06 02:15:05 INFO output.DirectFileOutputCommitter: Nothing to clean up on abort since there are no temporary files written
16/07/06 02:15:05 ERROR datasources.DynamicPartitionWriterContainer: Task attempt attempt_201607060215_0004_m_001709_3 aborted.
16/07/06 02:15:05 ERROR executor.Executor: Exception in task 1709.3 in stage 4.0 (TID 12093)
org.apache.spark.SparkException: Task failed while writing rows.
 at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
 at org.apache.spark.scheduler.Task.run(Task.scala:89)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: File already exists:s3://path/a984-1839dd1ed38a.gz
 at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:614)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
 at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:177)
 at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
 at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(DefaultSource.scala:156)
 at org.apache.spark.sql.execution.datasources.text.TextRelation$$anon$1.newInstance(DefaultSource.scala:125)
 at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
 at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.newOutputWriter$1(WriterContainer.scala:424)
 at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:356)
 ... 8 more

原因と対処方法

1. 一般的には、このエラーと一緒に ArithmeticException/UnknownHostException のような他の例外が隠されています。Yarn のログを確認してください。

yarn logs -applicationId <application_id>

java - Spark Exception : Task failed while writing rows - Stack Overflow

ーーーーーーーーーーーー

2. 古いバージョンのParquet Writerを使用しています。これは、Text Writerではなく、Parquet Writerのみに適用されます。(出力ファイルがParquetだった場合のみ)

[SPARK-8413] DirectParquetOutputCommitter doesn't clean up the file on task failure - ASF JIRA

ーーーーーーーーーーーー

3. S3やGSからスローされる例外の種類が間違っています。Spark は FileNotFoundException を期待していますが、バックエンドシステムからはより一般的な例外 IOException がスローされます。

https://groups.google.com/forum/#!topic/cloud-dataproc-discuss/jNP7fkJdD5A

ーーーーーーーーーーーー

4. S3同期の問題 - S3はファイルを同期できず、ランダムにこのエラーをスローします。

S3 のSlowDownの可能性もあるっぽいので、S3のリクエストメトリクス有効化すると問題の切り分けになりますね。

Why do I get 'java.io.IOException: File already exists' for saveAsTable with Overwrite mode? - Databricks Community Forum

ーーーーーーーーーーーー

5. Sparkの投機的実行の問題。なぜかS3へのSpark投機モードの書き込みが壊れています。複数のExecuterが同じファイルを書こうとするとエラーとなる。

IOException writing Parquet file with SaveMode.Append when speculation kicks in - Databricks Community Forum

最後に

”File already Exists” が起きた場合は大体この中のどれかが原因だと思います。エラーの内容は同じでも原因は沢山あるので、一つ一つ辺りをつけて対処する必要があります。Glue使う場合は、Sparkの利用を正確に理解していると問題解決が早そう。