
Azure Synapse Analytics Blog

How to handle the pandas UDF breaking change introduced in PyArrow 0.15

Liliam_C_Leme
Oct 12, 2020

When you are using pandas in a notebook, you may run into a breaking change in the Arrow IPC format that was introduced in Apache Arrow 0.15.

You can find more information here: https://arrow.apache.org/blog/2019/10/06/0.15.0-release/
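
If you want to confirm which pyarrow version your Spark pool is actually running, a quick check (this assumes you are in a Synapse/PySpark notebook cell) is:

import pyarrow
print(pyarrow.__version__)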

 

So this is a small post on how to work around it.

 

The customer was using the example available here:

https://spark.apache.org/docs/2.4.0/sql-pyspark-pandas-with-arrow.html

Specifically this one:

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()

 

 

 

When the customer tried to run it in Synapse, it failed with the following error:

Py4JJavaError : An error occurred while calling o205.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 15, c671bd6ddc35b7487900238907316, executor 1): java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)

 

The error happens because the Spark runtime expects the pre-0.15 Arrow IPC format, while newer pyarrow versions write the new format by default. Synapse Spark uses the library versions documented here:

https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-version-support

 

The workaround to fall back to the legacy Arrow IPC format is:

1) Add this to your code:

    import os
    # Tell pyarrow to produce the legacy (pre-0.15) Arrow IPC format
    os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'

 

 

and, for that example specifically, replace show() with display():

# Instead of:
df.select(multiply(col("x"), col("x"))).show()

# Use:
display(df.select(multiply(col("x"), col("x"))))
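
One caveat worth calling out: the pandas UDF runs on the executors, not just the driver, so if setting the variable inside the notebook session alone is not enough, you can also try passing the same variable to the executors through the session configuration. This is only a sketch, assuming your Synapse notebook supports the %%configure session magic (run it in the first cell, before the Spark session starts); spark.executorEnv.* and spark.yarn.appMasterEnv.* are standard Spark-on-YARN settings:

%%configure -f
{
    "conf": {
        "spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT": "1",
        "spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT": "1"
    }
}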

 

You can also downgrade pyarrow by installing the 0.14.1 wheel (whl) files instead:

 https://pypi.org/project/pyarrow/0.14.1/#files

Instructions here:

 https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-azure-portal-add-libraries#manage-a-python-wheel
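
If you would rather pin the package for the whole pool than upload the wheel by hand, the same library-management page also describes requirements files; assuming pinning pyarrow is acceptable for your workloads, a minimal requirements.txt would be just:

pyarrow==0.14.1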

 

That is it!

Liliam 

UK Engineer

Updated Oct 12, 2020
Version 1.0
  • I tried the following code:

    ref: https://stackoverflow.com/questions/40006395/applying-udfs-on-groupeddata-in-pyspark-with-functioning-python-example

     

    import os
    os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    import pandas as pd

    df3 = spark.createDataFrame([('a', 1, 0), ('a', -1, 42), ('b', 3, -1),
                                ('b', 10, -2)], ('key', 'value1', 'value2'))

    from pyspark.sql.types import *

    schema = StructType([StructField('key', StringType()),
                        StructField('avg_value1', DoubleType()),
                        StructField('avg_value2', DoubleType()),
                        StructField('sum_avg', DoubleType()),
                        StructField('sub_avg', DoubleType())])


    @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
    def g(df):
        gr = df['key'].iloc[0]
        x = df.value1.mean()
        y = df.value2.mean()
        w = df.value1.mean() + df.value2.mean()
        z = df.value1.mean() - df.value2.mean()
        return pd.DataFrame([[gr] + [x] + [y] + [w] + [z]])

    y_df = df3.groupby('key').apply(g)
    display(y_df)
     
    It still didn't work.
    Does Synapse support pandas UDFs?