Christopher Flynn

Machine Learning
Systems Architect,
PhD Mathematician

Open Source



pbspark - Protocol buffers and pyspark

2021-12-08 Feed

Image by fsHH from Pixabay

When working with spark and strongly typed data, Apache Avro is generally the schema definition and serialization format for transmitting data. Spark has built-in avro support, as does Kafka and its schema registry, with which Spark integrates easily using structured streaming.

A popular alternative to avro is google’s protocol buffers, or protobuf. Compared to avro and other common serialization formats, protobuf is generally more compact, performant, and readable (imho).

Unlike avro, protobuf isn’t supported out of the box in pyspark. However, using the new pbspark package, we can easily perform similar conversions between encoded protobuf messages and pyspark structs.

For example, suppose we have a pyspark DataFrame which contains a column value which has protobuf encoded messages of our SimpleMessage, defined here:

syntax = "proto3";

package example;

message SimpleMessage {
  string name = 1;
  int64 quantity = 2;
  float measure = 3;

Using pbspark we can decode the messages into spark StructType and then flatten them. We can also re-encode them into protobuf messages:

from example.example_pb2 import SimpleMessage
from pbspark import MessageConverter
from pyspark.sql.functions import struct
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

# build a sample dataframe with a single column with a protobuf message
example = SimpleMessage(name="hello", quantity=5, measure=12.3)
data = [{"value": example.SerializeToString()}]
df = spark.createDataFrame(data)

# convert the encoded messages to pyspark structs
mc = MessageConverter()
df_decoded =
    mc.from_protobuf(df.value, SimpleMessage).alias("value")
df_flattened ="value.*")

# +-----+--------+-------+
# | name|quantity|measure|
# +-----+--------+-------+
# |hello|       5|   12.3|
# +-----+--------+-------+

# StructType(List(StructField(name,StringType,true),StructField(quantity,IntegerType,true),StructField(measure,FloatType,true))

# convert back to encoded message
df_encoded =
    mc.to_protobuf(struct(df_flattened.value), SimpleMessage).alias("value")

How does it work?

It’s relatively straightforward. In order to convert from protobuf to pyspark we just need to map the types. Protobuf maintains a map of C++ types to python types, so we just have to instead map them to pyspark types.

from google.protobuf.descriptor import FieldDescriptor
from pyspark.sql.types import *

# Protobuf types map to these CPP Types. We map
# them to Spark types for generating a spark schema
_CPPTYPE_TO_SPARK_TYPE_MAP: t.Dict[int, t.Type[DataType]] = {
    FieldDescriptor.CPPTYPE_INT32: IntegerType,
    FieldDescriptor.CPPTYPE_INT64: LongType,
    FieldDescriptor.CPPTYPE_UINT32: LongType,
    FieldDescriptor.CPPTYPE_UINT64: LongType,
    FieldDescriptor.CPPTYPE_DOUBLE: DoubleType,
    FieldDescriptor.CPPTYPE_FLOAT: FloatType,
    FieldDescriptor.CPPTYPE_BOOL: BooleanType,
    FieldDescriptor.CPPTYPE_ENUM: StringType,
    FieldDescriptor.CPPTYPE_STRING: StringType,

With this mapping we can walk the protobuf Message descriptor hierarchy and build a spark schema. Using this schema we declare a pyspark udf to handle the conversion, which is just using protobuf’s MessageToDict function to deserialize. The deserialized message gets coerced into the map-inferred schema.

pbspark also supports custom serde of specific message types. In particular, it has opt-in support for working with google.protobuf.Timestamp conversion to/from python datetimes.

from pbspark import MessageConverter

mc = MessageConverter()
mc.register_timestamp_serializer()  # decode Timestamp to datetimes
mc.register_timestamp_deserializer()  # expect datetimes for Timestamp

Further reading


protocol buffers



Back to the posts.