
Rust and Spark

Things on this page are fragmentary and immature notes/thoughts of the author. Please read with your own judgement!

The simplest way to call a native (e.g., Rust) executable from Spark is to leverage pandas_udf in PySpark. Inside the pandas UDF, you can call subprocess.run to execute any shell command and capture its output.

import tempfile
from pathlib import Path
import subprocess as sp

import pandas as pd
from pyspark.sql.functions import PandasUDFType, pandas_udf

# Command template invoking the (Rust) executable;
# {} is filled with the path of a file containing one id per line.
CMD = "./pineapple test --id1-path {} --htype1 3 --n0 2 --n2 5 --ratio2 0.001"


def run_cmd(cmd: str) -> str:
    """Run a shell command and return its stdout as a string.

    On failure, print diagnostics (the command's output, its stderr,
    and the content of the working directory) before re-raising.
    """
    try:
        proc = sp.run(cmd, shell=True, check=True, capture_output=True)
    except sp.CalledProcessError as err:
        print(f"Command failed: {err}\nOutput: {err.stdout}\nError: {err.stderr}")
        print("Content of the working directory:")
        for p in Path(".").glob("*"):
            print(f"    {p}")
        raise
    return proc.stdout.decode().strip()


@pandas_udf("string", PandasUDFType.SCALAR)
def test_score_r4(id1):
    # Write the batch of ids to a temp file, run the executable on it,
    # and split its stdout back into one value per input row.
    # A scalar pandas UDF must return a Series with exactly one element
    # per input row, so the executable is expected to print one
    # whitespace-separated value per id.
    path = Path(tempfile.mkdtemp()) / "id1.txt"
    path.write_text("\n".join(str(id_) for id_ in id1))
    output = run_cmd(CMD.format(path))
    return pd.Series(output.split())
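
As a quick usage sketch (with a hypothetical DataFrame and column name, and assuming the pineapple executable is present on every worker, e.g. shipped via SparkContext.addFile or a shared mount), the UDF can be applied like any other column expression:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("rust-and-spark").getOrCreate()
# Hypothetical input: a single-column DataFrame of ids.
df = spark.createDataFrame([(1,), (2,), (3,)], ["id1"])
df.withColumn("score", test_score_r4(col("id1"))).show()

Note that PandasUDFType.SCALAR is deprecated since Spark 3.0 in favor of Python type hints, i.e. @pandas_udf("string") on a function annotated with pd.Series parameters and return type.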
