Manage Large Data

Work effectively with blobs and object storage.

Choose the Right Storage

Data Size    | Recommended      | Syntax
< 1 MB       | Database         | <blob>
1 MB - 1 GB  | Hash-addressed   | <blob@>
> 1 GB       | Schema-addressed | <object@>, <npy@>
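
The thresholds in the table can be encoded as a small helper for picking an attribute type from a payload size. This is only a sketch: `recommend_storage` is a hypothetical name, not a DataJoint function, and treating 1 MB as 1024**2 bytes (rather than 10**6) is an assumption, since the table does not say which unit convention it uses.

```python
MB = 1024 ** 2  # assumption: binary units; the table does not specify
GB = 1024 ** 3


def recommend_storage(nbytes: int) -> str:
    """Return the attribute syntax the table suggests for a payload size.

    Hypothetical helper for illustration; not part of DataJoint.
    """
    if nbytes < 0:
        raise ValueError("nbytes must be non-negative")
    if nbytes < MB:
        return "<blob>"      # stored in the database
    if nbytes <= GB:
        return "<blob@>"     # hash-addressed object storage
    return "<object@>"       # schema-addressed; <npy@> is the other listed option


for size in (10_000, 50 * MB, 5 * GB):
    print(f"{size:>13,d} bytes -> {recommend_storage(size)}")
```

The boundaries are guidelines rather than hard limits, so a real pipeline might also weigh access patterns (for example, whether the data is ever read partially).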

Streaming Large Results

Avoid loading everything into memory:

# Bad: loads all data at once
all_data = LargeTable().to_arrays('big_column')

# Good: stream rows lazily (single cursor, one row at a time)
for row in LargeTable():
    process(row['big_column'])

# Good: fetch in batches of primary keys
keys = LargeTable().keys()
batch_size = 100
for i in range(0, len(keys), batch_size):
    batch_keys = keys[i:i + batch_size]
    data = (LargeTable() & batch_keys).to_arrays('big_column')
    process(data)
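
The slicing logic in the batching loop can be factored into a reusable generator. The sketch below runs without a database: `batched` is a hypothetical helper, and the list of dicts merely stands in for the primary keys a table would return.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of `items`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Stand-in for the list of primary-key dicts returned by LargeTable().keys()
keys = [{"id": i} for i in range(250)]

batch_sizes = [len(batch) for batch in batched(keys, 100)]
print(batch_sizes)  # [100, 100, 50]
```

With a real table, each batch would be used as the restriction, as in `(LargeTable() & batch).to_arrays('big_column')`.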

Lazy Loading with ObjectRef

Fetching an <object@> or <filepath@> attribute returns a lazy reference (ObjectRef) rather than the data itself:

# Returns ObjectRef, not the actual data
ref = (Dataset & key).fetch1('large_file')

# Stream without full download
with ref.open('rb') as f:
    # Process in chunks
    while chunk := f.read(1024 * 1024):
        process(chunk)

# Or download when needed
local_path = ref.download('/tmp/working')
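
The chunked-read pattern works on any binary file-like object, so it can be tried without an object store. In this sketch, `io.BytesIO` stands in for the handle returned by `ref.open('rb')`, and `iter_chunks` and `sha256_of_stream` are hypothetical helper names. It computes a checksum while holding only one chunk in memory at a time.

```python
import hashlib
import io
from typing import BinaryIO, Iterator


def iter_chunks(f: BinaryIO, chunk_size: int = 1024 * 1024) -> Iterator[bytes]:
    """Yield successive chunks from a binary file-like object until EOF."""
    while chunk := f.read(chunk_size):
        yield chunk


def sha256_of_stream(f: BinaryIO, chunk_size: int = 1024 * 1024) -> str:
    """Hash a stream incrementally, keeping only one chunk in memory."""
    digest = hashlib.sha256()
    for chunk in iter_chunks(f, chunk_size):
        digest.update(chunk)
    return digest.hexdigest()


# io.BytesIO stands in for the handle returned by ref.open('rb')
payload = b"abc" * 1_000_000
streamed = sha256_of_stream(io.BytesIO(payload), chunk_size=64 * 1024)
print(streamed == hashlib.sha256(payload).hexdigest())  # True
```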

Selective Fetching

Fetch only what you need:

# Bad: fetches all columns including blobs
row = (MyTable & key).fetch1()

# Good: fetch only metadata
metadata = (MyTable & key).fetch1('name', 'date', 'status')

# Then fetch blob only if needed
if needs_processing(metadata):
    data = (MyTable & key).fetch1('large_data')

Projection for Efficiency

Exclude large columns from joins:

# Slow: joins include blob columns
result = Table1 * Table2

# Fast: project away blobs before join
result = Table1.proj('id', 'name') * Table2.proj('id', 'status')

Batch Inserts

Insert large data efficiently:

# Good: single transaction for related data
with dj.conn().transaction:
    for item in large_batch:
        MyTable.insert1(item)

Content Deduplication

<blob@> and <attach@> automatically deduplicate within each schema:

import numpy as np

# Same array inserted twice
data = np.random.randn(1000, 1000)
Table.insert1({'id': 1, 'data': data})
Table.insert1({'id': 2, 'data': data})  # References same storage

# Only one copy exists in object storage (per schema)

Deduplication is per-schema: identical content in different schemas is stored separately, which allows each schema to be garbage-collected independently.
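
The idea behind hash-addressed deduplication can be shown with a toy in-memory store keyed by schema and content hash. This is not DataJoint's implementation: `ToyHashStore`, the use of SHA-256, and the key layout are all assumptions chosen for illustration.

```python
import hashlib


class ToyHashStore:
    """In-memory stand-in for a hash-addressed object store (illustration only)."""

    def __init__(self) -> None:
        self._objects = {}  # maps (schema, sha256 hex digest) -> bytes

    def put(self, schema: str, content: bytes) -> str:
        """Store content under (schema, hash) and return the hash."""
        digest = hashlib.sha256(content).hexdigest()
        # setdefault keeps the existing copy if this content is already stored
        self._objects.setdefault((schema, digest), content)
        return digest

    def __len__(self) -> int:
        return len(self._objects)


store = ToyHashStore()
data = b"\x00" * 1_000_000

ref1 = store.put("lab_a", data)
ref2 = store.put("lab_a", data)  # same schema, same content: no new object
ref3 = store.put("lab_b", data)  # different schema: stored separately

print(ref1 == ref2 == ref3)  # True
print(len(store))            # 2
```

Because the schema is part of the key, dropping every object for one schema never touches another schema's copy.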

Storage Cleanup

Object storage items are not automatically deleted with rows. Run garbage collection periodically:

import datajoint as dj

# Deleting rows leaves their stored objects in place
(MyTable & old_data).delete()

# Scan for orphaned items
stats = dj.gc.scan(my_schema)
print(dj.gc.format_stats(stats))

# Remove orphaned items
stats = dj.gc.collect(my_schema, dry_run=False)

See Clean Up Object Storage for details.

Monitor Storage Usage

Look up the store configuration, then measure usage with the tools for that backend:

# Get store configuration
spec = dj.config.get_object_store_spec()

# For S3/MinIO, use boto3 or similar
# For filesystem, use standard tools
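
For a filesystem-backed store, "standard tools" can be as simple as walking the store directory. The sketch below uses only the standard library. `directory_size` is a hypothetical helper, and the temporary directory stands in for the store location, which you would take from your own configuration.

```python
import os
import tempfile


def directory_size(root: str) -> int:
    """Return the total size in bytes of regular files under `root`."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if not os.path.islink(path):  # skip symlinks to avoid double counting
                total += os.path.getsize(path)
    return total


# Demo on a temporary directory standing in for a store location
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "ab", "cd"))
    with open(os.path.join(root, "ab", "cd", "object1"), "wb") as f:
        f.write(b"x" * 2048)
    with open(os.path.join(root, "object2"), "wb") as f:
        f.write(b"y" * 1024)
    demo_total = directory_size(root)

print(demo_total)  # 3072
```

This reports logical file sizes; on-disk usage can differ because of filesystem block sizes or compression.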

Compression

Blobs are compressed by default:

import numpy as np

# Compression happens automatically in <blob>
large_array = np.zeros((10000, 10000))  # Highly repetitive: compresses well
random_data = np.random.randn(10000, 10000)  # High entropy: compresses poorly
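
The gap in compressibility between repetitive and random data can be seen with the standard library alone. This sketch calls `zlib` directly as a stand-in. It illustrates the principle and makes no claim about the exact codec or settings DataJoint applies to blobs.

```python
import os
import zlib

n = 1_000_000
zeros = bytes(n)              # highly repetitive input
random_bytes = os.urandom(n)  # high-entropy input

zeros_ratio = len(zlib.compress(zeros)) / n
random_ratio = len(zlib.compress(random_bytes)) / n

print(f"zeros:  {zeros_ratio:.4f} of original size")
print(f"random: {random_ratio:.4f} of original size")
```

If most of your blobs look like the second case, compression costs CPU time without saving much storage.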

Memory Management

For very large computations:

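def make(self, key):
    # n_chunks, load_chunk, process, save_partial_result, and combine_results
    # are placeholders for your own pipeline code.
    # Process in chunks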
    for chunk_idx in range(n_chunks):
        chunk_data = load_chunk(key, chunk_idx)
        result = process(chunk_data)
        save_partial_result(key, chunk_idx, result)
        del chunk_data  # Free memory

    # Combine results
    final = combine_results(key)
    self.insert1({**key, 'result': final})
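
The chunk-then-combine pattern works when each chunk can be reduced to a small partial result. As a minimal sketch, the example below computes a mean from per-chunk (sum, count) pairs. All function names are hypothetical, and `generate_chunks` stands in for whatever loads one chunk at a time in a real pipeline.

```python
from typing import Iterable, Iterator, List, Tuple


def chunk_stats(chunks: Iterable[List[float]]) -> Iterator[Tuple[float, int]]:
    """Reduce each chunk to a small (sum, count) partial result."""
    for chunk in chunks:
        yield sum(chunk), len(chunk)


def combine_mean(partials: Iterable[Tuple[float, int]]) -> float:
    """Combine (sum, count) partials into an overall mean."""
    total, count = 0.0, 0
    for partial_sum, partial_count in partials:
        total += partial_sum
        count += partial_count
    if count == 0:
        raise ValueError("no data")
    return total / count


def generate_chunks(n_chunks: int, chunk_len: int) -> Iterator[List[float]]:
    """Stand-in for a chunk loader: produce one chunk at a time."""
    for i in range(n_chunks):
        start = i * chunk_len
        yield [float(x) for x in range(start, start + chunk_len)]


# Only one chunk is alive at a time because everything is a generator
mean = combine_mean(chunk_stats(generate_chunks(n_chunks=10, chunk_len=1000)))
print(mean)  # 4999.5
```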

External Tools for Very Large Data

For datasets too large to manage through DataJoint directly, store a path reference and process the data with specialized tools:

@schema
class LargeDataset(dj.Manual):
    definition = """
    dataset_id : uuid
    ---
    zarr_path : <filepath@>     # Reference to external Zarr
    """

# Store path reference, process with specialized tools
import uuid
import zarr

# local_zarr_path is a placeholder for the path to an existing Zarr store
store = zarr.open(local_zarr_path)
# ... process with Zarr/Dask ...

LargeDataset.insert1({
    'dataset_id': uuid.uuid4(),
    'zarr_path': local_zarr_path
})

See Also