Distributed Computing¶
This tutorial covers running computations across multiple workers. You'll learn:
- Jobs 2.0 — DataJoint's job coordination system
- Multi-process — Parallel workers on one machine
- Multi-machine — Cluster-scale computation
- Error handling — Recovery and monitoring
In [1]:
import datajoint as dj
import numpy as np
import time
schema = dj.Schema('tutorial_distributed')
# Clean up from previous runs
schema.drop(prompt=False)
schema = dj.Schema('tutorial_distributed')
[2026-02-06 11:44:20] DataJoint 2.1.0 connected to datajoint@127.0.0.1:5432
Setup¶
In [2]:
@schema
class Experiment(dj.Manual):
    definition = """
    exp_id : int
    ---
    n_samples : int
    """

@schema
class Analysis(dj.Computed):
    definition = """
    -> Experiment
    ---
    result : float64
    compute_time : float32
    """

    def make(self, key):
        start = time.time()
        n = (Experiment & key).fetch1('n_samples')
        result = float(np.mean(np.random.randn(n) ** 2))
        time.sleep(0.1)
        self.insert1({**key, 'result': result, 'compute_time': time.time() - start})
[2026-02-06 11:44:20][WARNING]: Native type 'int' is used in attribute 'exp_id'. Consider using a core DataJoint type for better portability.
[2026-02-06 11:44:20][WARNING]: Native type 'int' is used in attribute 'n_samples'. Consider using a core DataJoint type for better portability.
In [3]:
Experiment.insert([{'exp_id': i, 'n_samples': 10000} for i in range(20)])
print(f"To compute: {len(Analysis.key_source - Analysis)}")
To compute: 20
Direct vs Distributed Mode¶
Direct mode (the default): no coordination; suitable for a single worker.
Distributed mode (reserve_jobs=True): workers coordinate through a shared jobs table, so multiple processes or machines can populate the same table without duplicating work.
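Why reservation matters can be sketched without a database: each worker atomically claims a key before computing it, so no key is processed twice. This is a simplified stand-in for the jobs table, not DataJoint's actual implementation:

```python
import threading

pending = {f"exp_{i}" for i in range(20)}   # keys awaiting computation
completed = []                               # results gathered from all workers
lock = threading.Lock()                      # stands in for the jobs table's reservation

def worker():
    while True:
        with lock:                           # atomically reserve one key
            if not pending:
                return
            key = pending.pop()
        result = len(key)                    # "compute" outside the lock
        with lock:
            completed.append((key, result))

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(completed) == 20                  # every key computed exactly once
```

Without the reservation step, two workers could pick the same key and compute it twice; the jobs table plays the role of the lock across machines.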
In [4]:
# Distributed mode
Analysis.populate(reserve_jobs=True, max_calls=5, display_progress=True)
[2026-02-06 11:44:20][WARNING]: Native type 'integer' is used in attribute 'exp_id'. Consider using a core DataJoint type for better portability.
Analysis: 100%|██████████| 5/5 [00:00<00:00, 8.45it/s]
Out[4]:
{'success_count': 5, 'error_list': []}
The Jobs Table¶
In [5]:
# Refresh job queue
result = Analysis.jobs.refresh()
print(f"Added: {result['added']}")
# Check status
for status, count in Analysis.jobs.progress().items():
print(f"{status}: {count}")
Added: 0
pending: 15
reserved: 0
success: 0
error: 0
ignore: 0
total: 15
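The status counts returned by progress() are plain numbers, so deriving a completion summary is ordinary dictionary arithmetic. A sketch with illustrative counts (the dict mirrors the shape printed above, not live output):

```python
# Illustrative status counts in the shape progress() prints above
progress = {"pending": 15, "reserved": 0, "success": 5, "error": 0, "ignore": 0}

# Jobs finished vs. jobs still outstanding (ignored jobs are excluded)
done = progress["success"]
remaining = progress["pending"] + progress["reserved"]
pct = 100 * done / (done + remaining + progress["error"])
print(f"{done} done, {remaining} outstanding ({pct:.0f}% complete)")
```

A loop that calls progress() on an interval and prints this line is a simple way to watch a long-running populate from a separate session.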
Multi-Process and Multi-Machine¶
The processes=N parameter spawns multiple worker processes on one machine. However, this requires table classes to be defined in importable Python modules (not notebooks), because multiprocessing needs to pickle and transfer the class definitions to worker processes.
For production use, define your tables in a module and run workers as scripts:
# pipeline.py - Define your tables
import datajoint as dj

schema = dj.Schema('my_pipeline')

@schema
class Analysis(dj.Computed):
    definition = """..."""
    def make(self, key): ...
# worker.py - Run workers
from pipeline import Analysis

# Single machine, 4 processes
Analysis.populate(reserve_jobs=True, processes=4)

# Or run this script on multiple machines
while True:
    result = Analysis.populate(reserve_jobs=True, max_calls=100, suppress_errors=True)
    if result['success_count'] == 0:
        break
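The stop condition in the worker loop above (quit when a pass completes nothing) can be exercised with a stub in place of populate. The stub and its backlog size are hypothetical; only the loop shape matches the worker script:

```python
# Stub that mimics populate()'s return value: a fixed backlog of 250 jobs,
# handed out at most max_calls at a time.
backlog = {"remaining": 250}

def fake_populate(max_calls):
    done = min(backlog["remaining"], max_calls)
    backlog["remaining"] -= done
    return {"success_count": done, "error_list": []}

# Same loop shape as worker.py: keep taking batches until a pass finds nothing.
passes = 0
while True:
    result = fake_populate(max_calls=100)
    passes += 1
    if result["success_count"] == 0:
        break

assert backlog["remaining"] == 0
assert passes == 4   # 100 + 100 + 50, then one empty pass
```

Because every worker runs this same loop, machines that join late simply take whatever batches remain and exit once the queue is drained.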
In this notebook, we'll demonstrate distributed coordination with a single process:
In [6]:
# Complete remaining jobs with distributed coordination
Analysis.populate(reserve_jobs=True, display_progress=True)
print(f"Computed: {len(Analysis())}")
Analysis: 100%|██████████| 15/15 [00:01<00:00, 8.71it/s]
Computed: 20
Error Handling¶
In [7]:
# View errors
print(f"Errors: {len(Analysis.jobs.errors)}")
# Retry failed jobs
Analysis.jobs.errors.delete()
Analysis.populate(reserve_jobs=True, suppress_errors=True)
Errors: 0
Out[7]:
{'success_count': 0, 'error_list': []}
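With suppress_errors=True, failed keys come back in error_list instead of raising. A sketch of triaging that return value; the counts are made up, and the (key, error message) pair shape is an assumption to check against your DataJoint version:

```python
# Illustrative populate() result; error_list entries are assumed here to be
# (key, error_message) pairs -- verify the exact shape in your version.
result = {
    "success_count": 18,
    "error_list": [({"exp_id": 3}, "ValueError: bad samples"),
                   ({"exp_id": 7}, "MemoryError")],
}

if result["error_list"]:
    for key, err in result["error_list"]:
        print(f"exp_id {key['exp_id']} failed: {err}")
    # After fixing the cause, clear the error rows so those keys are retried:
    # Analysis.jobs.errors.delete()
```

Deleting the error rows returns those keys to the pending pool, which is why the cell above calls Analysis.jobs.errors.delete() before re-running populate.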
Quick Reference¶
| Option | Description |
|---|---|
| `reserve_jobs=True` | Enable coordination |
| `processes=N` | N worker processes |
| `max_calls=N` | Limit jobs per run |
| `suppress_errors=True` | Continue on errors |
In [8]:
schema.drop(prompt=False)