1. ホーム
  2. scala

[解決済み] Spark - CSVファイルをDataFrameとして読み込む?

2022-03-13 06:18:19

質問

CSVをsparkで読み込んでDataFrameに変換し、HDFSに格納したいのですが df.registerTempTable("table_name")

試してみました。

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

エラーになりました。

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Apache SparkでCSVファイルをDataFrameとして読み込むための正しいコマンドは何ですか?

どのように解決するのですか?

spark-csvはSparkのコア機能の一部であり、別のライブラリは必要ありません。 そのため、例えば以下のようなことが可能です。

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

scalaでは、(csvなら","、tsvなら" \t" などのフォーマットイン区切り文字に対応します)

val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")