これはなに?
Glue JobでデータフレームからファイルをS3に出力する際に度々おきる ”java.io.IOException: File already exists” の原因調査について、かなりまとまっている記事があったので日本語訳を載せます。
- これはなに?
- Code
- エラー
- 原因と対処方法
- 1. 一般的には、このエラーと一緒に ArithmeticException/UnknownHostException のような他の例外が隠されています。Yarn のログを確認してください。
- 2. 古いバージョンのParquet Writerを使用しています。これは、Text Writerではなく、Parquet Writerのみに適用されます。(出力ファイルがParquetだった場合のみ)
- 3. S3やGSからスローされる例外の種類が間違っています。Spark は FileNotFoundException を期待していますが、バックエンドシステムからはより一般的な例外 IOException がスローされます。
- 4. S3同期の問題 - S3はファイルを同期できず、ランダムにこのエラーをスローします。
- 5. Sparkの投機的実行の問題。なぜかS3へのSpark投機モードの書き込みが壊れています。複数のExecuterが同じファイルを書こうとするとエラーとなる。
- 最後に
Code
dataDF.write.partitionBy("year", "month", "date").mode(SaveMode.Append).text("s3://data/test2/events/")
スクリプトの内容は、DataDrameの内容をパーティションに分割して、S3にTextファイルとして出力。
<SaveModeの仕様>
エラー
複数のExecuter がS3にファイルを書き込みに行く際に、”java.io.IOException: File already exists” エラーが発生する。
16/07/06 02:15:05 ERROR datasources.DynamicPartitionWriterContainer: Aborting task. java.io.IOException: File already exists:s3://path/1839dd1ed38a.gz at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:614) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791) at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:177) at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135) at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(DefaultSource.scala:156) at org.apache.spark.sql.execution.datasources.text.TextRelation$$anon$1.newInstance(DefaultSource.scala:125) at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.newOutputWriter$1(WriterContainer.scala:424) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:356) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 16/07/06 02:15:05 INFO output.DirectFileOutputCommitter: Nothing to clean up on abort since there are no temporary files written 16/07/06 02:15:05 ERROR datasources.DynamicPartitionWriterContainer: Task attempt attempt_201607060215_0004_m_001709_3 aborted. 16/07/06 02:15:05 ERROR executor.Executor: Exception in task 1709.3 in stage 4.0 (TID 12093) org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: File already exists:s3://path/a984-1839dd1ed38a.gz at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:614) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791) at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:177) at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135) at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(DefaultSource.scala:156) at org.apache.spark.sql.execution.datasources.text.TextRelation$$anon$1.newInstance(DefaultSource.scala:125) at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.newOutputWriter$1(WriterContainer.scala:424) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:356) ... 8 more
原因と対処方法
1. 一般的には、このエラーと一緒に ArithmeticException/UnknownHostException のような他の例外が隠されています。Yarn のログを確認してください。
yarn logs -applicationId <application_id>
java - Spark Exception : Task failed while writing rows - Stack Overflow
ーーーーーーーーーーーー
2. 古いバージョンのParquet Writerを使用しています。これは、Text Writerではなく、Parquet Writerのみに適用されます。(出力ファイルがParquetだった場合のみ)
[SPARK-8413] DirectParquetOutputCommitter doesn't clean up the file on task failure - ASF JIRA
ーーーーーーーーーーーー
3. S3やGSからスローされる例外の種類が間違っています。Spark は FileNotFoundException を期待していますが、バックエンドシステムからはより一般的な例外 IOException がスローされます。
https://groups.google.com/forum/#!topic/cloud-dataproc-discuss/jNP7fkJdD5A
ーーーーーーーーーーーー
4. S3同期の問題 - S3はファイルを同期できず、ランダムにこのエラーをスローします。
S3 のSlowDownの可能性もあるっぽいので、S3のリクエストメトリクス有効化すると問題の切り分けになりますね。
ーーーーーーーーーーーー
5. Sparkの投機的実行の問題。なぜかS3へのSpark投機モードの書き込みが壊れています。複数のExecuterが同じファイルを書こうとするとエラーとなる。
最後に
”File already Exists” が起きた場合は大体この中のどれかが原因だと思います。エラーの内容は同じでも原因は沢山あるので、一つ一つ辺りをつけて対処する必要があります。Glue使う場合は、Sparkの利用を正確に理解していると問題解決が早そう。