Spark AutoMapper

Fluent API to map data from one view to another in Apache Spark.

Uses native Spark functions underneath, so it is just as fast as writing the transformations by hand.

Since this is just Python, you can use any Python editor. Everything is annotated with Python type hints, so most editors will auto-complete and warn you when you do something wrong.

SparkAutoMapper GitHub Repo

Typical Usage

  1. First create an AutoMapper. The two required parameters are:

    1. source_view: The Spark view to read the source data from

    2. view: The destination Spark view to store the data. This view should not already exist.

  2. Create the mappings. There are two kinds of mappings:

    1. Complex (complex()): Uses strongly typed classes, e.g., FHIR resources. This is the preferred method, as it provides syntax checking and auto-complete.

    2. Columns (columns()): You can define arbitrary columns in the destination view.

  3. In each mapping, access a column in the source view with A.column("column_name") or specify a literal value with A.text("apple").

  4. Add transformation functions to A.column("column_name") to transform the column (see AutoMapperDataTypeBase); a minimal sketch follows this list.
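
For instance, a minimal sketch of steps 3 and 4 (the import paths shown and the fruit column are assumptions for illustration; to_number() is one of the transformation functions):

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients"
).columns(
    # read a column from the source view and convert it to a number
    age=A.column("my_age").to_number(),
    # write a literal value into every row of the destination view
    fruit=A.text("apple"),
)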

Example

AutoMapper with Strongly typed classes (Recommended)

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients"
).complex(
    MyClass(
        name=A.column("last_name"),
        age=A.column("my_age").to_number()
    )
)

The strongly typed class can be created as below; the constructor accepts whatever parameters you want, and the class must implement the get_schema() method:

from typing import Optional, Union

from pyspark.sql.types import DataType, LongType, StringType, StructField, StructType

# import paths below may differ slightly between spark_auto_mapper versions
from spark_auto_mapper.data_types.complex.complex_base import AutoMapperDataTypeComplexBase
from spark_auto_mapper.data_types.number import AutoMapperNumberDataType
from spark_auto_mapper.data_types.text_like_base import AutoMapperTextLikeBase


class MyClass(AutoMapperDataTypeComplexBase):
    def __init__(
        self, name: AutoMapperTextLikeBase, age: AutoMapperNumberDataType
    ) -> None:
        super().__init__(name=name, age=age)

    def get_schema(
        self, include_extension: bool
    ) -> Optional[Union[StructType, DataType]]:
        # schema of the struct this class produces in the destination view
        schema: StructType = StructType(
            [
                StructField("name", StringType(), False),
                StructField("age", LongType(), True),
            ]
        )
        return schema

AutoMapper with just columns

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.data_types.list import AutoMapperList
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
).columns(
    dst1=A.column("src1"),
    dst2=AutoMapperList(["address1"]),
    dst3=AutoMapperList(["address1", "address2"]),
    dst4=AutoMapperList([A.complex(use="usual", family=A.column("last_name"))]),
)
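
Under these mappings the destination view would contain, roughly, the following columns (a sketch, assuming plain values inside AutoMapperList become array elements and A.complex becomes a struct):

# dst1: string  -- copied from the src1 column
# dst2: array<string>  -- a one-element array holding the literal "address1"
# dst3: array<string>  -- a two-element array of literals
# dst4: array<struct<use: string, family: string>>  -- one struct built from
#       a literal and a source column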

Running an AutoMapper

An AutoMapper is a Spark Transformer, so you can call its transform function and pass in a DataFrame.

result_df: DataFrame = mapper.transform(df=df)

NOTE: AutoMapper ignores the DataFrame passed in, since it reads from and writes to views instead. (This allows more flexibility.)

A view can be created in Spark by calling the createOrReplaceTempView function on a DataFrame.

df.createOrReplaceTempView("patients")

For example, you can create a view:

spark_session.createDataFrame(
    [
        (1, "Qureshi", "Imran", 45),
        (2, "Vidal", "Michael", 35),
    ],
    ["member_id", "last_name", "first_name", "my_age"],
).createOrReplaceTempView("patients")
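
Putting the pieces together, a minimal end-to-end run might look like the sketch below (the local SparkSession setup is an assumption for illustration, and the import paths may vary by version):

from pyspark.sql import DataFrame, SparkSession

from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

spark_session = SparkSession.builder.master("local[1]").getOrCreate()

# register the source view with some test rows
spark_session.createDataFrame(
    [
        (1, "Qureshi", "Imran", 45),
        (2, "Vidal", "Michael", 35),
    ],
    ["member_id", "last_name", "first_name", "my_age"],
).createOrReplaceTempView("patients")

# define the mapping from the patients view to the members view
mapper = AutoMapper(
    view="members",
    source_view="patients",
).columns(
    name=A.column("last_name"),
    age=A.column("my_age").to_number(),
)

# transform reads from the source view, registers the destination view,
# and returns the resulting dataframe (the df argument itself is ignored)
df: DataFrame = spark_session.table("patients")
result_df: DataFrame = mapper.transform(df=df)
result_df.show()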
