Skip to content

Codec API Specification¶

This document specifies the DataJoint Codec API for creating custom attribute types. For the complete type system architecture (core types, built-in codecs, storage modes), see the Type System Specification.

Overview¶

Codecs define bidirectional conversion between Python objects and database storage. They enable storing complex data types (graphs, models, custom formats) while maintaining DataJoint's query capabilities.

flowchart LR
    A["Python Object
(e.g. Graph)"] -- encode --> B["Storage Type
(e.g. bytes)"] B -- decode --> A

Two Patterns for Custom Codecs¶

There are two approaches for creating custom codecs:

Pattern When to Use Base Class
Type Chaining Transform Python objects, use existing storage dj.Codec
SchemaCodec Subclassing Custom file formats with schema-addressed paths dj.SchemaCodec

Pattern 1: Type Chaining (Most Common)¶

Chain to an existing codec for storage. Your codec transforms objects; the chained codec handles storage.

import datajoint as dj
import networkx as nx

class GraphCodec(dj.Codec):
    """Store NetworkX graphs."""

    name = "graph"  # Use as <graph> in definitions

    def get_dtype(self, is_store: bool) -> str:
        return "<blob>"  # Delegate to blob for serialization

    def encode(self, graph, *, key=None, store_name=None):
        return {
            'nodes': list(graph.nodes(data=True)),
            'edges': list(graph.edges(data=True)),
        }

    def decode(self, stored, *, key=None):
        G = nx.Graph()
        G.add_nodes_from(stored['nodes'])
        G.add_edges_from(stored['edges'])
        return G

# Use in table definition
@schema
class Connectivity(dj.Manual):
    definition = '''
    conn_id : int
    ---
    network : <graph>      # in-table storage
    network_ext : <graph@>  # object store
    '''

Pattern 2: SchemaCodec Subclassing (File Formats)¶

For custom file formats that need schema-addressed storage paths.

import datajoint as dj

class ParquetCodec(dj.SchemaCodec):
    """Store DataFrames as Parquet files."""

    name = "parquet"

    # get_dtype inherited: returns "json", requires @

    def encode(self, df, *, key=None, store_name=None):
        import io
        schema, table, field, pk = self._extract_context(key)
        path, _ = self._build_path(schema, table, field, pk, ext=".parquet")
        backend = self._get_backend(store_name)

        buffer = io.BytesIO()
        df.to_parquet(buffer)
        backend.put_buffer(buffer.getvalue(), path)

        return {"path": path, "store": store_name, "shape": list(df.shape)}

    def decode(self, stored, *, key=None):
        return ParquetRef(stored, self._get_backend(stored.get("store")))

# Use in table definition (store only)
@schema
class Results(dj.Manual):
    definition = '''
    result_id : int
    ---
    data : <parquet@>
    '''

The Codec Base Class¶

All custom codecs inherit from dj.Codec:

class Codec(ABC):
    """Base class for codec types."""

    name: str | None = None  # Required: unique identifier

    def get_dtype(self, is_store: bool) -> str:
        """Return the storage dtype."""
        raise NotImplementedError

    @abstractmethod
    def encode(self, value, *, key=None, store_name=None) -> Any:
        """Encode Python value for storage."""
        ...

    @abstractmethod
    def decode(self, stored, *, key=None) -> Any:
        """Decode stored value back to Python."""
        ...

    def validate(self, value) -> None:
        """Optional: validate value before encoding."""
        pass

The SchemaCodec Base Class¶

For schema-addressed storage (file formats), inherit from dj.SchemaCodec:

class SchemaCodec(Codec, register=False):
    """Base class for schema-addressed codecs."""

    def get_dtype(self, is_store: bool) -> str:
        """Store only, returns 'json'."""
        if not is_store:
            raise DataJointError(f"<{self.name}> requires @ (store only)")
        return "json"

    def _extract_context(self, key: dict) -> tuple[str, str, str, dict]:
        """Parse key into (schema, table, field, primary_key)."""
        ...

    def _build_path(self, schema, table, field, pk, ext=None) -> tuple[str, str]:
        """Build schema-addressed path: {schema}/{table}/{pk}/{field}{ext}"""
        ...

    def _get_backend(self, store_name: str = None):
        """Get storage backend by name."""
        ...

Required Components¶

1. The name Attribute¶

The name class attribute is a unique identifier used in table definitions with <name> syntax:

class MyCodec(dj.Codec):
    name = "mycodec"  # Use as <mycodec> in definitions

Naming conventions: - Use lowercase with underscores: spike_train, graph_embedding - Avoid generic names that might conflict: prefer lab_model over model - Names must be unique across all registered codecs

2. The get_dtype() Method¶

Returns the underlying storage type. The is_store parameter indicates whether the @ modifier is present in the table definition:

def get_dtype(self, is_store: bool) -> str:
    """
    Args:
        is_store: True if @ modifier present (e.g., <mycodec@store>)

    Returns:
        - A core type: "bytes", "json", "varchar(N)", "int32", etc.
        - Another codec: "<blob>", "<hash>", etc.

    Raises:
        DataJointError: If store not supported but @ is present
    """

Examples:

# Simple: always store as bytes
def get_dtype(self, is_store: bool) -> str:
    return "bytes"

# Different behavior for in-table/store
def get_dtype(self, is_store: bool) -> str:
    return "<hash>" if is_store else "bytes"

# Store-only codec
def get_dtype(self, is_store: bool) -> str:
    if not is_store:
        raise DataJointError("<object> requires @ (store only)")
    return "json"

3. The encode() Method¶

Converts Python objects to the format expected by get_dtype():

def encode(self, value: Any, *, key: dict | None = None, store_name: str | None = None) -> Any:
    """
    Args:
        value: The Python object to store
        key: Primary key values (for context-dependent encoding)
        store_name: Target store name (for in-store storage)

    Returns:
        Value in the format expected by get_dtype()
    """

4. The decode() Method¶

Converts stored values back to Python objects:

def decode(self, stored: Any, *, key: dict | None = None) -> Any:
    """
    Args:
        stored: Data retrieved from storage
        key: Primary key values (for context-dependent decoding)

    Returns:
        The reconstructed Python object
    """

5. The validate() Method (Optional)¶

Called automatically before encode() during INSERT operations:

def validate(self, value: Any) -> None:
    """
    Args:
        value: The value to validate

    Raises:
        TypeError: If the value has an incompatible type
        ValueError: If the value fails domain validation
    """
    if not isinstance(value, ExpectedType):
        raise TypeError(f"Expected ExpectedType, got {type(value).__name__}")

Auto-Registration¶

Codecs automatically register when their class is defined. No decorator needed:

# This codec is registered automatically when the class is defined
class MyCodec(dj.Codec):
    name = "mycodec"
    # ...

Skipping Registration¶

For abstract base classes that shouldn't be registered:

class BaseCodec(dj.Codec, register=False):
    """Abstract base - not registered."""
    name = None  # Or omit entirely

class ConcreteCodec(BaseCodec):
    name = "concrete"  # This one IS registered
    # ...

Registration Timing¶

Codecs are registered at class definition time. Ensure your codec classes are imported before any table definitions that use them:

# myproject/codecs.py
class GraphCodec(dj.Codec):
    name = "graph"
    ...

# myproject/tables.py
import myproject.codecs  # Ensure codecs are registered

@schema
class Networks(dj.Manual):
    definition = '''
    id : int
    ---
    network : <graph>
    '''

Codec Composition (Chaining)¶

Codecs can delegate to other codecs by returning <codec_name> from get_dtype(). This enables layered functionality:

class CompressedJsonCodec(dj.Codec):
    """Compress JSON data with zlib."""

    name = "zjson"

    def get_dtype(self, is_store: bool) -> str:
        return "<blob>"  # Delegate serialization to blob codec

    def encode(self, value, *, key=None, store_name=None):
        import json, zlib
        json_bytes = json.dumps(value).encode('utf-8')
        return zlib.compress(json_bytes)

    def decode(self, stored, *, key=None):
        import json, zlib
        json_bytes = zlib.decompress(stored)
        return json.loads(json_bytes.decode('utf-8'))

How Chaining Works¶

When DataJoint encounters <zjson>:

  1. Calls ZjsonCodec.get_dtype(is_store=False) → returns "<blob>"
  2. Calls BlobCodec.get_dtype(is_store=False) → returns "bytes"
  3. Final storage type is bytes (LONGBLOB in MySQL)

During INSERT: 1. ZjsonCodec.encode() converts Python dict → compressed bytes 2. BlobCodec.encode() packs bytes → DJ blob format 3. Stored in database

During FETCH: 1. Read from database 2. BlobCodec.decode() unpacks DJ blob → compressed bytes 3. ZjsonCodec.decode() decompresses → Python dict

Built-in Codec Chains¶

DataJoint's built-in codecs form these chains:

Codec Chain Final Storage
<blob> <blob> → bytes Inline
<blob@> <blob> → <hash> → json Store (hash-addressed)
<attach> <attach> → bytes Inline
<attach@> <attach> → <hash> → json Store (hash-addressed)
<hash@> <hash> → json Store only (hash-addressed)
<object@> <object> → json Store only (schema-addressed)
<npy@> <npy> → json Store only (schema-addressed)
<filepath@> <filepath> → json Store only (external ref)

Store Name Propagation¶

When using object storage (@), the store name propagates through the chain:

# Table definition
data : <mycodec@coldstore>

# Resolution:
# 1. MyCodec.get_dtype(is_store=True) → "<blob>"
# 2. BlobCodec.get_dtype(is_store=True) → "<hash>"
# 3. HashCodec.get_dtype(is_store=True) → "json"
# 4. store_name="coldstore" passed to HashCodec.encode()

Plugin System (Entry Points)¶

Codecs can be distributed as installable packages using Python entry points.

Package Structure¶

dj-graph-codecs/
├── pyproject.toml
└── src/
    └── dj_graph_codecs/
        ├── __init__.py
        └── codecs.py

pyproject.toml¶

[project]
name = "dj-graph-codecs"
version = "1.0.0"
dependencies = ["datajoint>=2.0", "networkx"]

[project.entry-points."datajoint.codecs"]
graph = "dj_graph_codecs.codecs:GraphCodec"
weighted_graph = "dj_graph_codecs.codecs:WeightedGraphCodec"

Codec Implementation¶

# src/dj_graph_codecs/codecs.py
import datajoint as dj
import networkx as nx

class GraphCodec(dj.Codec):
    name = "graph"

    def get_dtype(self, is_store: bool) -> str:
        return "<blob>"

    def encode(self, graph, *, key=None, store_name=None):
        return {
            'nodes': list(graph.nodes(data=True)),
            'edges': list(graph.edges(data=True)),
        }

    def decode(self, stored, *, key=None):
        G = nx.Graph()
        G.add_nodes_from(stored['nodes'])
        G.add_edges_from(stored['edges'])
        return G

class WeightedGraphCodec(dj.Codec):
    name = "weighted_graph"

    def get_dtype(self, is_store: bool) -> str:
        return "<blob>"

    def encode(self, graph, *, key=None, store_name=None):
        return {
            'nodes': list(graph.nodes(data=True)),
            'edges': [(u, v, d) for u, v, d in graph.edges(data=True)],
        }

    def decode(self, stored, *, key=None):
        G = nx.Graph()
        G.add_nodes_from(stored['nodes'])
        for u, v, d in stored['edges']:
            G.add_edge(u, v, **d)
        return G

Usage After Installation¶

pip install dj-graph-codecs
# Codecs are automatically discovered and available
@schema
class Networks(dj.Manual):
    definition = '''
    network_id : int
    ---
    topology : <graph>
    weights : <weighted_graph>
    '''

Entry Point Discovery¶

DataJoint loads entry points lazily when a codec is first requested:

  1. Check explicit registry (codecs defined in current process)
  2. Load entry points from datajoint.codecs group
  3. Also checks legacy datajoint.types group for compatibility

API Reference¶

Module Functions¶

import datajoint as dj

# List all registered codec names
dj.list_codecs()  # Returns: ['blob', 'hash', 'object', 'attach', 'filepath', ...]

# Get a codec instance by name
codec = dj.get_codec("blob")
codec = dj.get_codec("<blob>")  # Angle brackets are optional
codec = dj.get_codec("<blob@store>")  # Store parameter is stripped

Internal Functions (for advanced use)¶

from datajoint.codecs import (
    is_codec_registered,  # Check if codec exists
    unregister_codec,     # Remove codec (testing only)
    resolve_dtype,        # Resolve codec chain
    parse_type_spec,      # Parse "<name@store>" syntax
)

Built-in Codecs¶

DataJoint provides these built-in codecs. See the Type System Specification for detailed behavior and implementation.

Codec Inline Store Addressing Description
<blob> bytes <hash@> Hash DataJoint serialization for Python objects
<attach> bytes <hash@> Hash File attachments with filename preserved
<hash@> N/A json Hash Hash-addressed storage with MD5 deduplication
<object@> N/A json Schema Schema-addressed storage for files/folders
<npy@> N/A json Schema Schema-addressed storage for numpy arrays
<filepath@> N/A json Reference Reference to existing files in store

Addressing schemes: - Hash-addressed: Path from content hash. Automatic deduplication. - Schema-addressed: Path mirrors database structure. One location per entity.

Complete Examples¶

Example 1: Simple Serialization¶

import datajoint as dj
import numpy as np

class SpikeTrainCodec(dj.Codec):
    """Efficient storage for sparse spike timing data."""

    name = "spike_train"

    def get_dtype(self, is_store: bool) -> str:
        return "<blob>"

    def validate(self, value):
        if not isinstance(value, np.ndarray):
            raise TypeError("Expected numpy array of spike times")
        if value.ndim != 1:
            raise ValueError("Spike train must be 1-dimensional")
        if len(value) > 1 and not np.all(np.diff(value) >= 0):
            raise ValueError("Spike times must be sorted")

    def encode(self, spike_times, *, key=None, store_name=None):
        # Store as differences (smaller values, better compression)
        return np.diff(spike_times, prepend=0).astype(np.float32)

    def decode(self, stored, *, key=None):
        # Reconstruct original spike times
        return np.cumsum(stored).astype(np.float64)

Example 2: In-Store Storage¶

import datajoint as dj
import pickle

class ModelCodec(dj.Codec):
    """Store ML models with optional in-store storage."""

    name = "model"

    def get_dtype(self, is_store: bool) -> str:
        # Use hash-addressed storage for large models
        return "<hash>" if is_store else "<blob>"

    def encode(self, model, *, key=None, store_name=None):
        return pickle.dumps(model, protocol=pickle.HIGHEST_PROTOCOL)

    def decode(self, stored, *, key=None):
        return pickle.loads(stored)

    def validate(self, value):
        # Check that model has required interface
        if not hasattr(value, 'predict'):
            raise TypeError("Model must have a predict() method")

Usage:

@schema
class Models(dj.Manual):
    definition = '''
    model_id : int
    ---
    small_model : <model>         # In-table storage
    large_model : <model@>        # In-store (default store)
    archive_model : <model@cold>  # In-store (specific store)
    '''

Example 3: JSON with Schema Validation¶

import datajoint as dj
import jsonschema

class ConfigCodec(dj.Codec):
    """Store validated JSON configuration."""

    name = "config"

    SCHEMA = {
        "type": "object",
        "properties": {
            "version": {"type": "integer", "minimum": 1},
            "settings": {"type": "object"},
        },
        "required": ["version", "settings"],
    }

    def get_dtype(self, is_store: bool) -> str:
        return "json"

    def validate(self, value):
        jsonschema.validate(value, self.SCHEMA)

    def encode(self, config, *, key=None, store_name=None):
        return config  # JSON type handles serialization

    def decode(self, stored, *, key=None):
        return stored

Example 4: Context-Dependent Encoding¶

import datajoint as dj

class VersionedDataCodec(dj.Codec):
    """Handle different encoding versions based on primary key."""

    name = "versioned"

    def get_dtype(self, is_store: bool) -> str:
        return "<blob>"

    def encode(self, value, *, key=None, store_name=None):
        version = key.get("schema_version", 1) if key else 1
        if version >= 2:
            return {"v": 2, "data": self._encode_v2(value)}
        return {"v": 1, "data": self._encode_v1(value)}

    def decode(self, stored, *, key=None):
        version = stored.get("v", 1)
        if version >= 2:
            return self._decode_v2(stored["data"])
        return self._decode_v1(stored["data"])

    def _encode_v1(self, value):
        return value

    def _decode_v1(self, data):
        return data

    def _encode_v2(self, value):
        # New encoding format
        return {"optimized": True, "payload": value}

    def _decode_v2(self, data):
        return data["payload"]

Example 5: External-Only Codec¶

import datajoint as dj
from pathlib import Path

class ZarrCodec(dj.Codec):
    """Store Zarr arrays in object storage."""

    name = "zarr"

    def get_dtype(self, is_store: bool) -> str:
        if not is_store:
            raise dj.DataJointError("<zarr> requires @ (in-store only)")
        return "<object>"  # Delegate to object storage

    def encode(self, value, *, key=None, store_name=None):
        import zarr
        import tempfile

        # If already a path, pass through
        if isinstance(value, (str, Path)):
            return str(value)

        # If zarr array, save to temp and return path
        if isinstance(value, zarr.Array):
            tmpdir = tempfile.mkdtemp()
            path = Path(tmpdir) / "data.zarr"
            zarr.save(path, value)
            return str(path)

        raise TypeError(f"Expected zarr.Array or path, got {type(value)}")

    def decode(self, stored, *, key=None):
        # ObjectCodec returns ObjectRef, use its fsmap for zarr
        import zarr
        return zarr.open(stored.fsmap, mode='r')

Best Practices¶

1. Choose Appropriate Storage Types¶

Data Type Recommended get_dtype()
Python objects (dicts, arrays) "<blob>"
Large binary data "<hash>" (external)
Files/folders (Zarr, HDF5) "<object>" (external)
Simple JSON-serializable "json"
Short strings "varchar(N)"
Numeric identifiers "int32", "int64"

2. Handle None Values¶

Nullable columns may pass None to your codec:

def encode(self, value, *, key=None, store_name=None):
    if value is None:
        return None  # Pass through for nullable columns
    return self._actual_encode(value)

def decode(self, stored, *, key=None):
    if stored is None:
        return None
    return self._actual_decode(stored)

3. Test Round-Trips¶

Always verify that decode(encode(x)) == x:

def test_codec_roundtrip():
    codec = MyCodec()

    test_values = [
        {"key": "value"},
        [1, 2, 3],
        np.array([1.0, 2.0]),
    ]

    for original in test_values:
        encoded = codec.encode(original)
        decoded = codec.decode(encoded)
        assert decoded == original or np.array_equal(decoded, original)

4. Include Validation¶

Catch errors early with validate():

def validate(self, value):
    if not isinstance(value, ExpectedType):
        raise TypeError(f"Expected ExpectedType, got {type(value).__name__}")

    if not self._is_valid(value):
        raise ValueError("Value fails validation constraints")

5. Document Expected Formats¶

Include docstrings explaining input/output formats:

class MyCodec(dj.Codec):
    """
    Store MyType objects.

    Input format (encode):
        MyType instance with attributes: x, y, z

    Storage format:
        Dict with keys: 'x', 'y', 'z'

    Output format (decode):
        MyType instance reconstructed from storage
    """

6. Consider Versioning¶

If your encoding format might change:

def encode(self, value, *, key=None, store_name=None):
    return {
        "_version": 2,
        "_data": self._encode_v2(value),
    }

def decode(self, stored, *, key=None):
    version = stored.get("_version", 1)
    data = stored.get("_data", stored)

    if version == 1:
        return self._decode_v1(data)
    return self._decode_v2(data)

Error Handling¶

Common Errors¶

Error Cause Solution
Unknown codec: <name> Codec not registered Import module defining codec before table definition
Codec <name> already registered Duplicate name Use unique names; check for conflicts
<codec> requires @ In-store-only codec used without @ Add @ or @store to attribute type
Circular codec reference Codec chain forms a loop Check get_dtype() return values

Debugging¶

# Check what codecs are registered
print(dj.list_codecs())

# Inspect a codec
codec = dj.get_codec("mycodec")
print(f"Name: {codec.name}")
print(f"In-table dtype: {codec.get_dtype(is_store=False)}")
print(f"In-store dtype: {codec.get_dtype(is_store=True)}")

# Resolve full chain
from datajoint.codecs import resolve_dtype
final_type, chain, store = resolve_dtype("<mycodec@store>")
print(f"Final storage type: {final_type}")
print(f"Codec chain: {[c.name for c in chain]}")
print(f"Store: {store}")