Forum Discussion
Piyush_Thakur, Copper Contributor
Aug 28, 2020
Unable to write a CSV file to Azure Blob Storage using PySpark
Hi,
I am trying to write a CSV file to Azure Blob Storage using PySpark. I have installed PySpark on my VM, but the write fails with this error:
org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2482)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:424)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:1997)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:531)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:245)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:89)
at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:307)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:182)
at com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(CloudBlob.java:764)
at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
... 20 more
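Reading the trace bottom-up, the write fails at task commit: Hadoop's `FileOutputCommitter.commitTask` calls `NativeAzureFileSystem.rename`, which the WASB driver carries out as a blob copy (`startCopyFromBlob`), and that copy request is what Azure rejects with "One of the request inputs is not valid." The behavior of `FileOutputCommitter` is tuned through Hadoop keys such as `mapreduce.fileoutputcommitter.algorithm.version`. The following is a minimal sketch of setting such keys on an already-running session. It assumes the same internal `spark._jsc.hadoopConfiguration()` handle that the code below uses. `COMMITTER_SETTINGS`, `as_builder_options` and `apply_committer_settings` are hypothetical names. The example values match those suggested in the first reply, and whether they avoid the failing copy is not established here.

```python
# Hadoop FileOutputCommitter keys (plain Hadoop names, no "spark.hadoop." prefix).
# Example values only; whether they avoid the failing rename is an assumption to test.
COMMITTER_SETTINGS = {
    "mapreduce.fileoutputcommitter.algorithm.version": "2",
    "mapreduce.fileoutputcommitter.cleanup.skipped": "true",
}


def as_builder_options(settings):
    """Prefix each key with 'spark.hadoop.' for use with SparkSession.builder.config().

    Spark copies spark.hadoop.* properties into the Hadoop configuration
    with the prefix removed.
    """
    return {"spark.hadoop." + key: value for key, value in settings.items()}


def apply_committer_settings(spark, settings=COMMITTER_SETTINGS):
    """Set the keys directly on a running session's Hadoop configuration.

    spark._jsc is an internal PySpark handle (the question's code uses it too);
    the assumption is that writes started afterwards pick up the new values.
    """
    hadoop_conf = spark._jsc.hadoopConfiguration()
    for key, value in settings.items():
        hadoop_conf.set(key, value)


# Usage (assumes an existing SparkSession named `spark`):
# apply_committer_settings(spark)
# Or, when building a new session:
# builder = SparkSession.builder
# for k, v in as_builder_options(COMMITTER_SETTINGS).items():
#     builder = builder.config(k, v)
```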
My PySpark code is below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

storage_account_name = "######"
storage_account_access_key = "####################################3"

# Register the account key for the Blob (wasbs://) endpoint of this storage account.
spark.conf.set("fs.azure.account.key." + storage_account_name + ".blob.core.windows.net", storage_account_access_key)
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoop_conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoop_conf.set("fs.azure.account.key." + storage_account_name + ".blob.core.windows.net", storage_account_access_key)

# Reading the CSV works.
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/<path_to_csv>")
df.show()

# Add a constant column.
df = df.withColumn("c1", lit("1"))
df.show()

# Writing back fails with the exception above.
df.coalesce(1).write.mode("overwrite").option("header", "true").format("csv").save("wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/<path_to_write_csv>")
I am using the hadoop-azure-2.7.0.jar and azure-storage-2.2.0.jar JARs. With them I can read the CSV from my blob container, but I cannot write back to Blob Storage.
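The JAR names above carry explicit versions. One thing worth ruling out is a mismatch between hadoop-azure and the Hadoop version your Spark build actually runs on, since hadoop-* JARs are generally expected to come from the same Hadoop release. Below is a minimal diagnostic sketch. It assumes PySpark with access to the JVM through the internal `sparkContext._jvm` py4j gateway. `jar_version`, `same_major_minor` and `spark_hadoop_version` are hypothetical helpers, and comparing only major.minor is a rough heuristic, not a compatibility rule.

```python
import re


def jar_version(jar_name):
    """Extract the version from a JAR file name such as 'hadoop-azure-2.7.0.jar'."""
    match = re.search(r"-(\d+(?:\.\d+)*)\.jar$", jar_name)
    return match.group(1) if match else None


def same_major_minor(v1, v2):
    """True when two dotted version strings agree on their first two components."""
    return v1.split(".")[:2] == v2.split(".")[:2]


def spark_hadoop_version(spark):
    """Ask the JVM which Hadoop version this Spark session runs on.

    Uses the internal py4j gateway (_jvm) and Hadoop's VersionInfo class;
    this is not a public PySpark API.
    """
    return spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()


# Usage (assumes an existing SparkSession named `spark`):
# runtime = spark_hadoop_version(spark)
# if not same_major_minor(runtime, jar_version("hadoop-azure-2.7.0.jar")):
#     print("hadoop-azure JAR does not match Spark's Hadoop", runtime)
```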
Thanks,
Piyush Thakur
2 Replies
- malfar1984, Copper Contributor
Try:
spark = SparkSession.builder \
.config('spark.master', 'local[*]') \
.config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2') \
.config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true") \
.getOrCreate()
- Amrinder_Singh, Microsoft
One thing to check is whether you are using a standard Blob Storage account or an ADLS Gen2 account (hierarchical namespace, HNS, enabled). If it is ADLS Gen2, try connecting with the ABFS driver (abfss://) instead of the WASBS driver.
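For an ADLS Gen2 (HNS) account, the ABFS driver uses the `abfss://` scheme and the `dfs.core.windows.net` endpoint instead of `wasbs://` and `blob.core.windows.net`. ABFS is not part of hadoop-azure 2.7.0 (it arrived in the Hadoop 3.2 line), so a newer hadoop-azure JAR matching your Spark build's Hadoop version would presumably be needed. Below is a minimal sketch, assuming account-key authentication as in the question. `storage_uri`, `account_key_setting` and `write_csv` are hypothetical helpers, and the angle-bracket placeholders are kept from the original post.

```python
def storage_uri(container, account, path, hns):
    """Build an abfss:// URI (dfs endpoint) for HNS / ADLS Gen2 accounts,
    or a wasbs:// URI (blob endpoint) otherwise."""
    if hns:
        return f"abfss://{container}@{account}.dfs.core.windows.net/{path}"
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path}"


def account_key_setting(account, hns):
    """Config key that holds the storage account key for the chosen endpoint."""
    endpoint = "dfs" if hns else "blob"
    return f"fs.azure.account.key.{account}.{endpoint}.core.windows.net"


def write_csv(spark, df, container, account, key, path, hns=True):
    """Hypothetical helper: register the account key, then write df as one CSV file."""
    spark.conf.set(account_key_setting(account, hns), key)
    target = storage_uri(container, account, path, hns)
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(target)
    return target


# Usage (assumes an existing SparkSession `spark` and DataFrame `df`):
# write_csv(spark, df, "<container_name>", "<storage_account_name>",
#           storage_account_access_key, "<path_to_write_csv>", hns=True)
```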