Migrate

Schema migration utilities

Migration utilities for DataJoint schema updates.

This module provides tools for migrating existing schemas to use the new Codec system, particularly for upgrading blob columns to use explicit <blob> type declarations.

.. note:: This module is provided temporarily to assist with migration from pre-2.0. It will be deprecated in DataJoint 2.1 and removed in 2.2. Complete your migrations while on DataJoint 2.0.

Note on Terminology

This module uses "external storage" because that was the term in DataJoint 0.14.6. In DataJoint 2.0 documentation, this is called "object storage" (general term) or "in-store storage" (specific to the @ modifier).

analyze_columns

analyze_columns(schema)

Analyze a schema to find columns that need type labels in comments.

This identifies columns that:

  1. Use native MySQL types that should be labeled with core types
  2. Are blob columns without codec markers
  3. Use external storage (requiring Phase 3-4 migration)

Parameters:

  schema (Schema, required): The DataJoint schema to analyze.

Returns:

  dict with keys:

  • needs_migration: list of columns needing type labels
  • already_migrated: list of columns with existing type labels
  • external_storage: list of columns requiring Phase 3-4

  Each column entry has: table, column, native_type, core_type, comment

Examples:

>>> import datajoint as dj
>>> from datajoint.migrate import analyze_columns
>>> schema = dj.Schema('my_database')
>>> result = analyze_columns(schema)
>>> for col in result['needs_migration']:
...     print(f"{col['table']}.{col['column']}: {col['native_type']} → {col['core_type']}")

migrate_columns

migrate_columns(schema, dry_run=True)

Add type labels to column comments for Phase 2 migration.

This updates column comments to include type labels, enabling DataJoint 2.0 to recognize column types without relying on native MySQL types.

Migrates:

  • Numeric types: int unsigned → :uint32:, smallint → :int16:, etc.
  • Blob types: longblob → :<blob>:

Does NOT migrate external storage columns (external-*, attach@*, filepath@*); those require Phase 3-4.
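As an illustration of the labeling convention, here is a minimal sketch assuming the :type: prefix format shown above (label_comment is a hypothetical helper, not part of the module):

```python
def label_comment(comment: str, core_type: str) -> str:
    """Prepend a :type: label to a column comment (illustrative only)."""
    return f":{core_type}:{comment}"

# e.g. an `int unsigned` column keeps its comment and gains a :uint32: label
label_comment("number of trials", "uint32")  # ':uint32:number of trials'
```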

Parameters:

  schema (Schema, required): The DataJoint schema to migrate.
  dry_run (bool): If True, only preview changes without applying. Default True.

Returns:

  dict with keys:

  • columns_analyzed: total columns checked
  • columns_migrated: number of columns updated
  • columns_skipped: number already migrated or external
  • sql_statements: list of SQL executed (or to be executed)
  • details: per-column results

Examples:

>>> from datajoint.migrate import migrate_columns
>>> # Preview
>>> result = migrate_columns(schema, dry_run=True)
>>> print(f"Would migrate {len(result['sql_statements'])} columns")
>>> # Apply
>>> result = migrate_columns(schema, dry_run=False)
>>> print(f"Migrated {result['columns_migrated']} columns")

analyze_blob_columns

analyze_blob_columns(schema)

Analyze a schema to find blob columns that could be migrated to <blob>.

This function identifies blob columns that:

  1. Have a MySQL blob type (tinyblob, blob, mediumblob, longblob)
  2. Do NOT already have a codec/type specified in their comment

All blob size variants are included in the analysis.

Parameters:

  schema (Schema, required): The DataJoint schema to analyze.

Returns:

  list[dict], one entry per column, with keys:

  • table_name: full table name (database.table)
  • column_name: name of the blob column
  • column_type: MySQL column type (tinyblob, blob, mediumblob, longblob)
  • current_comment: current column comment
  • needs_migration: True if the column should be migrated

Examples:

>>> import datajoint as dj
>>> schema = dj.Schema('my_database')
>>> columns = dj.migrate.analyze_blob_columns(schema)
>>> for col in columns:
...     if col['needs_migration']:
...         print(f"{col['table_name']}.{col['column_name']} ({col['column_type']})")

generate_migration_sql

generate_migration_sql(schema, target_type='blob', dry_run=True)

Generate SQL statements to migrate blob columns to use <blob>.

This generates ALTER TABLE statements that update column comments to include the :<blob>: prefix, marking them as using explicit DataJoint blob serialization.

Parameters:

  schema (Schema, required): The DataJoint schema to migrate.
  target_type (str): The type name to migrate to. Default "blob".
  dry_run (bool): If True, only return SQL without executing. Default True.

Returns:

  list[str]: SQL ALTER TABLE statements.

Examples:

>>> sql_statements = dj.migrate.generate_migration_sql(schema)
>>> for sql in sql_statements:
...     print(sql)
Notes

This is a metadata-only migration. The actual blob data format remains unchanged - only the column comments are updated to indicate explicit type handling.

migrate_blob_columns

migrate_blob_columns(schema, target_type='blob', dry_run=True)

Migrate blob columns in a schema to use the explicit <blob> type.

This updates column comments in the database to include the <blob> type declaration. The data format remains unchanged.

Parameters:

  schema (Schema, required): The DataJoint schema to migrate.
  target_type (str): The type name to migrate to. Default "blob".
  dry_run (bool): If True, only preview changes without applying. Default True.

Returns:

  dict with keys:

  • analyzed: number of blob columns analyzed
  • needs_migration: number of columns that need migration
  • migrated: number of columns migrated (0 if dry_run)
  • sql_statements: list of SQL statements (executed or to be executed)

Examples:

>>> # Preview migration
>>> result = dj.migrate.migrate_blob_columns(schema, dry_run=True)
>>> print(f"Would migrate {result['needs_migration']} columns")
>>> # Apply migration
>>> result = dj.migrate.migrate_blob_columns(schema, dry_run=False)
>>> print(f"Migrated {result['migrated']} columns")
Warnings

After migration, table definitions should be updated to use <blob> instead of longblob for consistency. The migration only updates database metadata; source code changes are manual.
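For example, the manual source-code update might look like this sketch (the definition and comment are hypothetical; only the declared type changes, not the stored data):

```python
# Hypothetical Phase 2 source update: replace the native MySQL type
# with the explicit codec type in the table definition string.
definition_v0 = """
-> Session
---
signal : longblob  # raw trace
"""

# After migration, declare the explicit codec type instead:
definition_v2 = definition_v0.replace("longblob", "<blob>")
```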

check_migration_status

check_migration_status(schema)

Check the migration status of blob columns in a schema.

Parameters:

  schema (Schema, required): The DataJoint schema to check.

Returns:

  dict with keys:

  • total_blob_columns: total number of blob columns
  • migrated: number of columns with explicit type
  • pending: number of columns using implicit serialization
  • columns: list of column details

Examples:

>>> status = dj.migrate.check_migration_status(schema)
>>> print(f"Migration progress: {status['migrated']}/{status['total_blob_columns']}")

add_job_metadata_columns

add_job_metadata_columns(target, dry_run=True)

Add hidden job metadata columns to existing Computed/Imported tables.

This migration utility adds the hidden columns (_job_start_time, _job_duration, _job_version) to tables that were created before config.jobs.add_job_metadata was enabled.

Parameters:

  target (Table or Schema, required): Either a table class/instance (dj.Computed or dj.Imported) or a Schema object. If a Schema, all Computed/Imported tables in the schema are processed.
  dry_run (bool): If True, only preview changes without applying. Default True.

Returns:

  dict with keys:

  • tables_analyzed: number of tables checked
  • tables_modified: number of tables that were/would be modified
  • columns_added: total columns added across all tables
  • details: list of dicts with per-table information

Examples:

>>> import datajoint as dj
>>> from datajoint.migrate import add_job_metadata_columns
>>>
>>> # Preview migration for a single table
>>> result = add_job_metadata_columns(MyComputedTable, dry_run=True)
>>> print(f"Would add {result['columns_added']} columns")
>>>
>>> # Apply migration to all tables in a schema
>>> result = add_job_metadata_columns(schema, dry_run=False)
>>> print(f"Modified {result['tables_modified']} tables")
Notes
  • Only Computed and Imported tables are modified (not Manual, Lookup, or Part)
  • Existing rows will have NULL values for _job_start_time and _job_duration
  • Future populate() calls will fill in metadata for new rows
  • This does NOT retroactively populate metadata for existing rows

migrate_external

migrate_external(schema, dry_run=True, finalize=False)

Migrate external storage columns from 0.x to 2.0 format.

This migration uses a safe, multi-step approach:

  1. Initial run (dry_run=False): Adds new <column>_v2 columns with JSON type and copies data from the old columns, converting UUID references to JSON metadata.

  2. Verification: You verify all data is accessible via DataJoint 2.0.

  3. Finalize (finalize=True): Renames columns (old → _v1, new → original name) and optionally drops the old columns.

This allows 0.x and 2.0 to coexist during migration and provides a rollback path if issues are discovered.

Parameters:

  schema (Schema, required): The DataJoint schema to migrate.
  dry_run (bool): If True, only preview changes without applying. Default True.
  finalize (bool): If True, rename migrated columns to their original names and drop the old columns. Only run after verifying the migration succeeded. Default False.

Returns:

  dict with keys:

  • columns_found: number of external columns found
  • columns_migrated: number of columns processed
  • rows_migrated: number of rows with data converted
  • details: per-column migration details

Examples:

>>> from datajoint.migrate import migrate_external
>>>
>>> # Step 1: Preview
>>> result = migrate_external(schema, dry_run=True)
>>> print(f"Found {result['columns_found']} columns to migrate")
>>>
>>> # Step 2: Run migration (adds new columns)
>>> result = migrate_external(schema, dry_run=False)
>>> print(f"Migrated {result['rows_migrated']} rows")
>>>
>>> # Step 3: Verify data is accessible via DataJoint 2.0
>>> # ... manual verification ...
>>>
>>> # Step 4: Finalize (rename columns, drop old)
>>> result = migrate_external(schema, finalize=True)
Notes

The migration reads from the hidden ~external_<store> tables to build JSON metadata. Ensure store configuration in datajoint.json matches the paths stored in these tables.
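For reference, a store entry in datajoint.json might look like the following. This is a hypothetical sketch: the store name, protocol, and location are placeholders, and the exact key names depend on your deployment and DataJoint 2.0 settings schema.

```json
{
  "stores": {
    "raw": {
      "protocol": "file",
      "location": "/data/external/raw"
    }
  }
}
```

The "location" must match the paths recorded in the ~external_<store> tables for the migration to resolve existing files.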

check_store_configuration

check_store_configuration(schema)

Verify external stores are properly configured.

Checks that all external storage stores referenced in the schema's tables are configured in settings and accessible.

Parameters:

  schema (Schema, required): The DataJoint schema to check.

Returns:

  dict with keys:

  • stores_configured: list of store names with valid config
  • stores_missing: list of stores referenced but not configured
  • stores_unreachable: list of stores that failed the connection test
  • details: per-store details

Examples:

>>> from datajoint.migrate import check_store_configuration
>>> result = check_store_configuration(schema)
>>> if result['stores_missing']:
...     print(f"Missing stores: {result['stores_missing']}")

verify_external_integrity

verify_external_integrity(schema, store_name=None)

Check that all external references point to existing files.

Verifies integrity of external storage by checking that each reference in the ~external_* tables points to an accessible file.

Parameters:

Name Type Description Default
schema Schema

The DataJoint schema to check.

required
store_name str

Specific store to check. If None, checks all stores.

None

Returns:

Type Description
dict

Dict with keys:

  • total_references: count of external entries
  • valid: count with accessible files
  • missing: list of entries with inaccessible files
  • stores_checked: list of store names checked

Examples:

>>> from datajoint.migrate import verify_external_integrity
>>> result = verify_external_integrity(schema)
>>> if result['missing']:
...     print(f"Missing files: {len(result['missing'])}")
...     for entry in result['missing'][:5]:
...         print(f"  {entry['filepath']}")
Notes

For S3/MinIO stores, this function does not verify file existence (would require network calls). Only local file stores are fully verified.

rebuild_lineage

rebuild_lineage(schema, dry_run=True)

Rebuild ~lineage table from current table definitions.

Use after schema changes or to repair corrupted lineage data. The lineage table tracks foreign key relationships for semantic matching.

Parameters:

  schema (Schema, required): The DataJoint schema to rebuild lineage for.
  dry_run (bool): If True, only preview changes without applying. Default True.

Returns:

  dict with keys:

  • tables_analyzed: number of tables in the schema
  • lineage_entries: number of lineage entries created
  • status: 'dry_run', 'rebuilt', or 'error'

Examples:

>>> from datajoint.migrate import rebuild_lineage
>>> result = rebuild_lineage(schema, dry_run=True)
>>> print(f"Would create {result['lineage_entries']} lineage entries")
>>> result = rebuild_lineage(schema, dry_run=False)
>>> print(f"Rebuilt lineage: {result['status']}")
Notes

This function wraps schema.rebuild_lineage() with dry_run support and additional reporting.

migrate_filepath

migrate_filepath(schema, dry_run=True, finalize=False)

Migrate filepath columns from 0.x to 2.0 format.

Same multi-step approach as migrate_external:

  1. Initial run: Adds new <column>_v2 columns with JSON type
  2. Verification: Verify files accessible via DataJoint 2.0
  3. Finalize: Rename columns and drop old

Parameters:

  schema (Schema, required): The DataJoint schema to migrate.
  dry_run (bool): If True, only preview changes. Default True.
  finalize (bool): If True, finalize the migration. Default False.

Returns:

  dict: Migration results (same format as migrate_external).

Examples:

>>> from datajoint.migrate import migrate_filepath
>>>
>>> # Preview
>>> result = migrate_filepath(schema, dry_run=True)
>>>
>>> # Run migration
>>> result = migrate_filepath(schema, dry_run=False)
>>>
>>> # Finalize after verification
>>> result = migrate_filepath(schema, finalize=True)

create_parallel_schema

create_parallel_schema(source, dest, copy_data=False, connection=None)

Create a parallel _v20 schema for migration testing.

This creates a copy of a production schema (source) into a test schema (dest) for safely testing DataJoint 2.0 migration without affecting production.

Parameters:

  source (str, required): Production schema name (e.g., 'my_pipeline')
  dest (str, required): Test schema name (e.g., 'my_pipeline_v20')
  copy_data (bool): If True, copy all table data. If False (default), create empty tables.
  connection (Connection): Database connection. If None, uses the default connection. Default None.

Returns:

  dict with keys:

  • tables_created: number of tables created
  • data_copied: whether data was copied
  • tables: list of table names created

Examples:

>>> from datajoint.migrate import create_parallel_schema
>>> result = create_parallel_schema('my_pipeline', 'my_pipeline_v20')
>>> print(f"Created {result['tables_created']} tables")
See Also

copy_table_data : Copy data between schemas

copy_table_data

copy_table_data(source_schema, dest_schema, table, limit=None, where_clause=None, connection=None)

Copy data from production table to test table.

Parameters:

  source_schema (str, required): Production schema name
  dest_schema (str, required): Test schema name (_v20)
  table (str, required): Table name
  limit (int): Maximum number of rows to copy. Default None.
  where_clause (str): SQL WHERE clause for filtering (without the 'WHERE' keyword). Default None.
  connection (Connection): Database connection. If None, uses the default connection. Default None.

Returns:

  dict with keys:

  • rows_copied: number of rows copied
  • time_taken: seconds elapsed

Examples:

>>> # Copy all data
>>> result = copy_table_data('my_pipeline', 'my_pipeline_v20', 'Mouse')
>>> # Copy sample
>>> result = copy_table_data(
...     'my_pipeline', 'my_pipeline_v20', 'Session',
...     limit=100,
...     where_clause="session_date >= '2024-01-01'"
... )

compare_query_results

compare_query_results(prod_schema, test_schema, table, tolerance=1e-06, connection=None)

Compare query results between production and test schemas.

Parameters:

  prod_schema (str, required): Production schema name
  test_schema (str, required): Test schema name (_v20)
  table (str, required): Table name to compare
  tolerance (float): Tolerance for floating-point comparison. Default 1e-6.
  connection (Connection): Database connection. If None, uses the default connection. Default None.

Returns:

  dict with keys:

  • match: whether all rows match
  • row_count: number of rows compared
  • discrepancies: list of mismatches (if any)

Examples:

>>> result = compare_query_results('my_pipeline', 'my_pipeline_v20', 'neuron')
>>> if result['match']:
...     print(f"✓ All {result['row_count']} rows match")

backup_schema

backup_schema(schema, backup_name, connection=None)

Create full backup of a schema.

Parameters:

  schema (str, required): Schema name to back up
  backup_name (str, required): Backup schema name (e.g., 'my_pipeline_backup_20250114')
  connection (Connection): Database connection. If None, uses the default connection. Default None.

Returns:

  dict with keys:

  • tables_backed_up: int
  • rows_backed_up: int
  • backup_location: str

Examples:

>>> result = backup_schema('my_pipeline', 'my_pipeline_backup_20250114')
>>> print(f"Backed up {result['tables_backed_up']} tables")

restore_schema

restore_schema(backup, dest, connection=None)

Restore schema from backup.

Parameters:

  backup (str, required): Backup schema name
  dest (str, required): Destination schema name
  connection (Connection): Database connection. If None, uses the default connection. Default None.

Returns:

  dict with keys:

  • tables_restored: int
  • rows_restored: int

Examples:

>>> restore_schema('my_pipeline_backup_20250114', 'my_pipeline')

verify_schema_v20

verify_schema_v20(schema, connection=None)

Verify schema is fully migrated to DataJoint 2.0.

Parameters:

  schema (str, required): Schema name to verify
  connection (Connection): Database connection. If None, uses the default connection. Default None.

Returns:

  dict with keys:

  • compatible: True if fully compatible with 2.0
  • blob_markers: True if all blob columns have :<blob>: markers
  • lineage_exists: True if the ~lineage table exists
  • issues: list of compatibility issues found

Examples:

>>> result = verify_schema_v20('my_pipeline')
>>> if result['compatible']:
...     print("✓ Schema fully migrated to 2.0")

migrate_external_pointers_v2

migrate_external_pointers_v2(schema, table, attribute, source_store, dest_store, copy_files=False, connection=None)

Migrate external storage pointers from 0.14.6 to 2.0 format.

Converts BINARY(16) UUID references to JSON metadata format. Optionally copies blob files to new storage location.

This is useful when copying production data to _v2 schemas and you need to access external storage attributes but don't want to move the files yet.

Parameters:

  schema (str, required): Schema name (e.g., 'my_pipeline_v2')
  table (str, required): Table name
  attribute (str, required): External attribute name (e.g., 'signal')
  source_store (str, required): 0.14.6 store name (e.g., 'external-raw')
  dest_store (str, required): 2.0 store name (e.g., 'raw')
  copy_files (bool): If True, copy blob files to the new location. If False (default), the JSON points to the existing files.
  connection (Connection): Database connection. If None, uses the default connection. Default None.

Returns:

  dict with keys:

  • rows_migrated: number of pointers migrated
  • files_copied: number of files copied (if copy_files=True)
  • errors: any errors encountered

Examples:

>>> # Migrate pointers without moving files
>>> result = migrate_external_pointers_v2(
...     schema='my_pipeline_v2',
...     table='recording',
...     attribute='signal',
...     source_store='external-raw',
...     dest_store='raw',
...     copy_files=False
... )
>>> print(f"Migrated {result['rows_migrated']} pointers")
Notes

This function:

  1. Reads the BINARY(16) UUID from the table column
  2. Looks up the file in the ~external_{source_store} table
  3. Creates JSON metadata with the file path
  4. Optionally copies the file to the new store location
  5. Updates the column with the JSON metadata

The JSON format is:

  {
    "path": "schema/table/key_hash/file.ext",
    "size": 12345,
    "hash": null,
    "ext": ".dat",
    "is_dir": false,
    "timestamp": "2025-01-14T10:30:00+00:00"
  }
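The metadata above can be reproduced in plain Python as a sketch (field names and values are taken from the format shown; the record itself is illustrative):

```python
import json
from datetime import datetime, timezone

# Illustrative metadata record matching the documented JSON format.
metadata = {
    "path": "schema/table/key_hash/file.ext",
    "size": 12345,
    "hash": None,
    "ext": ".dat",
    "is_dir": False,
    # ISO-8601 UTC timestamp, as in the example above
    "timestamp": datetime(2025, 1, 14, 10, 30, tzinfo=timezone.utc).isoformat(),
}

json.dumps(metadata)  # serialized form stored in the JSON column
```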