
Handling Complicated Data Types in Python and PySpark

Tips and Traps

  1. An element in a pandas DataFrame can be any (complicated) type in Python. To save a pandas DataFrame with arbitrary (complicated) types as is, you have to use the pickle module. The method pandas.DataFrame.to_pickle (which is simply a wrapper over pickle.dump) serializes the DataFrame to a pickle file, while the function pandas.read_pickle (which is simply a wrapper over pickle.load) deserializes a pickle file into a pandas DataFrame.

  2. Apache Parquet is a binary file format that stores data in a columnar fashion for compressed, efficient data representation. It is a very popular file format in the big data (Hadoop/Spark, etc.) ecosystem. However, be aware that a Parquet file does not support arbitrary Python data types! For example, an element of the list type is converted to a numpy array first. This requires the types of the elements of a column to be consistent. For this reason, numpy.ndarray is preferred to list if you want to write the pandas DataFrame to a Parquet file later.

  3. It is good practice to have consistent and specific types when working with Parquet files in Python, especially when you have to deal with the Parquet files in Spark/PySpark later.

    • numpy.ndarray is preferred to list and tuple.
    • Avoid mixing different types (numpy.ndarray, list, tuple, etc.) in the same column, even if it might still work.
    • An empty numpy.ndarray is preferred to None as the handling of None can be inconsistent in different situations. Specifically, avoid a column with all None's: when written to a Parquet file and then read into Spark/PySpark, such a column is inferred as IntegerType (due to the lack of specific type information), which might or might not be what you want.
  4. You can specify a schema to help Spark/PySpark read a Parquet file. However, I don't think this is good practice. One advantage of the Parquet format is that it carries a schema, so the accurate schema should be stored in the Parquet file itself, as sketched below. Otherwise, it is hard for other people to figure out the correct schema to use.
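If pandas/pyarrow cannot infer the precise types you want, one way to follow the above advice is to pass an explicit schema when writing the file, so that the Parquet file itself carries accurate type information. Below is a minimal sketch using pyarrow directly; the column names, values and output path are made up for illustration.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical example data: x2 mixes empty and non-empty lists of floats.
pdf = pd.DataFrame({"x1": [1, 2, 3], "x2": [[0.1, 0.2], [], [0.3]]})

# Declare the exact types that the Parquet file should carry.
schema = pa.schema(
    [
        ("x1", pa.int64()),
        ("x2", pa.list_(pa.float64())),
    ]
)

# Convert using the explicit schema and write it into the Parquet file.
table = pa.Table.from_pandas(pdf, schema=schema, preserve_index=False)
pq.write_table(table, "/tmp/pdf_with_schema.parquet")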

Types in pandas DataFrame

In [2]:
import pandas as pd
import numpy as np
import pickle

Complicated Data Types

An element in a pandas DataFrame can be any (complicated) type in Python. Below is an example pandas DataFrame with complicated data types.

In [30]:
pdf_1 = pd.DataFrame(
    {
        "x1": [1, 2, 3, 4, 5],
        "x2": [
            None,
            np.array([]),
            {"key": 1},
            np.array([0.1, 0.2, 0.3]),
            ["how", 0.5, 0.6],
        ],
    }
)

pdf_1.head()
Out[30]:
x1 x2
0 1 None
1 2 []
2 3 {'key': 1}
3 4 [0.1, 0.2, 0.3]
4 5 [how, 0.5, 0.6]
In [31]:
pdf_1.dtypes
Out[31]:
x1     int64
x2    object
dtype: object

Mixed Data Types in pandas DataFrame

The pandas DataFrame pdf_1 contains mixed data types in its column x2. It cannot be written into a Parquet file because the data types in the column x2 are not compatible with each other in Parquet.

In [32]:
pdf_1.to_parquet("/tmp/pdf_1.parquet")
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
<ipython-input-32-c3bfee9de3a5> in <module>
----> 1 pdf_1.to_parquet("/tmp/pdf_1.parquet")

/usr/local/lib/python3.8/dist-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
    197                 else:
    198                     kwargs[new_arg_name] = new_arg_value
--> 199             return func(*args, **kwargs)
    200 
    201         return cast(F, wrapper)

/usr/local/lib/python3.8/dist-packages/pandas/core/frame.py in to_parquet(self, path, engine, compression, index, partition_cols, storage_options, **kwargs)
   2453         from pandas.io.parquet import to_parquet
   2454 
-> 2455         return to_parquet(
   2456             self,
   2457             path,

/usr/local/lib/python3.8/dist-packages/pandas/io/parquet.py in to_parquet(df, path, engine, compression, index, storage_options, partition_cols, **kwargs)
    388     path_or_buf: FilePathOrBuffer = io.BytesIO() if path is None else path
    389 
--> 390     impl.write(
    391         df,
    392         path_or_buf,

/usr/local/lib/python3.8/dist-packages/pandas/io/parquet.py in write(self, df, path, compression, index, storage_options, partition_cols, **kwargs)
    150             from_pandas_kwargs["preserve_index"] = index
    151 
--> 152         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
    153 
    154         path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(

/usr/local/lib/python3.8/dist-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pandas()

/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in dataframe_to_arrays(df, schema, preserve_index, nthreads, columns, safe)
    588 
    589     if nthreads == 1:
--> 590         arrays = [convert_column(c, f)
    591                   for c, f in zip(columns_to_convert, convert_fields)]
    592     else:

/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in <listcomp>(.0)
    588 
    589     if nthreads == 1:
--> 590         arrays = [convert_column(c, f)
    591                   for c, f in zip(columns_to_convert, convert_fields)]
    592     else:

/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in convert_column(col, field)
    575             e.args += ("Conversion failed for column {!s} with type {!s}"
    576                        .format(col.name, col.dtype),)
--> 577             raise e
    578         if not field_nullable and result.null_count > 0:
    579             raise ValueError("Field {} was non-nullable but pandas column "

/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in convert_column(col, field)
    569 
    570         try:
--> 571             result = pa.array(col, type=type_, from_pandas=True, safe=safe)
    572         except (pa.ArrowInvalid,
    573                 pa.ArrowNotImplementedError,

/usr/local/lib/python3.8/dist-packages/pyarrow/array.pxi in pyarrow.lib.array()

/usr/local/lib/python3.8/dist-packages/pyarrow/array.pxi in pyarrow.lib._ndarray_to_array()

/usr/local/lib/python3.8/dist-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: ('cannot mix list and non-list, non-null values', 'Conversion failed for column x2 with type object')

However, you can serialize and deserialize the pandas DataFrame using pickle. As a matter of fact, almost all Python objects can be serialized and deserialized using pickle.

In [33]:
pdf_1.to_pickle("/tmp/pdf_1.pickle")
In [39]:
with open("/tmp/pdf_1.pickle", "rb") as fin:
    pdf_1c = pickle.load(fin)
pdf_1c
Out[39]:
x1 x2
0 1 None
1 2 []
2 3 {'key': 1}
3 4 [0.1, 0.2, 0.3]
4 5 [how, 0.5, 0.6]
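Equivalently, pandas.read_pickle (which, as mentioned in the tips above, wraps pickle.load) reads the pickle file back into a DataFrame directly:

import pandas as pd

# pandas.read_pickle is a thin wrapper over pickle.load.
pdf_1c = pd.read_pickle("/tmp/pdf_1.pickle")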

Some data types are compatible with each other in Parquet. For example, None, numpy.ndarray and list can be mixed in a pandas DataFrame column, and the column can still be written into a Parquet file.

In [47]:
pdf_2 = pd.DataFrame(
    {
        "x1": [1, 2, 3, 4, 5],
        "x2": [True, False, True, False, True],
        "x3": [None, np.array([]), [], np.array([0.1, 0.2, 0.3]), [0.4, 0.5, 0.6]],
    }
)

pdf_2.head()
Out[47]:
x1 x2 x3
0 1 True None
1 2 False []
2 3 True []
3 4 False [0.1, 0.2, 0.3]
4 5 True [0.4, 0.5, 0.6]
In [37]:
pdf_2.dtypes
Out[37]:
x1     int64
x2      bool
x3    object
dtype: object
In [38]:
pdf_2.to_parquet("/tmp/pdf_2.parquet", flavor="spark")
In [40]:
pdf_2c = pd.read_parquet("/tmp/pdf_2.parquet")
pdf_2c
Out[40]:
x1 x2 x3
0 1 True None
1 2 False []
2 3 True []
3 4 False [0.1, 0.2, 0.3]
4 5 True [0.4, 0.5, 0.6]
In [41]:
pdf_2c.dtypes
Out[41]:
x1     int64
x2      bool
x3    object
dtype: object
In [42]:
type(pdf_2.x3[0])
Out[42]:
NoneType
In [43]:
type(pdf_2.x3[1])
Out[43]:
numpy.ndarray
In [44]:
type(pdf_2.x3[2])
Out[44]:
list
In [45]:
type(pdf_2.x3[3])
Out[45]:
numpy.ndarray
In [46]:
type(pdf_2.x3[4])
Out[46]:
list

Even if some data types can be mixed together and are compatible in the Parquet format, you should avoid doing this. It is good practice to have consistent and specific types when working with Parquet files in Python, especially when you have to deal with the Parquet files in Spark/PySpark later.

  • numpy.ndarray is preferred to list and tuple.
  • Avoid mixing different types (numpy.ndarray, list, tuple, etc.) in the same column, even if it might still work.
  • An empty numpy.ndarray is preferred to None as the handling of None can be inconsistent in different situations. Specifically, avoid a column with all None's: when written to a Parquet file and then read into Spark/PySpark, such a column is inferred as IntegerType (due to the lack of specific type information), which might or might not be what you want. See the sketch below for one way to normalize such a column.
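As a concrete sketch of the above advice (with made-up column names, values and output path), the snippet below normalizes a column that mixes None, list and numpy.ndarray elements into a column of float64 numpy arrays before writing it to a Parquet file.

import numpy as np
import pandas as pd

# Hypothetical data: x3 mixes None, numpy arrays and lists.
pdf = pd.DataFrame(
    {
        "x1": [1, 2, 3, 4],
        "x3": [None, np.array([]), [0.1, 0.2], np.array([0.3])],
    }
)

# Normalize every element to a float64 numpy array,
# using an empty array instead of None.
pdf["x3"] = pdf["x3"].map(
    lambda v: np.array([], dtype=np.float64)
    if v is None
    else np.asarray(v, dtype=np.float64)
)

pdf.to_parquet("/tmp/pdf_consistent.parquet")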

Read the Parquet File into PySpark

In [52]:
from pathlib import Path

import findspark

findspark.init(str(next(Path("/opt").glob("spark-*"))))

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
In [53]:
df = spark.read.parquet("/tmp/pdf_2.parquet")
df.show()
+---+-----+---------------+
| x1|   x2|             x3|
+---+-----+---------------+
|  1| true|           null|
|  2|false|             []|
|  3| true|             []|
|  4|false|[0.1, 0.2, 0.3]|
|  5| true|[0.4, 0.5, 0.6]|
+---+-----+---------------+

In [54]:
df.schema
Out[54]:
StructType(List(StructField(x1,LongType,true),StructField(x2,BooleanType,true),StructField(x3,ArrayType(DoubleType,true),true)))

Notice that the None value is represented as null in the above PySpark DataFrame. The x3 column is represented as an array of double values.

You can provide a customized schema when reading a Parquet file into Spark.

In [64]:
schema = StructType(
    [
        StructField("x1", LongType(), False),
        StructField("x2", BooleanType(), False),
        StructField("x3", ArrayType(DoubleType()), True),
    ]
)
In [65]:
df_2 = spark.read.schema(schema).parquet("/tmp/pdf_2.parquet")
df_2.show()
+---+-----+---------------+
| x1|   x2|             x3|
+---+-----+---------------+
|  1| true|           null|
|  2|false|             []|
|  3| true|             []|
|  4|false|[0.1, 0.2, 0.3]|
|  5| true|[0.4, 0.5, 0.6]|
+---+-----+---------------+

In [66]:
df_2.schema
Out[66]:
StructType(List(StructField(x1,LongType,true),StructField(x2,BooleanType,true),StructField(x3,ArrayType(DoubleType,true),true)))
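Notice that the fields are still reported as nullable even though the supplied schema marks x1 and x2 as non-nullable; Spark does not necessarily enforce or preserve nullability constraints when reading from file-based sources such as Parquet.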

An empty array is NOT considered null!

In [69]:
df_2.select(col("x1"), col("x2"), col("x3"), col("x3").isNull().alias("is_null")).show()
+---+-----+---------------+-------+
| x1|   x2|             x3|is_null|
+---+-----+---------------+-------+
|  1| true|           null|   true|
|  2|false|             []|  false|
|  3| true|             []|  false|
|  4|false|[0.1, 0.2, 0.3]|  false|
|  5| true|[0.4, 0.5, 0.6]|  false|
+---+-----+---------------+-------+
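If you also want to treat empty arrays as missing values, you can combine the null check with pyspark.sql.functions.size. Below is a minimal sketch reusing df_2 from above; the column alias is made up.

# Flag rows whose x3 is either null or an empty array.
df_2.select(
    col("x1"),
    col("x3"),
    (col("x3").isNull() | (size(col("x3")) == 0)).alias("is_missing"),
).show()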

You can write the PySpark DataFrame into a Parquet file and then load it back into a pandas DataFrame. Notice that

  1. null is converted to None.
  2. An array is represented as a numpy.ndarray.
In [70]:
df_2.write.mode("overwrite").parquet("/tmp/df_2.parquet")
In [71]:
pdf_3 = pd.read_parquet("/tmp/df_2.parquet")
pdf_3
Out[71]:
x1 x2 x3
0 1 True None
1 2 False []
2 3 True []
3 4 False [0.1, 0.2, 0.3]
4 5 True [0.4, 0.5, 0.6]
In [72]:
pdf_3.dtypes
Out[72]:
x1     int64
x2      bool
x3    object
dtype: object
In [73]:
type(pdf_3.x3[0])
Out[73]:
NoneType
In [74]:
type(pdf_3.x3[1])
Out[74]:
numpy.ndarray
In [75]:
type(pdf_3.x3[2])
Out[75]:
numpy.ndarray
In [76]:
type(pdf_3.x3[3])
Out[76]:
numpy.ndarray
In [77]:
type(pdf_3.x3[4])
Out[77]:
numpy.ndarray

Schema of PySpark DataFrames

In [79]:
df.show()
+---+-----+---------------+
| x1|   x2|             x3|
+---+-----+---------------+
|  1| true|           null|
|  2|false|             []|
|  3| true|             []|
|  4|false|[0.1, 0.2, 0.3]|
|  5| true|[0.4, 0.5, 0.6]|
+---+-----+---------------+

In [80]:
df.schema
Out[80]:
StructType(List(StructField(x1,LongType,true),StructField(x2,BooleanType,true),StructField(x3,ArrayType(DoubleType,true),true)))

DataFrame.schema is of the pyspark.sql.types.StructType type.

In [81]:
type(df.schema)
Out[81]:
pyspark.sql.types.StructType

A StructType is iterable and each element is of the StructField type.

In [83]:
for field in df.schema:
    print(field)
StructField(x1,LongType,true)
StructField(x2,BooleanType,true)
StructField(x3,ArrayType(DoubleType,true),true)
In [85]:
f = df.schema[0]
f
Out[85]:
StructField(x1,LongType,true)
In [86]:
f.name
Out[86]:
'x1'
In [87]:
f.dataType
Out[87]:
LongType
In [92]:
type(f.dataType)
Out[92]:
pyspark.sql.types.LongType
In [94]:
f.dataType.typeName()
Out[94]:
'long'
In [95]:
f.dataType.simpleString()
Out[95]:
'bigint'
In [91]:
f.nullable
Out[91]:
True
In [96]:
f.simpleString()
Out[96]:
'x1:bigint'
In [99]:
DecimalType(18, 3)
Out[99]:
DecimalType(18,3)
In [101]:
f2 = StructField("gmb", DecimalType(18, 2), False)
f2
Out[101]:
StructField(gmb,DecimalType(18,2),false)
In [102]:
f2.name
Out[102]:
'gmb'
In [103]:
f2.dataType
Out[103]:
DecimalType(18,2)
In [105]:
f2.dataType.typeName()
Out[105]:
'decimal'
In [106]:
f2.dataType.simpleString()
Out[106]:
'decimal(18,2)'
In [107]:
f2.simpleString()
Out[107]:
'gmb:decimal(18,2)'
In [109]:
for field in df.schema:
    print(f"{field.name}    {field.dataType.simpleString()}")
x1    bigint
x2    boolean
x3    array<double>
