Manage Large Data¶
Work effectively with blobs and object storage.
Choose the Right Storage¶
| Data Size | Recommended | Syntax |
|---|---|---|
| < 1 MB | Database | <blob> |
| 1 MB - 1 GB | Hash-addressed | <blob@> |
| > 1 GB | Schema-addressed | <object@>, <npy@> |
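As a sketch, the three tiers above could appear together in one table definition (the column names here are illustrative, not from a real schema):

```
session_id : int
---
params     : <blob>    # small metadata, < 1 MB, stored inline in the database
waveforms  : <blob@>   # 1 MB - 1 GB, hash-addressed in object storage
volume     : <npy@>    # > 1 GB, schema-addressed NumPy array
```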
Streaming Large Results¶
Avoid loading everything into memory:
```python
# Bad: loads all data at once
all_data = LargeTable().to_arrays('big_column')

# Good: stream rows lazily (single cursor, one row at a time)
for row in LargeTable():
    process(row['big_column'])

# Good: batch by ID range
keys = LargeTable().keys()
batch_size = 100
for i in range(0, len(keys), batch_size):
    batch_keys = keys[i:i + batch_size]
    data = (LargeTable() & batch_keys).to_arrays('big_column')
    process(data)
```
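The ID-range batching above can be factored into a small helper. This is a plain-Python sketch; `chunked` is not part of the DataJoint API:

```python
def chunked(seq, size):
    """Yield successive slices of `seq` with at most `size` items each."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

# Each yielded slice can then be used as a restriction:
#   for batch_keys in chunked(LargeTable().keys(), 100): ...
```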
Lazy Loading with ObjectRef¶
<object@> and <filepath@> return lazy references:
```python
# Returns an ObjectRef, not the actual data
ref = (Dataset & key).fetch1('large_file')

# Stream without a full download
with ref.open('rb') as f:
    # Process in 1 MB chunks
    while chunk := f.read(1024 * 1024):
        process(chunk)

# Or download when needed
local_path = ref.download('/tmp/working')
```
Selective Fetching¶
Fetch only what you need:
```python
# Bad: fetches all columns, including blobs
row = MyTable.fetch1()

# Good: fetch only metadata
metadata = (MyTable & key).fetch1('name', 'date', 'status')

# Then fetch the blob only if needed
if needs_processing(metadata):
    data = (MyTable & key).fetch1('large_data')
```
Projection for Efficiency¶
Exclude large columns from joins:
```python
# Slow: the join carries blob columns along
result = Table1 * Table2

# Fast: project away blobs before joining
result = Table1.proj('id', 'name') * Table2.proj('id', 'status')
```
Batch Inserts¶
Insert many rows efficiently:
```python
# Good: a single multi-row insert
MyTable.insert(large_batch)

# Good: a single transaction for related inserts
with dj.conn().transaction:
    for item in large_batch:
        MyTable.insert1(item)
```
Content Deduplication¶
<blob@> and <attach@> automatically deduplicate within each schema:
```python
import numpy as np

# The same array inserted twice
data = np.random.randn(1000, 1000)
Table.insert1({'id': 1, 'data': data})
Table.insert1({'id': 2, 'data': data})  # references the same stored content

# Only one copy exists in object storage (per schema)
```
Deduplication is per-schema: identical content stored in different schemas is kept as separate copies. This enables independent garbage collection per schema.
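The per-schema behavior can be illustrated with a toy content-addressing scheme. This is only a sketch of the idea, not DataJoint's actual storage layout; `content_key` is hypothetical:

```python
import hashlib

def content_key(payload: bytes, schema: str) -> str:
    # Hypothetical layout: key = schema namespace + SHA-256 of the content
    return f"{schema}/blob/{hashlib.sha256(payload).hexdigest()}"

a = content_key(b'same bytes', 'schema_a')
b = content_key(b'same bytes', 'schema_a')  # identical key: deduplicated
c = content_key(b'same bytes', 'schema_b')  # different schema: separate copy
```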
Storage Cleanup¶
Object storage items are not automatically deleted with rows. Run garbage collection periodically:
```python
import datajoint as dj

# Objects are NOT automatically deleted with rows
(MyTable & old_data).delete()

# Scan for orphaned items
stats = dj.gc.scan(my_schema)
print(dj.gc.format_stats(stats))

# Remove orphaned items
stats = dj.gc.collect(my_schema, dry_run=False)
```
See Clean Up Object Storage for details.
Monitor Storage Usage¶
Check object store size:
```python
# Get the store configuration
spec = dj.config.get_object_store_spec()

# For S3/MinIO, query the bucket with boto3 or similar;
# for filesystem stores, use standard disk-usage tools
```
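For a filesystem-backed store, total usage can be summed with the standard library. A sketch; `store_size_bytes` and the store root path are not part of DataJoint:

```python
from pathlib import Path

def store_size_bytes(root: str) -> int:
    """Total bytes of all files under a filesystem object-store root."""
    return sum(p.stat().st_size for p in Path(root).rglob('*') if p.is_file())
```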
Compression¶
Blobs are compressed by default:
```python
# Compression happens automatically in <blob>
large_array = np.zeros((10000, 10000))       # highly redundant: compresses well
noisy_array = np.random.randn(10000, 10000)  # random values: compresses poorly
```
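The difference can be demonstrated with a generic compressor. zlib here is only an illustration of why redundant data shrinks and random data does not; it is not necessarily the codec DataJoint uses internally:

```python
import os
import zlib

def ratio(raw: bytes) -> float:
    """Compressed size as a fraction of the original size."""
    return len(zlib.compress(raw)) / len(raw)

zeros = bytes(1_000_000)       # highly redundant, like np.zeros
noise = os.urandom(1_000_000)  # incompressible random bytes

# ratio(zeros) is tiny; ratio(noise) is close to (or slightly above) 1.0
```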
Memory Management¶
For very large computations:
```python
def make(self, key):
    # Process in chunks
    for chunk_idx in range(n_chunks):
        chunk_data = load_chunk(key, chunk_idx)
        result = process(chunk_data)
        save_partial_result(key, chunk_idx, result)
        del chunk_data  # free memory

    # Combine results
    final = combine_results(key)
    self.insert1({**key, 'result': final})
```
External Tools for Very Large Data¶
For datasets too large for DataJoint:
```python
import uuid

import datajoint as dj
import zarr

@schema
class LargeDataset(dj.Manual):
    definition = """
    dataset_id : uuid
    ---
    zarr_path : <filepath@>  # reference to an external Zarr store
    """

# Store the path reference; process with specialized tools
store = zarr.open(local_zarr_path)
# ... process with Zarr/Dask ...

LargeDataset.insert1({
    'dataset_id': uuid.uuid4(),
    'zarr_path': local_zarr_path,
})
```
See Also¶
- Use Object Storage — Storage patterns
- Configure Object Storage — Storage setup
- Create Custom Codecs — Domain-specific types