Object-Augmented Schemas¶
This tutorial covers DataJoint's Object-Augmented Schema (OAS) model. You'll learn:
- The OAS concept — Unified relational + object storage
- Blobs — Storing arrays and Python objects
- Object storage — Scaling to large datasets
- Staged insert — Writing directly to object storage (Zarr, HDF5)
- Attachments — Preserving file names and formats
- Codecs — How data is serialized and deserialized
In an Object-Augmented Schema, the relational database and object storage operate as a single integrated system—not as separate "internal" and "external" components.
import datajoint as dj
import numpy as np
schema = dj.Schema('tutorial_oas')
# Clean slate: drop existing schema if re-running
schema.drop(prompt=False)
schema = dj.Schema('tutorial_oas')
[2026-02-06 11:45:02] DataJoint 2.1.0 connected to datajoint@127.0.0.1:5432
The Object-Augmented Schema Model¶
Scientific data often combines:
- Structured metadata — Subjects, sessions, parameters (relational)
- Large data objects — Arrays, images, recordings (binary)
DataJoint's OAS model manages both as a unified system:
block-beta
columns 1
block:oas:1
columns 2
OAS["Object-Augmented Schema"]:2
block:db:1
DB["Relational Database"]
DB1["Metadata"]
DB2["Keys"]
DB3["Relationships"]
end
block:os:1
OS["Object Storage (S3/File/etc)"]
OS1["Large arrays"]
OS2["Images/videos"]
OS3["Recordings"]
end
end
From the user's perspective, this is one schema—storage location is transparent.
Blob Attributes¶
Use <blob> to store Python objects such as NumPy arrays, dicts, and other supported types:
@schema
class Recording(dj.Manual):
definition = """
recording_id : int32
---
metadata : <blob> # Dict, stored in database
waveform : <blob> # NumPy array, stored in database
"""
# Insert with blob data
Recording.insert1({
'recording_id': 1,
'metadata': {'channels': 32, 'sample_rate': 30000, 'duration': 60.0},
'waveform': np.random.randn(32, 30000) # 32 channels x 1 second
})
Recording()
| recording_id | metadata | waveform |
|---|---|---|
| 1 | <blob> | <blob> |
Total: 1
# Fetch blob data
data = (Recording & {'recording_id': 1}).fetch1()
print(f"Metadata: {data['metadata']}")
print(f"Waveform shape: {data['waveform'].shape}")
Metadata: {'channels': 32, 'sample_rate': 30000, 'duration': 60.0}
Waveform shape: (32, 30000)
What Can Be Stored in Blobs?¶
The <blob> codec handles:
- NumPy arrays (any dtype, any shape)
- Python dicts, lists, tuples, sets
- Strings, bytes, integers, floats
- datetime objects and UUIDs
- Nested combinations of the above
Note: Pandas DataFrames should be converted before storage (e.g., df.to_dict() or df.to_records()); see the round-trip sketch after the next example.
@schema
class AnalysisResult(dj.Manual):
definition = """
result_id : int32
---
arrays : <blob>
nested_data : <blob>
"""
# Store complex data structures
arrays = {'x': np.array([1, 2, 3]), 'y': np.array([4, 5, 6])}
nested = {'arrays': [np.array([1, 2]), np.array([3, 4])], 'params': {'a': 1, 'b': 2}}
AnalysisResult.insert1({
'result_id': 1,
'arrays': arrays,
'nested_data': nested
})
# Fetch back
result = (AnalysisResult & {'result_id': 1}).fetch1()
print(f"Arrays type: {type(result['arrays'])}")
print(f"Arrays keys: {result['arrays'].keys()}")
Arrays type: <class 'dict'>
Arrays keys: dict_keys(['x', 'y'])
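Following the note above, a DataFrame can round-trip through a plain dict. A minimal sketch; the DataFrame contents here are illustrative:
import pandas as pd
df = pd.DataFrame({'trial': [1, 2, 3], 'score': [0.90, 0.85, 0.95]})
# Convert to a blob-friendly dict of lists before insert
AnalysisResult.insert1({
    'result_id': 2,
    'arrays': {},  # unused in this example
    'nested_data': df.to_dict('list')
})
# Reconstruct the DataFrame after fetch
fetched = (AnalysisResult & {'result_id': 2}).fetch1('nested_data')
df_restored = pd.DataFrame(fetched)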
import tempfile
import os
# Create a store for this tutorial
store_path = tempfile.mkdtemp(prefix='dj_store_')
# Configure a named store for this tutorial
dj.config.stores['tutorial'] = {
'protocol': 'file',
'location': store_path
}
print(f"Store configured at: {store_path}")
Store configured at: /var/folders/cn/dpwf5t7j3gd8gzyw2r7dhm8r0000gn/T/dj_store_x2x_aatv
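Stores can also point at S3-compatible object storage. A hypothetical configuration sketch: the endpoint, bucket, and credentials below are placeholders, and the exact field names may vary by backend (see Configure Object Storage):
# Hypothetical S3-backed store; all values below are placeholders
dj.config.stores['s3_archive'] = {
    'protocol': 's3',
    'endpoint': 's3.amazonaws.com',
    'bucket': 'my-lab-data',
    'location': 'dj-store',
    'access_key': '<access-key>',
    'secret_key': '<secret-key>'
}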
Using Object Storage¶
Append @store to a codec to route that attribute to a configured store; insert and fetch usage stays the same:
@schema
class LargeRecording(dj.Manual):
definition = """
recording_id : int32
---
small_data : <blob> # In database (small)
large_data : <blob@tutorial> # In object storage (large)
"""
# Insert data - usage is identical regardless of storage
small = np.random.randn(10, 10)
large = np.random.randn(1000, 1000) # ~8 MB array
LargeRecording.insert1({
'recording_id': 1,
'small_data': small,
'large_data': large
})
LargeRecording()
| recording_id | small_data | large_data |
|---|---|---|
| 1 | <blob> | <blob> |
Total: 1
# Fetch is also identical - storage is transparent
data = (LargeRecording & {'recording_id': 1}).fetch1()
print(f"Small data shape: {data['small_data'].shape}")
print(f"Large data shape: {data['large_data'].shape}")
Small data shape: (10, 10)
Large data shape: (1000, 1000)
# Objects are stored in the configured location
for root, dirs, files in os.walk(store_path):
for f in files:
path = os.path.join(root, f)
size = os.path.getsize(path)
print(f"{os.path.relpath(path, store_path)}: {size:,} bytes")
_hash/tutorial_oas/yqn2t7i7vajacorzpk3ipwfmkm: 7,685,032 bytes
Hash-Addressed Storage¶
<blob@> uses hash-addressed storage. Data is identified by a Base32-encoded MD5 hash, enabling automatic deduplication—identical data is stored only once:
# Insert the same data twice
shared_data = np.ones((500, 500))
LargeRecording.insert([
{'recording_id': 2, 'small_data': small, 'large_data': shared_data},
{'recording_id': 3, 'small_data': small, 'large_data': shared_data}, # Same!
])
print(f"Rows in table: {len(LargeRecording())}")
# Deduplication: identical data stored once
files = [f for _, _, fs in os.walk(store_path) for f in fs]
print(f"Files in store: {len(files)}")
Rows in table: 3
Files in store: 2
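Deduplication is invisible on fetch: both rows return the same content even though it is stored only once.
# Both rows resolve to the same stored object
d2 = (LargeRecording & {'recording_id': 2}).fetch1('large_data')
d3 = (LargeRecording & {'recording_id': 3}).fetch1('large_data')
assert np.array_equal(d2, d3)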
Schema-Addressed Storage with <object@>¶
While <blob@> uses hash-addressed storage with deduplication, <object@> uses schema-addressed storage where each row has its own dedicated storage path:
| Aspect | <blob@> | <object@> |
|---|---|---|
| Addressing | By content hash | By primary key |
| Deduplication | Yes | No |
| Deletion | Garbage collected | With row |
| Use case | Arrays, serialized objects | Zarr, HDF5, multi-file outputs |
Use <object@> when you need:
- Hierarchical formats like Zarr or HDF5
- Direct write access during data generation
- Each row to have its own isolated storage location
@schema
class ImagingSession(dj.Manual):
definition = """
subject_id : int32
session_id : int32
---
n_frames : int32
frame_rate : decimal(4,1)
frames : <object@tutorial> # Zarr array stored at path derived from PK
"""
Staged Insert for Direct Object Storage Writes¶
For large datasets like multi-GB imaging recordings, copying data from local storage to object storage is inefficient. The staged_insert1 context manager lets you write directly to object storage before finalizing the database insert:
- Set primary key values in staged.rec
- Get a storage handle with staged.store(field, extension)
- Write data directly (e.g., with Zarr)
- On successful exit, metadata is computed and the record is inserted
import zarr
# Simulate acquiring imaging data frame-by-frame
n_frames = 100
height, width = 512, 512
with ImagingSession.staged_insert1 as staged:
# Set primary key values first
staged.rec['subject_id'] = 1
staged.rec['session_id'] = 1
# Get storage handle for the object field
store = staged.store('frames', '.zarr')
# Create Zarr array directly in object storage
z = zarr.open(store, mode='w', shape=(n_frames, height, width),
chunks=(10, height, width), dtype='int32')
# Write frames as they are "acquired"
for i in range(n_frames):
frame = np.random.randint(0, 4096, (height, width), dtype='int32')
z[i] = frame
# Set remaining attributes
staged.rec['n_frames'] = n_frames
staged.rec['frame_rate'] = 30.0
# Record is now inserted with metadata computed from the Zarr
ImagingSession()
| subject_id | session_id | n_frames | frame_rate | frames |
|---|---|---|---|---|
| 1 | 1 | 100 | 30.0 | <object> |
Total: 1
# Fetch returns an ObjectRef for lazy access
ref = (ImagingSession & {'subject_id': 1, 'session_id': 1}).fetch1('frames')
print(f"Type: {type(ref).__name__}")
print(f"Path: {ref.path}")
# Open as Zarr array (data stays in object storage)
z = zarr.open(ref.fsmap, mode='r')
print(f"Shape: {z.shape}")
print(f"Chunks: {z.chunks}")
print(f"First frame mean: {z[0].mean():.1f}")
Type: ObjectRef
Path: tutorial_oas/ImagingSession/subject_id=1/session_id=1/frames_sWRCkZuy.zarr
Shape: (100, 512, 512)
Chunks: (10, 512, 512)
First frame mean: 2045.9
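Because Zarr stores the array in chunks, slicing reads only the chunks that cover the request rather than the full array:
# Reads a single 10-frame chunk from object storage, not all 100 frames
first_ten = z[:10]
print(first_ten.shape)  # (10, 512, 512)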
Benefits of Staged Insert¶
- No intermediate copies — Data flows directly to object storage
- Streaming writes — Write frame-by-frame as data is acquired
- Atomic transactions — If an error occurs, storage is cleaned up automatically (see the sketch below)
- Automatic metadata — File sizes and manifests are computed on finalize
Use staged_insert1 when:
- Data is too large to hold in memory
- You're generating data incrementally (e.g., during acquisition)
- You need direct control over storage format (Zarr chunks, HDF5 datasets)
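A minimal sketch of the atomicity noted above: if an exception escapes the block, no row is inserted and the staged object is cleaned up. The failure here is simulated:
# Simulated mid-acquisition failure inside a staged insert
try:
    with ImagingSession.staged_insert1 as staged:
        staged.rec['subject_id'] = 99
        staged.rec['session_id'] = 1
        store = staged.store('frames', '.zarr')
        raise RuntimeError('acquisition failed mid-write')
except RuntimeError:
    pass
# The failed session never reached the table
assert len(ImagingSession & {'subject_id': 99}) == 0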
Attachments¶
Use <attach> to store files with their original names preserved:
@schema
class Document(dj.Manual):
definition = """
doc_id : int32
---
report : <attach@tutorial>
"""
# Create a sample file
sample_file = os.path.join(tempfile.gettempdir(), 'analysis_report.txt')
with open(sample_file, 'w') as f:
f.write('Analysis Results\n')
f.write('================\n')
f.write('Accuracy: 95.2%\n')
# Insert using file path directly
Document.insert1({
'doc_id': 1,
'report': sample_file # Just pass the path
})
Document()
| doc_id | report |
|---|---|
| 1 | <attach> |
Total: 1
# Fetch returns path to extracted file
doc_path = (Document & {'doc_id': 1}).fetch1('report')
print(f"Type: {type(doc_path)}")
print(f"Path: {doc_path}")
# Read the content
with open(doc_path, 'r') as f:
print(f"Content:\n{f.read()}")
Type: <class 'str'>
Path: analysis_report.txt
Content:
Analysis Results
================
Accuracy: 95.2%
Codec Summary¶
| Codec | Storage | Description |
|---|---|---|
| <blob> | In database | Python objects, arrays |
| <blob@> | Default store | Large objects, hash-addressed |
| <blob@name> | Named store | Specific storage tier |
| <attach> | In database | Files with names |
| <attach@name> | Named store | Large files with names |
| <object@name> | Named store | Path-addressed (Zarr, etc.) |
| <filepath@name> | Named store | References to existing files |
Computed Tables with Large Data¶
Computed tables commonly produce large results:
@schema
class ProcessedRecording(dj.Computed):
definition = """
-> LargeRecording
---
filtered : <blob@tutorial> # Result in object storage
mean_value : float64
"""
def make(self, key):
# Fetch source data
data = (LargeRecording & key).fetch1('large_data')
# Process
from scipy.ndimage import gaussian_filter
filtered = gaussian_filter(data, sigma=2)
self.insert1({
**key,
'filtered': filtered,
'mean_value': float(np.mean(filtered))
})
ProcessedRecording.populate(display_progress=True)
ProcessedRecording()
ProcessedRecording: 0%| | 0/3 [00:00<?, ?it/s]
ProcessedRecording: 33%|███▎ | 1/3 [00:00<00:01, 1.76it/s]
ProcessedRecording: 100%|██████████| 3/3 [00:00<00:00, 5.02it/s]
| recording_id | filtered | mean_value |
|---|---|---|
| 1 | <blob> | -0.001368765720450917 |
| 2 | <blob> | 1.0000000000000002 |
| 3 | <blob> | 1.0000000000000002 |
Total: 3
# Fetch only scalar metadata (fast)
meta = (ProcessedRecording & {'recording_id': 1}).fetch1('mean_value')
print(f"Mean value: {meta}")
Mean value: -0.001368765720450917
# Fetch large data only when needed
filtered = (ProcessedRecording & {'recording_id': 1}).fetch1('filtered')
print(f"Filtered shape: {filtered.shape}")
Filtered shape: (1000, 1000)
Project Away Large Columns Before Joins¶
# Efficient: project to scalar columns before join
result = LargeRecording.proj('recording_id') * ProcessedRecording.proj('mean_value')
result
| recording_id | mean_value |
|---|---|
| 1 | -0.001368765720450917 |
| 2 | 1.0000000000000002 |
| 3 | 1.0000000000000002 |
Total: 3
Best Practices¶
1. Choose Storage Based on Size¶
# Small objects (< 1 MB): no @
parameters : <blob>
# Large objects (> 1 MB): use @
raw_data : <blob@>
2. Use Named Stores for Different Tiers¶
# Fast local storage for active data
working_data : <blob@fast>
# Cold storage for archives
archived_data : <blob@archive>
3. Separate Queryable Metadata from Large Data¶
@schema
class Experiment(dj.Manual):
definition = """
exp_id : int32
---
# Queryable metadata
date : date
duration : decimal(5,1)
n_trials : int32
# Large data
raw_data : <blob@>
"""
4. Use Attachments for Files¶
# Preserves filename
video : <attach@>
config_file : <attach@>
Garbage Collection¶
Hash-addressed storage (<blob@>, <attach@>, <hash@>) uses deduplication—identical content is stored once. This means deleting a row doesn't automatically delete the stored content, since other rows might reference it.
Use garbage collection to clean up orphaned content:
import datajoint as dj
# Preview what would be deleted (dry run)
stats = dj.gc.collect(dry_run=True)
print(f"Orphaned items: {stats['orphaned']}")
print(f"Space to reclaim: {stats['orphaned_bytes'] / 1e6:.1f} MB")
# Actually delete orphaned content
stats = dj.gc.collect()
print(f"Deleted: {stats['deleted']} items")
When to Run Garbage Collection¶
- After bulk deletions — Clean up storage after removing many rows
- Periodically — Schedule weekly/monthly cleanup jobs
- Before archiving — Reclaim space before backups
Key Points¶
- GC only affects hash-addressed types (<blob@>, <attach@>, <hash@>)
- Schema-addressed types (<object@>, <npy@>) are deleted with their rows
- Always use dry_run=True first to preview changes
- GC is safe—it only deletes content with zero references
See Clean Up Storage for detailed usage.
Quick Reference¶
| Pattern | Use Case |
|---|---|
| <blob> | Small Python objects |
| <blob@> | Large arrays with deduplication |
| <blob@store> | Large arrays in specific store |
| <attach@store> | Files preserving names |
| <object@store> | Schema-addressed data (Zarr, HDF5) |
Next Steps¶
- Configure Object Storage — Set up S3, MinIO, or filesystem stores
- Clean Up Storage — Garbage collection for hash-addressed storage
- Custom Codecs — Define domain-specific types
- Manage Large Data — Performance optimization
# Cleanup
schema.drop(prompt=False)
import shutil
shutil.rmtree(store_path, ignore_errors=True)