Jobs

Job queue for AutoPopulate

Job queue management for AutoPopulate 2.0.

Each auto-populated table (Computed/Imported) has an associated jobs table with the naming pattern ~~table_name. The jobs table tracks job status, priority, scheduling, and error information.

Job

Bases: Table

Per-table job queue for AutoPopulate 2.0.

Each auto-populated table (Computed/Imported) has an associated job table with the naming pattern ~~table_name. The job table tracks job status, priority, scheduling, and error information.

Parameters:

target_table (Table, required): The Computed/Imported table instance this jobs table manages.

Attributes:

target (Table): The auto-populated table this jobs table manages.

pending (QueryExpression): Query for jobs with status='pending'.

reserved (QueryExpression): Query for jobs with status='reserved'.

errors (QueryExpression): Query for jobs with status='error'.

completed (QueryExpression): Query for jobs with status='success'.

ignored (QueryExpression): Query for jobs with status='ignore'.

Examples:

>>> MyTable.jobs.refresh()      # Add new jobs, clean up stale ones
>>> MyTable.jobs.pending        # Query pending jobs
>>> MyTable.jobs.errors         # Query failed jobs
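
A worker built on this class might combine reserve, complete, and error roughly as in the sketch below. It relies only on the method signatures documented on this page. The `process_one` helper, the `compute` callback, and the `FakeJobs` stand-in are hypothetical names that let the example run without a database; none of them are part of the library.

```python
import time
import traceback


def process_one(jobs, key, compute):
    """Reserve one job, run compute(key), and record the outcome.

    `jobs` is any object exposing reserve/complete/error as documented here.
    Returns True if the job ran and completed, False otherwise.
    """
    if not jobs.reserve(key):
        return False  # job was not available for reservation
    start = time.monotonic()
    try:
        compute(key)
    except Exception as exc:
        jobs.error(key, error_message=str(exc), error_stack=traceback.format_exc())
        return False
    jobs.complete(key, duration=time.monotonic() - start)
    return True


class FakeJobs:
    """Hypothetical in-memory stand-in for a jobs table (not library code)."""

    def __init__(self, pending_keys):
        self.status = {self._id(k): "pending" for k in pending_keys}

    @staticmethod
    def _id(key):
        # Dicts are unhashable, so use a sorted tuple of items as the row id.
        return tuple(sorted(key.items()))

    def reserve(self, key):
        if self.status.get(self._id(key)) != "pending":
            return False
        self.status[self._id(key)] = "reserved"
        return True

    def complete(self, key, duration=None):
        self.status[self._id(key)] = "success"

    def error(self, key, error_message, error_stack=None):
        self.status[self._id(key)] = "error"
```

Passing the jobs object in as a parameter keeps the control flow testable in isolation; with a real table, `MyTable.jobs` would take the place of `FakeJobs`.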

target property

target

The auto-populated table this jobs table manages.

delete

delete()

Delete all entries, bypassing interactive prompts and dependencies.

drop

drop()

Drop the table, bypassing interactive prompts and dependencies.

pending property

pending

Query for pending jobs awaiting processing.

Returns:

Job: Restricted query with status='pending'.

reserved property

reserved

Query for jobs currently being processed.

Returns:

Job: Restricted query with status='reserved'.

errors property

errors

Query for jobs that failed with errors.

Returns:

Job: Restricted query with status='error'.

ignored property

ignored

Query for jobs marked to be skipped.

Returns:

Job: Restricted query with status='ignore'.

completed property

completed

Query for successfully completed jobs.

Returns:

Job: Restricted query with status='success'.
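
The five status properties do not all share a name with the status value they filter on (for example, completed selects status='success'). The mapping can be summarized in a small sketch; `STATUS_BY_PROPERTY` and `rows_for` are hypothetical names, and plain dicts stand in for job rows.

```python
# Property name -> status value, as listed in the property descriptions.
STATUS_BY_PROPERTY = {
    "pending": "pending",
    "reserved": "reserved",
    "errors": "error",
    "ignored": "ignore",
    "completed": "success",
}


def rows_for(rows, prop):
    """Return the rows whose status matches the given property name."""
    status = STATUS_BY_PROPERTY[prop]
    return [row for row in rows if row["status"] == status]
```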

refresh

refresh(*restrictions, delay=0, priority=None, stale_timeout=None, orphan_timeout=None)

Refresh the jobs queue: add new jobs and clean up stale/orphaned jobs.

Parameters:

*restrictions (any, default ()): Conditions to filter key_source (for adding new jobs).

delay (float, default 0): Seconds from now until new jobs become available for processing. Default 0 (immediately available). Uses database server time.

priority (int, default None): Priority for new jobs (lower = more urgent). Default from config.jobs.default_priority.

stale_timeout (float, default None): Seconds after which jobs are checked for staleness. Jobs older than this are removed if key not in key_source. Default from config.jobs.stale_timeout. Set to 0 to skip.

orphan_timeout (float, default None): Seconds after which reserved jobs are considered orphaned. Reserved jobs older than this are deleted and re-added as pending. Default None (no orphan cleanup).

Returns:

dict: Status counts with keys: 'added', 'removed', 'orphaned', 're_pended'.

Notes

Operations performed:

  1. Add new jobs: (key_source & restrictions) - target - jobs → insert as pending
  2. Re-pend success jobs: if keep_completed=True and key in key_source but not in target
  3. Remove stale jobs: jobs older than stale_timeout whose keys not in key_source
  4. Reset orphaned jobs: reserved jobs older than orphan_timeout are deleted and re-added as pending (only if orphan_timeout is specified)
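
The set arithmetic in steps 1 and 3 can be illustrated with plain Python sets. This is a minimal sketch, not the library's implementation: `keys_to_add` and `stale_keys` are hypothetical helpers, keys are simple hashable values, a predicate stands in for the restriction arguments, and job age is given directly in seconds.

```python
def keys_to_add(key_source, target, jobs, restriction=None):
    """Step 1: keys in the restricted key_source that are in neither target nor jobs."""
    candidates = {k for k in key_source if restriction is None or restriction(k)}
    return candidates - target - jobs


def stale_keys(job_ages, key_source, stale_timeout):
    """Step 3: job keys older than stale_timeout that are no longer in key_source.

    `job_ages` maps key -> age in seconds. A timeout of 0 skips the check.
    """
    if not stale_timeout:
        return set()
    return {
        key
        for key, age in job_ages.items()
        if age > stale_timeout and key not in key_source
    }


key_source = {1, 2, 3, 4}
print(keys_to_add(key_source, target={1}, jobs={2}))          # {3, 4}
print(stale_keys({2: 100, 9: 100, 8: 10}, key_source, 60))    # {9}
```

Key 9 is reported as stale because it is older than the timeout and absent from key_source; key 8 is also absent but too recent to be checked.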

reserve

reserve(key)

Attempt to reserve a pending job for processing.

Updates status to 'reserved' if currently 'pending' and scheduled_time <= now.

Parameters:

key (dict, required): Primary key dict of the job to reserve.

Returns:

bool: True if reservation successful, False if job not available.
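
The reservation condition (status is 'pending' and scheduled_time <= now) can be sketched on an in-memory row. `try_reserve` is a hypothetical helper operating on a plain dict. In a database the check and the update would need to happen atomically so that two workers cannot both reserve the same job; this single-process sketch does not model that.

```python
from datetime import datetime, timedelta


def try_reserve(job, now):
    """Flip a job row from 'pending' to 'reserved' if it is due.

    Returns True on success, False if the job is not pending or not yet due.
    """
    if job["status"] == "pending" and job["scheduled_time"] <= now:
        job["status"] = "reserved"
        return True
    return False


now = datetime(2024, 1, 1, 12, 0, 0)
due = {"status": "pending", "scheduled_time": now - timedelta(seconds=5)}
delayed = {"status": "pending", "scheduled_time": now + timedelta(seconds=60)}

# The second attempt on `due` fails because it is already reserved.
print(try_reserve(due, now), try_reserve(delayed, now), try_reserve(due, now))
# True False False
```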

complete

complete(key, duration=None)

Mark a job as successfully completed.

Parameters:

key (dict, required): Primary key dict of the job.

duration (float, default None): Execution duration in seconds.

Notes

Based on config.jobs.keep_completed:

  • If True: updates status to 'success' with completion time and duration
  • If False: deletes the job entry
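
The two branches can be sketched against a dict of job rows. `complete_job` is a hypothetical helper, and the `completed_time` field name is an assumption made for illustration; the page does not name the column that stores the completion time.

```python
def complete_job(jobs, key, keep_completed, completed_time, duration=None):
    """Apply the keep_completed rule to an in-memory jobs dict (key -> row)."""
    if keep_completed:
        # Keep the row and record the outcome.
        jobs[key].update(
            status="success", completed_time=completed_time, duration=duration
        )
    else:
        # Drop the row entirely; no record of the completed job remains.
        del jobs[key]
```

Keeping completed rows preserves a history of durations at the cost of a larger jobs table.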

error

error(key, error_message, error_stack=None)

Mark a job as failed with error details.

Parameters:

key (dict, required): Primary key dict of the job.

error_message (str, required): Error message (truncated to 2047 chars if longer).

error_stack (str, default None): Full stack trace.
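
The 2047-character limit on error_message can be illustrated with a plain slice. `truncate_error_message` is a hypothetical helper; whether the library appends a marker to truncated messages is not stated here, so this sketch simply cuts the string.

```python
MAX_ERROR_MESSAGE_LENGTH = 2047  # limit stated in the parameter description


def truncate_error_message(message, limit=MAX_ERROR_MESSAGE_LENGTH):
    """Return the message unchanged if it fits, else its first `limit` characters."""
    return message if len(message) <= limit else message[:limit]
```

Because only the message is shortened, the full stack trace passed as error_stack is the place to look when a long message has been cut.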

ignore

ignore(key)

Mark a job to be ignored (skipped during populate).

If the key doesn't exist in the jobs table, inserts it with status='ignore'. If it exists, updates the status to 'ignore'.

Parameters:

key (dict, required): Primary key dict of the job.
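
The insert-or-update behavior described above can be sketched with a dict of job rows. `ignore_job` is a hypothetical helper, not the library's code.

```python
def ignore_job(jobs, key):
    """Ensure `key` has status 'ignore', inserting a row if none exists."""
    row = jobs.setdefault(key, {})  # insert an empty row when the key is missing
    row["status"] = "ignore"        # existing rows keep their other fields
```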

progress

progress()

Return job status breakdown.

Returns:

dict: Counts by status with keys: 'pending', 'reserved', 'success', 'error', 'ignore', 'total'.
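
The shape of the returned dict can be sketched as follows. `summarize` is a hypothetical helper that counts a list of status strings; it assumes 'total' is the sum of the five listed statuses, which this page does not state explicitly.

```python
from collections import Counter

STATUSES = ("pending", "reserved", "success", "error", "ignore")


def summarize(statuses):
    """Count statuses, always including every key even when its count is zero."""
    counts = Counter(statuses)
    result = {status: counts.get(status, 0) for status in STATUSES}
    result["total"] = sum(result.values())
    return result


print(summarize(["pending", "pending", "error"]))
# {'pending': 2, 'reserved': 0, 'success': 0, 'error': 1, 'ignore': 0, 'total': 3}
```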