# Monitor Progress

Track computation progress and job status.
## Progress Display

Show a progress bar during `populate`:

```python
ProcessedData.populate(display_progress=True)
```
## Check Remaining Work

Count entries left to compute:

```python
# What's left to compute
remaining = ProcessedData.key_source - ProcessedData
print(f"{len(remaining)} entries remaining")
```
## Job Status Summary

Get counts by status:

```python
progress = ProcessedData.jobs.progress()
# {'pending': 100, 'reserved': 5, 'error': 3, 'success': 892}

for status, count in progress.items():
    print(f"{status}: {count}")
```
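If you prefer a single-line summary over one line per status, a small helper can condense the dict that `progress()` returns. `summarize_progress` below is a hypothetical helper sketched here, not part of the DataJoint API:

```python
def summarize_progress(counts: dict) -> str:
    """Condense a status-count dict (shaped like jobs.progress() output)
    into a one-line summary. Hypothetical helper, not a DataJoint API."""
    total = sum(counts.values())
    done = counts.get('success', 0)
    pct = f"{done / total:.0%}" if total else "n/a"
    return (f"{done}/{total} done ({pct}) | "
            f"pending: {counts.get('pending', 0)} | "
            f"running: {counts.get('reserved', 0)} | "
            f"errors: {counts.get('error', 0)}")

print(summarize_progress({'pending': 100, 'reserved': 5, 'error': 3, 'success': 892}))
# 892/1000 done (89%) | pending: 100 | running: 5 | errors: 3
```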
## Filter Jobs by Status

Access jobs by their current status:

```python
# Pending jobs (waiting to run)
ProcessedData.jobs.pending

# Currently running
ProcessedData.jobs.reserved

# Failed jobs
ProcessedData.jobs.errors

# Completed jobs (if keep_completed=True)
ProcessedData.jobs.completed

# Skipped jobs
ProcessedData.jobs.ignored
```
## View Job Details

Inspect specific jobs:

```python
# The job record for one key
(ProcessedData.jobs & key).fetch1()

# Recent errors
ProcessedData.jobs.errors.to_dicts(
    order_by='completed_time DESC',
    limit=10,
)
```
## Worker Information

See which workers are processing jobs:

```python
for job in ProcessedData.jobs.reserved.to_dicts():
    print(f"Key: {job}")
    print(f"Host: {job['host']}")
    print(f"PID: {job['pid']}")
    print(f"Started: {job['reserved_time']}")
```
## Computation Timing

Track how long jobs take:

```python
import numpy as np

# Durations of completed jobs
durations = ProcessedData.jobs.completed.to_arrays('duration')
print(f"Average: {np.mean(durations):.1f}s")
print(f"Median: {np.median(durations):.1f}s")
```
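Percentiles and extremes reveal slow outliers that an average hides. A sketch on a stand-in array (substitute the `duration` values fetched above):

```python
import numpy as np

# Stand-in for the fetched 'duration' values, in seconds
durations = np.array([12.0, 15.5, 11.2, 48.9, 13.3])

print(f"Average: {np.mean(durations):.1f}s")    # skewed by the outlier
print(f"Median:  {np.median(durations):.1f}s")  # robust central value
print(f"Longest: {np.max(durations):.1f}s")     # worst case
```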
## Enable Job Metadata

Store timing info in computed tables:

```python
import datajoint as dj

dj.config.jobs.add_job_metadata = True
dj.config.jobs.keep_completed = True
```

This adds hidden attributes to computed tables:

- `_job_start_time` – when computation began
- `_job_duration` – how long it took
- `_job_version` – code version (if configured)
## Simple Progress Script

Poll overall progress until the table is fully populated:

```python
import time

from my_pipeline import ProcessedData

while True:
    remaining, total = ProcessedData.progress()
    print(f"\rProgress: {total - remaining}/{total} "
          f"({(total - remaining) / total:.0%})", end='')
    if remaining == 0:
        print("\nDone!")
        break
    time.sleep(10)
```
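The `\r` trick above overwrites a single terminal line in place; the same idea extends to a text progress bar. `render_bar` is a hypothetical helper (not part of DataJoint), assuming the `(remaining, total)` values from `ProcessedData.progress()`:

```python
def render_bar(done: int, total: int, width: int = 30) -> str:
    """Render a fixed-width ASCII progress bar for the polling loop.
    Hypothetical helper, not part of DataJoint."""
    filled = int(width * done / total) if total else 0
    return f"[{'#' * filled}{'-' * (width - filled)}] {done}/{total}"

print(render_bar(892, 1000))
# [##########################----] 892/1000
```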
For distributed mode with job tracking:

```python
import time

from my_pipeline import ProcessedData

while True:
    status = ProcessedData.jobs.progress()
    print(f"\rPending: {status.get('pending', 0)} | "
          f"Running: {status.get('reserved', 0)} | "
          f"Done: {status.get('success', 0)} | "
          f"Errors: {status.get('error', 0)}", end='')
    if status.get('pending', 0) == 0 and status.get('reserved', 0) == 0:
        print("\nDone!")
        break
    time.sleep(10)
```
## Pipeline-Wide Status

Check multiple tables:

```python
tables = [RawData, ProcessedData, Analysis]

for table in tables:
    total = len(table.key_source)
    done = len(table())
    print(f"{table.__name__}: {done}/{total} ({done/total:.0%})")
```
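The loop above raises `ZeroDivisionError` for any table whose `key_source` is empty. A guarded formatting helper (hypothetical, not a DataJoint API) avoids that edge case:

```python
def format_table_status(name: str, done: int, total: int) -> str:
    """Format one table's completion, guarding the empty-key_source
    case where total == 0. Hypothetical helper."""
    pct = f"{done / total:.0%}" if total else "n/a"
    return f"{name}: {done}/{total} ({pct})"

# Usage in the loop above:
# print(format_table_status(table.__name__, len(table()), len(table.key_source)))
print(format_table_status("ProcessedData", 892, 1000))
# ProcessedData: 892/1000 (89%)
```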
## See Also

- Run Computations – Basic populate usage
- Distributed Computing – Multi-worker setup
- Handle Errors – Error recovery