Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!
Tips and Traps¶
An element in a pandas DataFrame can be any (complicated) Python type. To save a pandas DataFrame with arbitrary (complicated) types as is, you have to use the pickle module. The method pandas.DataFrame.to_pickle (which is simply a wrapper over pickle.dump) serializes the DataFrame to a pickle file, while pandas.read_pickle (which is simply a wrapper over pickle.load) deserializes a pickle file into a pandas DataFrame.
Apache Parquet is a binary file format that stores data in a columnar fashion for compressed, efficient columnar data representation. It is a very popular file format in big-data ecosystems (Hadoop/Spark, etc.). However, be aware that a Parquet file does not support arbitrary Python data types! For example, an element of the list type is converted to a numpy array first, which requires the element types within a column to be consistent. For this reason, numpy.ndarray is preferred to list if you want to write the pandas DataFrame to a Parquet file later.
It is good practice to have consistent and specific types when working with Parquet files in Python, especially when you have to deal with the Parquet file in Spark/PySpark later.
- numpy.ndarray is preferred to list and tuple.
- Avoid mixing different types (numpy.ndarray, list, tuple, etc.) in the same column, even if it still might work.
- An empty numpy.ndarray is preferred to None, as the handling of None can be inconsistent in different situations. Specifically, avoid a column with all None's. When written to a Parquet file and then read into Spark/PySpark, a column with all None's is inferred as IntegerType (due to the lack of specific type information). This might or might not be what you want.
You can specify a schema to help Spark/PySpark read a Parquet file. However, I don't think this is good practice. One advantage of the Parquet format is that it carries a schema, so the accurate schema should be stored in the Parquet file itself. Otherwise, it is hard for other people to figure out the correct schema to use.
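As a minimal sketch of the tips above (the column name and values are made up for illustration), converting list elements to numpy arrays gives a column with one consistent element type before writing to Parquet:

```python
import numpy as np
import pandas as pd

# A column holding Python lists: convert every element to numpy.ndarray
# before writing to Parquet, per the tips above.
pdf = pd.DataFrame({"x": [[0.1, 0.2], [0.3, 0.4, 0.5]]})
pdf["x"] = pdf["x"].apply(np.array)

print(type(pdf.x[0]))  # <class 'numpy.ndarray'>
# pdf.to_parquet("/tmp/pdf.parquet")  # now the elements have a consistent type
```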
Types in pandas DataFrame¶
import pandas as pd
import numpy as np
import pickle

Complicated Data Types¶
An element in a pandas DataFrame can be any (complicated) type in Python. Below is an example pandas DataFrame with complicated data types.
pdf_1 = pd.DataFrame(
{
"x1": [1, 2, 3, 4, 5],
"x2": [
None,
np.array([]),
{"key": 1},
np.array([0.1, 0.2, 0.3]),
["how", 0.5, 0.6],
],
}
)
pdf_1.head()
pdf_1.dtypes
x1 int64
x2 object
dtype: object

Mixed Data Types in pandas DataFrame¶
The pandas DataFrame pdf_1 contains mixed data types in its column x2.
It cannot be written to a Parquet file,
as the data types in the column x2 are not compatible with Parquet.
pdf_1.to_parquet("/tmp/pdf_1.parquet")
---------------------------------------------------------------------------
ArrowInvalid Traceback (most recent call last)
<ipython-input-32-c3bfee9de3a5> in <module>
----> 1 pdf_1.to_parquet("/tmp/pdf_1.parquet")
/usr/local/lib/python3.8/dist-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
197 else:
198 kwargs[new_arg_name] = new_arg_value
--> 199 return func(*args, **kwargs)
200
201 return cast(F, wrapper)
/usr/local/lib/python3.8/dist-packages/pandas/core/frame.py in to_parquet(self, path, engine, compression, index, partition_cols, storage_options, **kwargs)
2453 from pandas.io.parquet import to_parquet
2454
-> 2455 return to_parquet(
2456 self,
2457 path,
/usr/local/lib/python3.8/dist-packages/pandas/io/parquet.py in to_parquet(df, path, engine, compression, index, storage_options, partition_cols, **kwargs)
388 path_or_buf: FilePathOrBuffer = io.BytesIO() if path is None else path
389
--> 390 impl.write(
391 df,
392 path_or_buf,
/usr/local/lib/python3.8/dist-packages/pandas/io/parquet.py in write(self, df, path, compression, index, storage_options, partition_cols, **kwargs)
150 from_pandas_kwargs["preserve_index"] = index
151
--> 152 table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
153
154 path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
/usr/local/lib/python3.8/dist-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pandas()
/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in dataframe_to_arrays(df, schema, preserve_index, nthreads, columns, safe)
588
589 if nthreads == 1:
--> 590 arrays = [convert_column(c, f)
591 for c, f in zip(columns_to_convert, convert_fields)]
592 else:
/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in <listcomp>(.0)
588
589 if nthreads == 1:
--> 590 arrays = [convert_column(c, f)
591 for c, f in zip(columns_to_convert, convert_fields)]
592 else:
/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in convert_column(col, field)
575 e.args += ("Conversion failed for column {!s} with type {!s}"
576 .format(col.name, col.dtype),)
--> 577 raise e
578 if not field_nullable and result.null_count > 0:
579 raise ValueError("Field {} was non-nullable but pandas column "
/usr/local/lib/python3.8/dist-packages/pyarrow/pandas_compat.py in convert_column(col, field)
569
570 try:
--> 571 result = pa.array(col, type=type_, from_pandas=True, safe=safe)
572 except (pa.ArrowInvalid,
573 pa.ArrowNotImplementedError,
/usr/local/lib/python3.8/dist-packages/pyarrow/array.pxi in pyarrow.lib.array()
/usr/local/lib/python3.8/dist-packages/pyarrow/array.pxi in pyarrow.lib._ndarray_to_array()
/usr/local/lib/python3.8/dist-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
ArrowInvalid: ('cannot mix list and non-list, non-null values', 'Conversion failed for column x2 with type object')
However, you can serialize and deserialize the pandas DataFrame using pickle. As a matter of fact, almost all Python objects can be serialized and deserialized using pickle.
pdf_1.to_pickle("/tmp/pdf_1.pickle")
with open("/tmp/pdf_1.pickle", "rb") as fin:
pdf_1c = pickle.load(fin)
pdf_1c
Some data types are compatible with Parquet.
For example,
None, numpy.ndarray and list
can be mixed in a pandas DataFrame column
and still be written to a Parquet file.
pdf_2 = pd.DataFrame(
{
"x1": [1, 2, 3, 4, 5],
"x2": [True, False, True, False, True],
"x3": [None, np.array([]), [], np.array([0.1, 0.2, 0.3]), [0.4, 0.5, 0.6]],
}
)
pdf_2.head()
pdf_2.dtypes
x1 int64
x2 bool
x3 object
dtype: object
pdf_2.to_parquet("/tmp/pdf_2.parquet", flavor="spark")
pdf_2c = pd.read_parquet("/tmp/pdf_2.parquet")
pdf_2c
pdf_2c.dtypes
x1 int64
x2 bool
x3 object
dtype: object
type(pdf_2.x3[0])
NoneType
type(pdf_2.x3[1])
numpy.ndarray
type(pdf_2.x3[2])
list
type(pdf_2.x3[3])
numpy.ndarray
type(pdf_2.x3[4])
list
Even if some data types can be mixed together and are compatible with the Parquet format, you should avoid doing this. It is good practice to have consistent and specific types when working with Parquet files in Python, especially when you have to deal with the Parquet file in Spark/PySpark later.
- numpy.ndarray is preferred to list and tuple.
- Avoid mixing different types (numpy.ndarray, list, tuple, etc.) in the same column, even if it still might work.
- An empty numpy.ndarray is preferred to None, as the handling of None can be inconsistent in different situations. Specifically, avoid a column with all None's. When written to a Parquet file and then read into Spark/PySpark, a column with all None's is inferred as IntegerType (due to the lack of specific type information). This might or might not be what you want.
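A minimal sketch of the last point (the column name and values are my own, not from the examples in this post): None elements can be normalized to empty float arrays so the column keeps one consistent element type, which helps Parquet/Spark infer ArrayType(DoubleType) instead of guessing.

```python
import numpy as np
import pandas as pd

# Replace None with an empty float64 array so every element of the
# column is a numpy.ndarray of the same element type.
pdf = pd.DataFrame({"x": [None, [0.1, 0.2], None]})
pdf["x"] = pdf["x"].apply(
    lambda v: np.array([], dtype=np.float64)
    if v is None
    else np.asarray(v, dtype=np.float64)
)
print([v.size for v in pdf.x])  # [0, 2, 0]
```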
Read the Parquet File into PySpark¶
from pathlib import Path

import findspark

findspark.init(str(next(Path("/opt").glob("spark-*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("PySpark").enableHiveSupport().getOrCreate()
df = spark.read.parquet("/tmp/pdf_2.parquet")
df.show()
+---+-----+---------------+
| x1| x2| x3|
+---+-----+---------------+
| 1| true| null|
| 2|false| []|
| 3| true| []|
| 4|false|[0.1, 0.2, 0.3]|
| 5| true|[0.4, 0.5, 0.6]|
+---+-----+---------------+
df.schema
StructType(List(StructField(x1,LongType,true),StructField(x2,BooleanType,true),StructField(x3,ArrayType(DoubleType,true),true)))
Notice that the None value is represented as null in the above PySpark DataFrame.
The x3 column is represented as an array of double values.
You can provide a customized schema when reading a table into Spark.
schema = StructType(
[
StructField("x1", LongType(), False),
StructField("x2", BooleanType(), False),
StructField("x3", ArrayType(DoubleType()), True),
]
)
df_2 = spark.read.schema(schema).parquet("/tmp/pdf_2.parquet")
df_2.show()
+---+-----+---------------+
| x1| x2| x3|
+---+-----+---------------+
| 1| true| null|
| 2|false| []|
| 3| true| []|
| 4|false|[0.1, 0.2, 0.3]|
| 5| true|[0.4, 0.5, 0.6]|
+---+-----+---------------+
df_2.schema
StructType(List(StructField(x1,LongType,true),StructField(x2,BooleanType,true),StructField(x3,ArrayType(DoubleType,true),true)))
An empty array is NOT considered null!
df_2.select(col("x1"), col("x2"), col("x3"), col("x3").isNull().alias("is_null")).show()
+---+-----+---------------+-------+
| x1| x2| x3|is_null|
+---+-----+---------------+-------+
| 1| true| null| true|
| 2|false| []| false|
| 3| true| []| false|
| 4|false|[0.1, 0.2, 0.3]| false|
| 5| true|[0.4, 0.5, 0.6]| false|
+---+-----+---------------+-------+
You can write the PySpark DataFrame to a Parquet file and then load it into a pandas DataFrame.
- null is converted to None.
- An array is represented as numpy.ndarray.
df_2.write.mode("overwrite").parquet("/tmp/df_2.parquet")
pdf_3 = pd.read_parquet("/tmp/df_2.parquet")
pdf_3
pdf_3.dtypes
x1 int64
x2 bool
x3 object
dtype: object
type(pdf_3.x3[0])
NoneType
type(pdf_3.x3[1])
numpy.ndarray
type(pdf_3.x3[2])
numpy.ndarray
type(pdf_3.x3[3])
numpy.ndarray
type(pdf_3.x3[4])
numpy.ndarray

Schema of PySpark DataFrames¶
df.show()
+---+-----+---------------+
| x1| x2| x3|
+---+-----+---------------+
| 1| true| null|
| 2|false| []|
| 3| true| []|
| 4|false|[0.1, 0.2, 0.3]|
| 5| true|[0.4, 0.5, 0.6]|
+---+-----+---------------+
df.schema
StructType(List(StructField(x1,LongType,true),StructField(x2,BooleanType,true),StructField(x3,ArrayType(DoubleType,true),true)))
DataFrame.schema is of the pyspark.sql.types.StructType type.
type(df.schema)
pyspark.sql.types.StructType
A StructType is iterable,
and each element is of the StructField type.
for field in df.schema:
print(field)
StructField(x1,LongType,true)
StructField(x2,BooleanType,true)
StructField(x3,ArrayType(DoubleType,true),true)
f = df.schema[0]
f
StructField(x1,LongType,true)
f.name
'x1'
f.dataType
LongType
type(f.dataType)
pyspark.sql.types.LongType
f.dataType.typeName()
'long'
f.dataType.simpleString()
'bigint'
f.nullable
True
f.simpleString()
'x1:bigint'
DecimalType(18, 3)
DecimalType(18,3)
f2 = StructField("gmb", DecimalType(18, 2), False)
f2
StructField(gmb,DecimalType(18,2),false)
f2.name
'gmb'
f2.dataType
DecimalType(18,2)
f2.dataType.typeName()
'decimal'
f2.dataType.simpleString()
'decimal(18,2)'
f2.simpleString()
'gmb:decimal(18,2)'
for field in df.schema:
    print(f"{field.name} {field.dataType.simpleString()}")
x1 bigint
x2 boolean
x3 array<double>
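The field-by-field iteration above can also run in the other direction: sketching a Spark-style DDL schema string from a pandas DataFrame's dtypes before handing the data over. The helper name pandas_to_ddl and the dtype mapping below are my own illustration, not a pandas or PySpark API, and only a few scalar dtypes are covered.

```python
import pandas as pd

# Hypothetical helper: map a few pandas dtypes to Spark SQL type names
# and join them into a DDL-style schema string. Anything unmapped falls
# back to string.
_DTYPE_TO_SPARK = {
    "int64": "bigint",
    "float64": "double",
    "bool": "boolean",
    "object": "string",
}

def pandas_to_ddl(pdf: pd.DataFrame) -> str:
    return ", ".join(
        f"{name} {_DTYPE_TO_SPARK.get(str(dtype), 'string')}"
        for name, dtype in pdf.dtypes.items()
    )

pdf = pd.DataFrame({"x1": [1, 2], "x2": [True, False]})
print(pandas_to_ddl(pdf))  # x1 bigint, x2 boolean
```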