Spark AutoMapper¶
Fluent API to map data from one view to another in Apache Spark.
Uses native Spark functions underneath, so it is just as fast as writing the transformations by hand.
Since this is just Python, you can use any Python editor. Everything is typed using Python type hints, so most editors will auto-complete and warn you when you do something wrong.
Typical Usage¶
First create an AutoMapper (AutoMapper). The two required parameters are:

- source_view: The Spark view to read the source data from
- view: The destination Spark view to store the data. This view should not already exist.
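For example, a minimal sketch of the constructor call, using the view names from the examples below (and assuming AutoMapper has been imported from the spark_auto_mapper package):

mapper = AutoMapper(
    source_view="patients",  # view holding the source data
    view="members"           # destination view; must not already exist
)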
Create the mappings. There are two kinds of mappings: strongly typed classes (via complex()) and plain columns (via columns()); both are shown in the Example section below.
In each mapping, access a column in the source view by using A.column("column_name") (column()) or specify a literal value with A.text("apple") (text()). Add transformation functions to A.column("column_name") to transform the column (AutoMapperDataTypeBase).
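As a minimal sketch of these pieces together (the destination field names and the literal "apple" are illustrative, and the A helper is assumed to be imported from the spark_auto_mapper package):

mapper = AutoMapper(
    view="members",
    source_view="patients"
).columns(
    # copy a source column unchanged
    last_name=A.column("last_name"),
    # chain a transformation function onto a source column
    age=A.column("my_age").to_number(),
    # assign a literal value
    favorite_fruit=A.text("apple"),
)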
Example¶
AutoMapper with Strongly typed classes (Recommended)
mapper = AutoMapper(
    view="members",
    source_view="patients"
).complex(
    MyClass(
        name=A.column("last_name"),
        age=A.column("my_age").to_number()
    )
)
The strongly typed class can be created as below (the constructor accepts whatever parameters you want and the class must implement the get_schema() method):
from typing import Optional, Union

from pyspark.sql.types import DataType, LongType, StringType, StructField, StructType

# AutoMapperDataTypeComplexBase, AutoMapperTextLikeBase and AutoMapperNumberDataType
# come from the spark_auto_mapper package


class MyClass(AutoMapperDataTypeComplexBase):
    def __init__(
        self, name: AutoMapperTextLikeBase, age: AutoMapperNumberDataType
    ) -> None:
        super().__init__(name=name, age=age)

    def get_schema(
        self, include_extension: bool
    ) -> Optional[Union[StructType, DataType]]:
        schema: StructType = StructType(
            [
                StructField("name", StringType(), False),
                StructField("age", LongType(), True),
            ]
        )
        return schema
AutoMapper with just columns
mapper = AutoMapper(
    view="members",
    source_view="patients",
).columns(
    dst1=A.column("src1"),
    dst2=AutoMapperList(["address1"]),
    dst3=AutoMapperList(["address1", "address2"]),
    dst4=AutoMapperList([A.complex(use="usual", family=A.column("last_name"))]),
)
Running an AutoMapper¶
An AutoMapper is a Spark Transformer, so you can call its transform function and pass in a DataFrame.
result_df: DataFrame = mapper.transform(df=df)
NOTE: AutoMapper ignores the DataFrame passed in since it uses views (this allows more flexibility).
A view can be created in Spark by calling the createOrReplaceTempView function.
df.createOrReplaceTempView("patients")
For example, you can create a view:
spark_session.createDataFrame(
    [
        (1, "Qureshi", "Imran", 45),
        (2, "Vidal", "Michael", 35),
    ],
    ["member_id", "last_name", "first_name", "my_age"],
).createOrReplaceTempView("patients")
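With the "patients" view in place, a minimal sketch of running the column-based mapper from the example above (assuming a SparkSession named spark_session) could be:

# any DataFrame can be passed in; the mapper reads from the "patients" view
source_df: DataFrame = spark_session.table("patients")
result_df: DataFrame = mapper.transform(df=source_df)
# the mapped data is returned and, per the view parameter, stored in the "members" view
result_df.show()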
Contents:¶
- API Reference
  - spark_auto_mapper
  - spark_auto_mapper.automappers
  - spark_auto_mapper.automappers.automapper
  - spark_auto_mapper.automappers.automapper_analysis_exception
  - spark_auto_mapper.automappers.automapper_base
  - spark_auto_mapper.automappers.automapper_exception
  - spark_auto_mapper.automappers.check_schema_result
  - spark_auto_mapper.automappers.column_spec_wrapper
  - spark_auto_mapper.automappers.columns
  - spark_auto_mapper.automappers.complex
  - spark_auto_mapper.automappers.container
  - spark_auto_mapper.automappers.with_column
  - spark_auto_mapper.automappers.with_column_base
  - spark_auto_mapper.data_types
  - spark_auto_mapper.data_types.complex
  - spark_auto_mapper.data_types.amount
  - spark_auto_mapper.data_types.array
  - spark_auto_mapper.data_types.array_base
  - spark_auto_mapper.data_types.array_distinct
  - spark_auto_mapper.data_types.array_max
  - spark_auto_mapper.data_types.boolean
  - spark_auto_mapper.data_types.coalesce
  - spark_auto_mapper.data_types.column
  - spark_auto_mapper.data_types.column_wrapper
  - spark_auto_mapper.data_types.concat
  - spark_auto_mapper.data_types.data_type_base
  - spark_auto_mapper.data_types.date
  - spark_auto_mapper.data_types.date_format
  - spark_auto_mapper.data_types.datetime
  - spark_auto_mapper.data_types.decimal
  - spark_auto_mapper.data_types.expression
  - spark_auto_mapper.data_types.field
  - spark_auto_mapper.data_types.filter
  - spark_auto_mapper.data_types.first
  - spark_auto_mapper.data_types.first_valid_column
  - spark_auto_mapper.data_types.flatten
  - spark_auto_mapper.data_types.float
  - spark_auto_mapper.data_types.hash
  - spark_auto_mapper.data_types.if_
  - spark_auto_mapper.data_types.if_column_exists
  - spark_auto_mapper.data_types.if_not
  - spark_auto_mapper.data_types.if_not_null
  - spark_auto_mapper.data_types.if_not_null_or_empty
  - spark_auto_mapper.data_types.if_regex
  - spark_auto_mapper.data_types.join_using_delimiter
  - spark_auto_mapper.data_types.list
  - spark_auto_mapper.data_types.literal
  - spark_auto_mapper.data_types.lpad
  - spark_auto_mapper.data_types.map
  - spark_auto_mapper.data_types.null_if_empty
  - spark_auto_mapper.data_types.number
  - spark_auto_mapper.data_types.regex_extract
  - spark_auto_mapper.data_types.regex_replace
  - spark_auto_mapper.data_types.split_by_delimiter
  - spark_auto_mapper.data_types.substring
  - spark_auto_mapper.data_types.substring_by_delimiter
  - spark_auto_mapper.data_types.text_like_base
  - spark_auto_mapper.data_types.transform
  - spark_auto_mapper.data_types.trim
  - spark_auto_mapper.data_types.unix_timestamp
  - spark_auto_mapper.helpers
  - spark_auto_mapper.type_definitions