When working with Spark and strongly typed data, Apache Avro is generally the schema definition and serialization format for transmitting data. Spark has built-in Avro support, as does Kafka and its schema registry, with which Spark integrates easily using structured streaming.
A popular alternative to Avro is Google’s Protocol Buffers, or protobuf. Compared to Avro and other common serialization formats, protobuf is generally more compact, performant, and readable (imho).
Unlike Avro, protobuf isn’t supported out of the box in pyspark. However, using the new pbspark package, we can easily perform similar conversions between encoded protobuf messages and pyspark structs.
For example, suppose we have a pyspark DataFrame with a column value containing protobuf-encoded messages of our SimpleMessage type, defined here:
syntax = "proto3";
package example;
message SimpleMessage {
  string name = 1;
  int64 quantity = 2;
  float measure = 3;
}
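The example below imports the generated module example.example_pb2. If you haven’t compiled the definition yet, a minimal sketch using grpcio-tools looks like this (it assumes the file above is saved as example/example.proto; the plain protoc CLI with --python_out works equally well):
# A minimal sketch, assuming grpcio-tools is installed and the definition
# above lives at example/example.proto; this writes example/example_pb2.py.
from grpc_tools import protoc

protoc.main([
    "protoc",  # argv[0] placeholder, ignored by protoc.main
    "--proto_path=.",
    "--python_out=.",
    "example/example.proto",
])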
Using pbspark we can decode the messages into spark StructType columns and then flatten them. We can also re-encode them into protobuf messages:
from example.example_pb2 import SimpleMessage
from pbspark import MessageConverter
from pyspark.sql.functions import struct
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
# build a sample dataframe with a single column with a protobuf message
example = SimpleMessage(name="hello", quantity=5, measure=12.3)
data = [{"value": example.SerializeToString()}]
df = spark.createDataFrame(data)
# convert the encoded messages to pyspark structs
mc = MessageConverter()
df_decoded = df.select(
    mc.from_protobuf(df.value, SimpleMessage).alias("value")
)
df_flattened = df_decoded.select("value.*")
df_flattened.show()
# +-----+--------+-------+
# | name|quantity|measure|
# +-----+--------+-------+
# |hello| 5| 12.3|
# +-----+--------+-------+
df_flattened.schema
# StructType(List(StructField(name,StringType,true),StructField(quantity,LongType,true),StructField(measure,FloatType,true)))
# convert back to encoded message
df_encoded = df_flattened.select(
    mc.to_protobuf(struct(*df_flattened.columns), SimpleMessage).alias("value")
)
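As a quick sanity check (this isn’t part of the pbspark example, and assumes the value column now holds the serialized bytes), we can pull a row back to the driver and parse it with the generated class:
# Hedged round-trip check: decode the re-encoded column on the driver and
# confirm the scalar fields survived.
row = df_encoded.first()
roundtripped = SimpleMessage.FromString(bytes(row.value))
assert roundtripped.name == "hello"
assert roundtripped.quantity == 5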
Under the hood, the conversion is relatively straightforward: to go from protobuf to pyspark we just need to map the types. Protobuf maintains a mapping from its C++ types to python types, so we instead map those C++ types to pyspark types.
import typing as t

from google.protobuf.descriptor import FieldDescriptor
from pyspark.sql.types import *

# Protobuf types map to these CPP Types. We map
# them to Spark types for generating a spark schema
_CPPTYPE_TO_SPARK_TYPE_MAP: t.Dict[int, t.Type[DataType]] = {
    FieldDescriptor.CPPTYPE_INT32: IntegerType,
    FieldDescriptor.CPPTYPE_INT64: LongType,
    FieldDescriptor.CPPTYPE_UINT32: LongType,
    FieldDescriptor.CPPTYPE_UINT64: LongType,
    FieldDescriptor.CPPTYPE_DOUBLE: DoubleType,
    FieldDescriptor.CPPTYPE_FLOAT: FloatType,
    FieldDescriptor.CPPTYPE_BOOL: BooleanType,
    FieldDescriptor.CPPTYPE_ENUM: StringType,
    FieldDescriptor.CPPTYPE_STRING: StringType,
}
With this mapping we can walk the protobuf Message descriptor hierarchy and build a spark schema. Using this schema we declare a pyspark udf to handle the conversion, which just uses protobuf’s MessageToDict function to deserialize. The deserialized message gets coerced into the map-inferred schema.
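pbspark’s real implementation is more thorough, but a minimal sketch of that idea, reusing the _CPPTYPE_TO_SPARK_TYPE_MAP above, might look like the following. The names schema_for and decoder_udf are illustrative, not pbspark’s API:
# A minimal sketch of the idea, not pbspark's actual code. It reuses
# _CPPTYPE_TO_SPARK_TYPE_MAP from above; schema_for and decoder_udf are
# illustrative names.
import typing as t

from google.protobuf.descriptor import Descriptor, FieldDescriptor
from google.protobuf.json_format import MessageToDict
from google.protobuf.message import Message
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StructField, StructType

def schema_for(descriptor: Descriptor) -> StructType:
    """Recursively build a spark schema from a protobuf message descriptor."""
    fields = []
    for field in descriptor.fields:
        if field.cpp_type == FieldDescriptor.CPPTYPE_MESSAGE:
            spark_type: DataType = schema_for(field.message_type)
        else:
            spark_type = _CPPTYPE_TO_SPARK_TYPE_MAP[field.cpp_type]()
        if field.label == FieldDescriptor.LABEL_REPEATED:
            spark_type = ArrayType(spark_type, True)
        fields.append(StructField(field.name, spark_type, True))
    return StructType(fields)

def decoder_udf(message_type: t.Type[Message]):
    """Return a udf that parses serialized bytes into the inferred schema."""
    schema = schema_for(message_type.DESCRIPTOR)

    def decode(value: bytes) -> dict:
        message = message_type.FromString(value)
        decoded = MessageToDict(message, preserving_proto_field_name=True)
        # The proto3 JSON mapping renders 64-bit integers as strings, so
        # coerce singular int64/uint64 fields back to ints to match LongType.
        for field in message_type.DESCRIPTOR.fields:
            if (
                field.name in decoded
                and field.label != FieldDescriptor.LABEL_REPEATED
                and field.cpp_type
                in (FieldDescriptor.CPPTYPE_INT64, FieldDescriptor.CPPTYPE_UINT64)
            ):
                decoded[field.name] = int(decoded[field.name])
        return decoded

    return udf(decode, schema)

# e.g. df.select(decoder_udf(SimpleMessage)(df.value).alias("value"))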
pbspark also supports custom serde of specific message types. In particular, it has opt-in support for converting google.protobuf.Timestamp messages to and from python datetimes.
from pbspark import MessageConverter
mc = MessageConverter()
mc.register_timestamp_serializer() # decode Timestamp to datetimes
mc.register_timestamp_deserializer() # expect datetimes for Timestamp
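Concretely, the registered serializer and deserializer translate between these two representations (a quick standalone illustration, not tied to any particular dataframe):
# Illustrative only: the Timestamp <-> datetime round trip that the
# registered serializer and deserializer handle for message fields.
from datetime import datetime, timezone

from google.protobuf.timestamp_pb2 import Timestamp

ts = Timestamp()
ts.FromDatetime(datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc))
print(ts.ToDatetime())  # 2022-01-01 12:00:00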