Installation¶
In [2]:
!pip3 install -U datacompy
In [3]:
# Point findspark at the local Spark 3.x install BEFORE importing pyspark,
# otherwise the pyspark package cannot be resolved.
from pathlib import Path
import findspark
findspark.init(str(next(Path("/opt").glob("spark-3*"))))

from pyspark.sql import SparkSession, DataFrame
# Aliased import instead of `from pyspark.sql.functions import *`:
# a wildcard pollutes the namespace and hides where names come from.
# (No names from this module are used elsewhere in this notebook.)
import pyspark.sql.functions as F
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructType,
    StructField,
    ArrayType,
)

# NOTE: with a local Spark install, datacompy must be imported AFTER
# findspark.init(...) — see the prose note later in this notebook.
import datacompy

# Single Spark session reused by every cell below.
spark = SparkSession.builder.appName("datacompy").enableHiveSupport().getOrCreate()
Comparing Two pandas DataFrames¶
In [5]:
from io import StringIO

import pandas as pd

# Inline CSV fixtures: two versions of the same accounts table.
# The "original" version carries a date_fld column and a duplicated
# acct_id (10000001238); the "new" version drops date_fld and edits
# several names/amounts, which the comparison below will surface.
csv_original = """acct_id,dollar_amt,name,float_fld,date_fld
10000001234,123.45,George Maharis,14530.1555,2017-01-01
10000001235,0.45,Michael Bluth,1,2017-01-01
10000001236,1345,George Bluth,,2017-01-01
10000001237,123456,Bob Loblaw,345.12,2017-01-01
10000001238,1.05,Lucille Bluth,,2017-01-01
10000001238,1.05,Loose Seal Bluth,,2017-01-01
"""
csv_new = """acct_id,dollar_amt,name,float_fld
10000001234,123.4,George Michael Bluth,14530.155
10000001235,0.45,Michael Bluth,
10000001236,1345,George Bluth,1
10000001237,123456,Robert Loblaw,345.12
10000001238,1.05,Loose Seal Bluth,111
"""

df1, df2 = (pd.read_csv(StringIO(text)) for text in (csv_original, csv_new))
In [6]:
# Rich display of the "original" frame (bare last expression in the cell).
df1
Out[6]:
In [7]:
# Rich display of the "new" frame for side-by-side inspection.
df2
Out[7]:
In [8]:
# Pandas-backed comparison keyed on acct_id; the names given via
# df1_name/df2_name label the two sides in the rendered report.
# join_columns also accepts a list when the key spans several columns.
compare = datacompy.Compare(
    df1,
    df2,
    join_columns="acct_id",
    abs_tol=0.0001,  # absolute tolerance for numeric equality
    rel_tol=0,       # no relative tolerance
    df1_name="original",
    df2_name="new",
)
report_text = compare.report()
print(report_text)
Comparing Two Spark DataFrames¶
There is no advantage to running datacompy on a local Spark installation: it consumes more memory and takes more time than running datacompy on pandas DataFrames.
If you use datacompy with a local Spark installation, make sure to import datacompy after `findspark.init(...)`.
In [9]:
# Convert the "original" pandas frame to a Spark DataFrame and preview it.
sdf1 = spark.createDataFrame(df1)
sdf1.show()
In [10]:
# Convert the "new" pandas frame to a Spark DataFrame and preview it.
sdf2 = spark.createDataFrame(df2)
sdf2.show()
In [11]:
# Spark-based comparison. Unlike datacompy.Compare above, this API takes
# the SparkSession as the first argument and requires join_columns as a list.
comparison = datacompy.SparkCompare(
spark,
sdf1,
sdf2,
join_columns=["acct_id"], # must use a list of column(s)
# presumably caches intermediate Spark results between report calls —
# confirm against the datacompy SparkCompare docs
cache_intermediates=True,
abs_tol=0.0001,
rel_tol=0,
match_rates=True,
)
In [18]:
# Confirm the object type returned by datacompy.SparkCompare.
type(comparison)
Out[18]:
In [12]:
# Emit the Spark comparison report; a file handle may also be passed
# to capture the text instead (demonstrated below).
comparison.report()
`SparkCompare.report` takes a file handle to write the report to. You can pass a `StringIO` object to `SparkCompare.report` to write the report to a string buffer instead.
In [16]:
# Capture the report as a string: report() writes to any file-like
# object, so a StringIO buffer collects the text instead of printing.
buffer = StringIO()
try:
    comparison.report(buffer)
    report = buffer.getvalue()
finally:
    buffer.close()
In [17]:
# Show the captured report string (repr of the buffered text).
report
Out[17]:
In [ ]: