
SparkAdapter Protocol Specification

This document specifies the SparkAdapter Protocol: an opt-in contract that codecs can implement to declare that their decoded values can be expressed as Spark-native types (primitives, lists, dicts, and nested combinations).

The Protocol is intentionally small: a single method, no inheritance, no abstract-method surface area on the existing Codec base class. Consumers detect support via isinstance(codec, SparkAdapter).

New in DataJoint 2.3

Introduced for the Databricks Linked Delta Tables ("silver layer") integration. Independent of any consumer: any codec author can implement to_spark() and any downstream tool can check for it.

For the Codec base class itself, see Codec API Specification. For background on why the protocol is needed, see Spark Adapters and Silver-Layer Publishing.

Why this exists

DataJoint's <blob@> codec stores arbitrary Python values (numpy arrays, lists, dicts, custom objects) as serialized binary. That generality is the right call for most use cases, but it makes the column opaque to consumers that need typed access at query time: SQL engines, dataframe libraries, BI tools, federated query systems. Such consumers can only treat blob columns as BINARY and rely on the application to decode.

The concrete downstream case is Databricks Linked Delta Tables (the "silver layer"): every column must render to Spark-native types so the data is queryable with Spark SQL, exposed through Delta Sharing, and usable by Genie / BI without round-tripping through DataJoint. A BINARY blob doesn't qualify.

SparkAdapter is the minimum framework-side contract to make typed rendering possible without forcing every codec to support it. Generic codecs (<blob@>, <hash@>) remain unsupported by design. Typed codecs (<float_array@>, <image_2d@>, future shapes) opt in by implementing the method.

The protocol

from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class SparkAdapter(Protocol):
    """
    A codec that adapts its decoded values to Spark-native types.

    Opt-in. Codecs implementing this method declare that their decoded
    values can be expressed as primitives, lists, or dicts of the same,
    i.e., shapes that map cleanly to Spark's StructType / ArrayType / MapType.
    """

    def to_spark(self, decoded: Any, *, key: dict | None = None) -> Any: ...

Defined in datajoint.spark and re-exported at the top level as dj.SparkAdapter.

Signature

  • decoded (Any): The Python value produced by the codec's decode(). The protocol does not constrain this beyond requiring that the codec can map it to a Spark-native shape.
  • key (dict | None, keyword-only, default None): Optional context dict, with the same shape as Codec.encode's key parameter. Codecs may use it to resolve per-row context (rare; most codecs ignore it).

Return value

A value composed entirely of:

  • Primitives: bool, int, float, str, bytes, None, datetime.date, datetime.datetime.
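  • Lists: list[T] where T is any allowed shape. Maps to Spark ArrayType, which has a single element type, so all elements of a list should share one shape.
  • Dicts with string keys: dict[str, T] where T is any allowed shape. Maps to Spark StructType (the same fixed set of keys in every row, each field with its own type) or MapType (keys that vary across rows, with all values sharing one type); the consumer decides based on schema inference.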

NumPy scalars (np.int32, np.float64, etc.) are accepted, but consumers may coerce them to native Python types. NumPy arrays must be converted to lists before return; Spark has no representation for np.ndarray.

No tuples, no sets, no custom objects, no callables. The output must be JSON-shaped (plus the binary/date/datetime primitives listed above).
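The rules above can be checked mechanically. The function below is a hypothetical helper, not part of DataJoint, that walks a rendered value and accepts only the listed shapes. It is a sketch under two simplifications: it is stricter than the spec for NumPy scalars (types that don't subclass a Python primitive, such as np.int32, are rejected, so convert them with .item() first), and it checks shapes only, not whether every element of a list shares one type.

```python
import datetime
from typing import Any

# Leaf types the spec lists as primitives (None is handled separately).
# datetime.datetime subclasses datetime.date; both are listed for readability.
_PRIMITIVES = (bool, int, float, str, bytes, datetime.date, datetime.datetime)


def is_spark_native(value: Any) -> bool:
    """Return True if `value` uses only the shapes allowed for to_spark() output."""
    if value is None or isinstance(value, _PRIMITIVES):
        return True
    if isinstance(value, list):
        return all(is_spark_native(item) for item in value)
    if isinstance(value, dict):
        # Keys must be strings; values may be any allowed shape.
        return all(isinstance(k, str) and is_spark_native(v) for k, v in value.items())
    # Tuples, sets, ndarrays, custom objects, callables, ...
    return False


print(is_spark_native({"x": 1.0, "tags": ["a", "b"], "day": datetime.date(2024, 1, 1)}))  # True
print(is_spark_native({"pair": (1.0, 2.0)}))  # False: tuple
```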

Why a Protocol, not an abstract method on Codec

The earlier framing of this work (#1457, superseded) proposed adding to_spark() as an abstract method on dj.Codec. The current factoring uses a separate Protocol for four reasons:

  1. Smaller OSS surface. Adding an abstract method requires NotImplementedError stubs on every built-in codec (not just <blob@> and <hash@>, which can't render) and, retroactively, on every plugin codec. The Protocol approach adds ~10 lines total.
  2. Cleaner opt-in semantics. Codec authors implement the method when they want silver-layer eligibility; they don't have to acknowledge it otherwise. Codecs that don't adapt to Spark are invisible to the contract.
  3. No churn for existing plugins. Third-party codecs (e.g. dj-zarr-codecs, dj-photon-codecs) work unchanged. They opt in by adding the method on a future release of their choosing.
  4. Composable with structural typing. Consumers use isinstance(codec, SparkAdapter) (enabled by @runtime_checkable); no inheritance chain or registration step is required.

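The Protocol pattern follows Python's own convention for structural typing: typing.Protocol with @runtime_checkable (as used by typing.SupportsIndex and its siblings), and the duck-typed isinstance checks behind collections.abc.Iterable and Sized. DataJoint follows the language convention rather than inventing a new mechanism.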

Eligibility detection

Consumers determine whether a column adapts to Spark by checking the codec:

import datajoint as dj
from datajoint.spark import SparkAdapter

attr = table.heading["column_name"]  # `table`: any DataJoint table
if attr.codec is not None and isinstance(attr.codec, SparkAdapter):
    # The codec opts in to Spark rendering; decoded_value is a value
    # already produced by the codec's decode() (e.g. via fetch1()).
    rendered = attr.codec.to_spark(decoded_value)
else:
    # No Spark adapter: the consumer falls back to bronze-only handling
    ...

Because @runtime_checkable only verifies that the method exists (not its signature), the check is a structural test, not a behavioral guarantee. Codec authors must produce a Spark-native return value as defined above; the framework cannot enforce this statically.

What's not in this specification

  • Specific Spark-adapter codecs. Codecs like <float_array@>, <int_array@>, <image_2d@>, <labeled_array@>, <timeseries@>, <sparse_matrix@>, <struct@> are intentionally out of scope for datajoint-python. They ship as plugins (registered via the existing codec auto-registration mechanism) so each can evolve independently of the framework. The Protocol is what they implement against.
  • <blob@> and <hash@> rendering. These codecs hold arbitrary Python values; their content can't be assumed to have a Spark-native shape. They do not implement SparkAdapter. Pipeline authors who want silver eligibility migrate columns to a typed codec.
  • Reverse direction (Spark → DataJoint). Delta consumers query rendered columns via Spark SQL; round-tripping back through DataJoint's codec is not a target of this work. There is no from_spark method.
  • Best-effort BINARY fallback. Codecs either implement SparkAdapter (and produce a Spark-native value) or they don't (and remain bronze-only / non-eligible). No automatic blob → bytes-passthrough rendering.
  • Schema inference. Consumers infer Spark schemas from sample rendered values; the protocol does not transmit type metadata. (A Codec.spark_schema() companion method may be added in a future release; not in scope here.)
  • Streaming / chunked rendering. to_spark() is invoked per decoded value (per row, per column). Chunked / vectorized rendering is a downstream concern.
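Because the protocol transmits no type metadata, a consumer has to derive the Spark type from sample rendered values. A minimal, hypothetical sketch of that inference follows, producing Spark SQL type strings in the same notation as the examples below. It inspects a single sample, treats an empty list or a None as uninferable, and always picks STRUCT for dicts, where a real consumer might choose MAP instead.

```python
import datetime
from typing import Any


def infer_spark_ddl(sample: Any) -> str:
    """Hypothetical sketch: map one rendered value to a Spark SQL type string."""
    if isinstance(sample, bool):  # check before int: bool subclasses int
        return "BOOLEAN"
    if isinstance(sample, int):
        return "BIGINT"
    if isinstance(sample, float):
        return "DOUBLE"
    if isinstance(sample, str):
        return "STRING"
    if isinstance(sample, bytes):
        return "BINARY"
    if isinstance(sample, datetime.datetime):  # check before date: datetime subclasses date
        return "TIMESTAMP"
    if isinstance(sample, datetime.date):
        return "DATE"
    if isinstance(sample, list):
        if not sample:
            raise ValueError("cannot infer an element type from an empty list")
        # Spark arrays have one element type; take it from the first element.
        return f"ARRAY<{infer_spark_ddl(sample[0])}>"
    if isinstance(sample, dict):
        # Always STRUCT here; a consumer could choose MAP<STRING, ...> instead.
        fields = ", ".join(f"{name}: {infer_spark_ddl(v)}" for name, v in sample.items())
        return f"STRUCT<{fields}>"
    # None carries no type information; tuples, sets, objects are not Spark-native.
    raise TypeError(f"cannot infer a Spark type from {type(sample).__name__}")


print(infer_spark_ddl([[0.5, 1.5], [2.5, 3.5]]))  # ARRAY<ARRAY<DOUBLE>>
print(infer_spark_ddl({"x": 1.0, "y": 2.0, "label": "a"}))
# STRUCT<x: DOUBLE, y: DOUBLE, label: STRING>
```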

Example: implementing a SparkAdapter codec

A plugin codec for 1D float arrays. Shipped in a separate package (e.g. dj-array-codecs), registered via the codec entry-point mechanism.

from typing import Any  # used by the structured codec example below

import datajoint as dj
import numpy as np

class FloatArrayCodec(dj.Codec):
    """1D array of float64. Adapts to Spark ARRAY<DOUBLE>."""

    name = "float_array"

    def get_dtype(self, is_store: bool) -> str:
        return "longblob" if not is_store else "<hash>"

    def encode(self, value, *, key=None, store_name=None) -> bytes:
        return np.asarray(value, dtype=np.float64).tobytes()

    def decode(self, stored: bytes, *, key=None) -> np.ndarray:
        return np.frombuffer(stored, dtype=np.float64)

    def to_spark(self, decoded: np.ndarray, *, key=None) -> list[float]:
        return decoded.tolist()

isinstance(FloatArrayCodec(), dj.SparkAdapter) returns True because the method is present. No subclassing of SparkAdapter is required.

A 2D image codec returning a nested list (Spark ARRAY<ARRAY<DOUBLE>>):

class Image2DCodec(dj.Codec):
    name = "image_2d"

    def encode(self, value, *, key=None, store_name=None) -> bytes: ...
    def decode(self, stored, *, key=None) -> np.ndarray: ...

    def to_spark(self, decoded: np.ndarray, *, key=None) -> list[list[float]]:
        return decoded.tolist()  # 2D ndarray → nested list

A structured codec rendering to Spark STRUCT<x: DOUBLE, y: DOUBLE, label: STRING>:

class PointWithLabelCodec(dj.Codec):
    name = "labeled_point"

    def encode(self, value, *, key=None, store_name=None) -> bytes: ...
    def decode(self, stored, *, key=None) -> dict: ...

    def to_spark(self, decoded: dict, *, key=None) -> dict[str, Any]:
        return {
            "x": float(decoded["x"]),
            "y": float(decoded["y"]),
            "label": str(decoded["label"]),
        }

Consumer pattern

A simplified silver-layer publish loop (the actual datajoint-databricks consumer is more elaborate):

def publish_row_to_silver(table, key, target_table):
    """Publish one row of `table` (restricted by `key`) to a Spark-renderable target."""
    from datajoint.spark import SparkAdapter

    row = (table & key).fetch1()
    rendered = {}
    for attr_name, value in row.items():
        attr = table.heading[attr_name]
        if attr.codec is not None and isinstance(attr.codec, SparkAdapter):
            rendered[attr_name] = attr.codec.to_spark(value, key=key)
        elif attr.codec is None:
            # Primitive column (no codec): pass through unchanged
            rendered[attr_name] = value
        else:
            # Codec doesn't adapt to Spark. This sketch raises; a consumer
            # could instead skip the column for silver.
            raise ValueError(
                f"Column {attr_name!r} uses codec {attr.codec.name!r} which "
                "does not implement SparkAdapter; this row is not eligible "
                "for silver-layer publish."
            )
    target_table.write(rendered)

The framework provides the protocol and the eligibility check. The publish pipeline lives downstream.

References