Migrate¶
Schema migration utilities
Migration utilities for DataJoint schema updates.
This module provides tools for migrating existing schemas to use the new
Codec system, particularly for upgrading blob columns to use
explicit <blob> type declarations.
.. note:: This module is provided temporarily to assist with migration from pre-2.0. It will be deprecated in DataJoint 2.1 and removed in 2.2. Complete your migrations while on DataJoint 2.0.
Note on Terminology
This module uses "external storage" because that was the term in DataJoint 0.14.6. In DataJoint 2.0 documentation, this is called "object storage" (general term) or "in-store storage" (specific to the @ modifier).
analyze_columns ¶
analyze_columns(schema)
Analyze a schema to find columns that need type labels in comments.
This identifies columns that:
- Use native MySQL types that should be labeled with core types
- Are blob columns without codec markers
- Use external storage (requiring Phase 3-4 migration)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to analyze. | required |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including needs_migration. Each column entry has: table, column, native_type, core_type, comment. |
Examples:
>>> import datajoint as dj
>>> from datajoint.migrate import analyze_columns
>>> schema = dj.Schema('my_database')
>>> result = analyze_columns(schema)
>>> for col in result['needs_migration']:
...     print(f"{col['table']}.{col['column']}: {col['native_type']} → {col['core_type']}")
migrate_columns ¶
migrate_columns(schema, dry_run=True)
Add type labels to column comments for Phase 2 migration.
This updates column comments to include type labels, enabling DataJoint 2.0 to recognize column types without relying on native MySQL types.
Migrates:
- Numeric types: int unsigned → :uint32:, smallint → :int16:, etc.
- Blob types: longblob → :<blob>:
Does NOT migrate external storage columns (external-*, attach@*, filepath@*) - those require Phase 3-4.
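To illustrate, the comment-labeling step can be sketched as follows. The mapping table, helper names, and SQL shape here are assumptions for illustration, not DataJoint's actual implementation.

```python
# Hypothetical sketch of Phase 2 comment labeling; the type mapping and
# ALTER TABLE shape are assumptions, not DataJoint's real code.
NATIVE_TO_CORE = {
    "int unsigned": "uint32",
    "smallint": "int16",
    "longblob": "<blob>",
}

def label_comment(native_type: str, comment: str) -> str:
    """Prefix a column comment with a :type: label if one applies."""
    core = NATIVE_TO_CORE.get(native_type)
    if core is None or comment.startswith(":"):
        return comment  # no label needed, or already labeled
    return f":{core}:{comment}"

def alter_sql(table: str, column: str, native_type: str, comment: str) -> str:
    """Build an ALTER TABLE statement that rewrites the column comment."""
    new_comment = label_comment(native_type, comment)
    return (
        f"ALTER TABLE {table} MODIFY COLUMN `{column}` "
        f"{native_type} COMMENT '{new_comment}'"
    )
```

Because only the comment changes, this is metadata-only: the stored values are untouched.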
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to migrate. | required |
| dry_run | bool | If True, only preview changes without applying. Default True. | True |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including sql_statements and columns_migrated. |
Examples:
>>> from datajoint.migrate import migrate_columns
>>> # Preview
>>> result = migrate_columns(schema, dry_run=True)
>>> print(f"Would migrate {len(result['sql_statements'])} columns")
>>> # Apply
>>> result = migrate_columns(schema, dry_run=False)
>>> print(f"Migrated {result['columns_migrated']} columns")
analyze_blob_columns ¶
analyze_blob_columns(schema)
Analyze a schema to find blob columns that could be migrated to <blob>.
This function identifies blob columns that:
- Have a MySQL blob type (tinyblob, blob, mediumblob, longblob)
- Do NOT already have a codec/type specified in their comment
All blob size variants are included in the analysis.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to analyze. | required |
Returns:
| Type | Description |
|---|---|
| list[dict] | List of dicts with keys including table_name, column_name, column_type, needs_migration. |
Examples:
>>> import datajoint as dj
>>> schema = dj.Schema('my_database')
>>> columns = dj.migrate.analyze_blob_columns(schema)
>>> for col in columns:
... if col['needs_migration']:
... print(f"{col['table_name']}.{col['column_name']} ({col['column_type']})")
generate_migration_sql ¶
generate_migration_sql(schema, target_type='blob', dry_run=True)
Generate SQL statements to migrate blob columns to use <blob>.
This generates ALTER TABLE statements that update column comments to
include the :<blob>: prefix, marking them as using explicit
DataJoint blob serialization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to migrate. | required |
| target_type | str | The type name to migrate to. Default "blob". | 'blob' |
| dry_run | bool | If True, only return SQL without executing. | True |
Returns:
| Type | Description |
|---|---|
| list[str] | List of SQL ALTER TABLE statements. |
Examples:
>>> sql_statements = dj.migrate.generate_migration_sql(schema)
>>> for sql in sql_statements:
... print(sql)
Notes
This is a metadata-only migration. The actual blob data format remains unchanged - only the column comments are updated to indicate explicit type handling.
migrate_blob_columns ¶
migrate_blob_columns(schema, target_type='blob', dry_run=True)
Migrate blob columns in a schema to use an explicit <blob> declaration.
This updates column comments in the database to include the type declaration. The data format remains unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to migrate. | required |
| target_type | str | The type name to migrate to. Default "blob". | 'blob' |
| dry_run | bool | If True, only preview changes without applying. Default True. | True |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including needs_migration and migrated. |
Examples:
>>> # Preview migration
>>> result = dj.migrate.migrate_blob_columns(schema, dry_run=True)
>>> print(f"Would migrate {result['needs_migration']} columns")
>>> # Apply migration
>>> result = dj.migrate.migrate_blob_columns(schema, dry_run=False)
>>> print(f"Migrated {result['migrated']} columns")
Warnings
After migration, table definitions should be updated to use
<blob> instead of longblob for consistency. The migration
only updates database metadata; source code changes are manual.
check_migration_status ¶
check_migration_status(schema)
Check the migration status of blob columns in a schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to check. | required |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including migrated and total_blob_columns. |
Examples:
>>> status = dj.migrate.check_migration_status(schema)
>>> print(f"Migration progress: {status['migrated']}/{status['total_blob_columns']}")
add_job_metadata_columns ¶
add_job_metadata_columns(target, dry_run=True)
Add hidden job metadata columns to existing Computed/Imported tables.
This migration utility adds the hidden columns (_job_start_time, _job_duration, _job_version) to tables that were created before config.jobs.add_job_metadata was enabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target | Table or Schema | Either a table class/instance (dj.Computed or dj.Imported) or a Schema object. If a Schema, all Computed/Imported tables in the schema will be processed. | required |
| dry_run | bool | If True, only preview changes without applying. Default True. | True |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including columns_added and tables_modified. |
Examples:
>>> import datajoint as dj
>>> from datajoint.migrate import add_job_metadata_columns
>>>
>>> # Preview migration for a single table
>>> result = add_job_metadata_columns(MyComputedTable, dry_run=True)
>>> print(f"Would add {result['columns_added']} columns")
>>>
>>> # Apply migration to all tables in a schema
>>> result = add_job_metadata_columns(schema, dry_run=False)
>>> print(f"Modified {result['tables_modified']} tables")
Notes
- Only Computed and Imported tables are modified (not Manual, Lookup, or Part)
- Existing rows will have NULL values for _job_start_time and _job_duration
- Future populate() calls will fill in metadata for new rows
- This does NOT retroactively populate metadata for existing rows
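The shape of the ALTER statement such a migration could emit is sketched below; the column types shown are assumptions, not DataJoint's actual hidden-column definitions.

```python
# Hypothetical sketch of the DDL that adds the hidden job metadata
# columns; types are assumptions, not DataJoint's real definitions.
def job_metadata_sql(table: str) -> str:
    """Build an ALTER TABLE statement adding the three hidden columns."""
    return (
        f"ALTER TABLE {table} "
        "ADD COLUMN `_job_start_time` datetime NULL, "
        "ADD COLUMN `_job_duration` float NULL, "
        "ADD COLUMN `_job_version` varchar(64) NULL"
    )
```

All three columns are nullable, which is why pre-existing rows simply read back NULL.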
migrate_external ¶
migrate_external(schema, dry_run=True, finalize=False)
Migrate external storage columns from 0.x to 2.0 format.
This migration uses a safe, multi-step approach:
1. Initial run (dry_run=False): Adds new <column>_v2 columns with JSON type and copies data from the old columns, converting UUID references to JSON metadata.
2. Verification: You verify all data is accessible via DataJoint 2.0.
3. Finalize (finalize=True): Renames columns (old → <column>_v1, new → original name) and optionally drops the old columns.
This allows 0.x and 2.0 to coexist during migration and provides a rollback path if issues are discovered.
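The per-row conversion in step 1 can be sketched as follows, assuming each hidden ~external_<store> row supplies a file path and size; uuid_to_json and the index layout are hypothetical, for illustration only.

```python
# Hypothetical sketch of converting a BINARY(16) UUID reference into
# JSON metadata; the lookup layout is an assumption, not DataJoint's code.
import json
import uuid

def uuid_to_json(ref: bytes, external_index: dict) -> str:
    """Convert a BINARY(16) UUID reference into JSON storage metadata."""
    key = uuid.UUID(bytes=ref)
    # external_index stands in for rows of the hidden ~external_<store> table
    entry = external_index[key]
    meta = {"path": entry["filepath"], "size": entry["size"]}
    return json.dumps(meta)
```

Because the new value lands in a separate _v2 column, the original UUID column stays intact until finalize, which is what provides the rollback path.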
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to migrate. | required |
| dry_run | bool | If True, only preview changes without applying. Default True. | True |
| finalize | bool | If True, rename migrated columns to original names and drop old columns. Only run after verifying migration succeeded. Default False. | False |
Returns:
| Type | Description |
|---|---|
| dict | Migration results with keys including columns_found and rows_migrated. |
Examples:
>>> from datajoint.migrate import migrate_external
>>>
>>> # Step 1: Preview
>>> result = migrate_external(schema, dry_run=True)
>>> print(f"Found {result['columns_found']} columns to migrate")
>>>
>>> # Step 2: Run migration (adds new columns)
>>> result = migrate_external(schema, dry_run=False)
>>> print(f"Migrated {result['rows_migrated']} rows")
>>>
>>> # Step 3: Verify data is accessible via DataJoint 2.0
>>> # ... manual verification ...
>>>
>>> # Step 4: Finalize (rename columns, drop old)
>>> result = migrate_external(schema, finalize=True)
Notes
The migration reads from the hidden ~external_<store> tables to build
JSON metadata. Ensure store configuration in datajoint.json matches the
paths stored in these tables.
check_store_configuration ¶
check_store_configuration(schema)
Verify external stores are properly configured.
Checks that all external storage stores referenced in the schema's tables are configured in settings and accessible.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to check. | required |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including stores_missing. |
Examples:
>>> from datajoint.migrate import check_store_configuration
>>> result = check_store_configuration(schema)
>>> if result['stores_missing']:
... print(f"Missing stores: {result['stores_missing']}")
verify_external_integrity ¶
verify_external_integrity(schema, store_name=None)
Check that all external references point to existing files.
Verifies integrity of external storage by checking that each reference in the ~external_* tables points to an accessible file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to check. | required |
| store_name | str | Specific store to check. If None, checks all stores. | None |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including missing (a list of entries, each with a filepath). |
Examples:
>>> from datajoint.migrate import verify_external_integrity
>>> result = verify_external_integrity(schema)
>>> if result['missing']:
... print(f"Missing files: {len(result['missing'])}")
... for entry in result['missing'][:5]:
... print(f" {entry['filepath']}")
Notes
For S3/MinIO stores, this function does not verify file existence (would require network calls). Only local file stores are fully verified.
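For a local store, the existence check amounts to walking the references and testing each path; find_missing and the reference layout below are hypothetical, for illustration only.

```python
# Hypothetical sketch of local-store integrity checking; assumes each
# reference carries a relative filepath under the store root.
from pathlib import Path

def find_missing(references: list, root: str) -> list:
    """Return the references whose files do not exist under the store root."""
    return [r for r in references if not (Path(root) / r["filepath"]).exists()]
```

An S3/MinIO version of the same check would need one network call per reference, which is why the real function skips it for those stores.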
rebuild_lineage ¶
rebuild_lineage(schema, dry_run=True)
Rebuild ~lineage table from current table definitions.
Use after schema changes or to repair corrupted lineage data. The lineage table tracks foreign key relationships for semantic matching.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to rebuild lineage for. | required |
| dry_run | bool | If True, only preview changes without applying. Default True. | True |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including lineage_entries and status. |
Examples:
>>> from datajoint.migrate import rebuild_lineage
>>> result = rebuild_lineage(schema, dry_run=True)
>>> print(f"Would create {result['lineage_entries']} lineage entries")
>>> result = rebuild_lineage(schema, dry_run=False)
>>> print(f"Rebuilt lineage: {result['status']}")
Notes
This function wraps schema.rebuild_lineage() with dry_run support and additional reporting.
migrate_filepath ¶
migrate_filepath(schema, dry_run=True, finalize=False)
Migrate filepath columns from 0.x to 2.0 format.
Same multi-step approach as migrate_external:
- Initial run: Adds new <column>_v2 columns with JSON type
- Verification: Verify files accessible via DataJoint 2.0
- Finalize: Rename columns and drop old columns
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | Schema | The DataJoint schema to migrate. | required |
| dry_run | bool | If True, only preview changes. Default True. | True |
| finalize | bool | If True, finalize migration. Default False. | False |
Returns:
| Type | Description |
|---|---|
| dict | Migration results (same format as migrate_external). |
Examples:
>>> from datajoint.migrate import migrate_filepath
>>>
>>> # Preview
>>> result = migrate_filepath(schema, dry_run=True)
>>>
>>> # Run migration
>>> result = migrate_filepath(schema, dry_run=False)
>>>
>>> # Finalize after verification
>>> result = migrate_filepath(schema, finalize=True)
create_parallel_schema ¶
create_parallel_schema(source, dest, copy_data=False, connection=None)
Create a parallel _v20 schema for migration testing.
This creates a copy of a production schema (source) into a test schema (dest) for safely testing DataJoint 2.0 migration without affecting production.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Production schema name (e.g., 'my_pipeline') | required |
| dest | str | Test schema name (e.g., 'my_pipeline_v20') | required |
| copy_data | bool | If True, copy all table data. If False (default), create empty tables. | False |
| connection | Connection | Database connection. If None, uses default connection. | None |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including tables_created. |
Examples:
>>> from datajoint.migrate import create_parallel_schema
>>> result = create_parallel_schema('my_pipeline', 'my_pipeline_v20')
>>> print(f"Created {result['tables_created']} tables")
See Also
copy_table_data : Copy data between schemas
copy_table_data ¶
copy_table_data(source_schema, dest_schema, table, limit=None, where_clause=None, connection=None)
Copy data from production table to test table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_schema | str | Production schema name | required |
| dest_schema | str | Test schema name (_v20) | required |
| table | str | Table name | required |
| limit | int | Maximum number of rows to copy | None |
| where_clause | str | SQL WHERE clause for filtering (without 'WHERE' keyword) | None |
| connection | Connection | Database connection. If None, uses default connection. | None |
Returns:
| Type | Description |
|---|---|
| dict | |
Examples:
>>> # Copy all data
>>> result = copy_table_data('my_pipeline', 'my_pipeline_v20', 'Mouse')
>>> # Copy sample
>>> result = copy_table_data(
... 'my_pipeline', 'my_pipeline_v20', 'Session',
... limit=100,
... where_clause="session_date >= '2024-01-01'"
... )
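The copy can be pictured as a server-side INSERT ... SELECT; copy_sql below is a hypothetical sketch, and the real implementation's quoting and options may differ.

```python
# Hypothetical sketch of the SQL a cross-schema table copy could issue;
# mirrors the limit/where_clause parameters documented above.
def copy_sql(source_schema, dest_schema, table, limit=None, where_clause=None):
    """Build an INSERT ... SELECT statement copying one table across schemas."""
    sql = (
        f"INSERT INTO `{dest_schema}`.`{table}` "
        f"SELECT * FROM `{source_schema}`.`{table}`"
    )
    if where_clause:
        sql += f" WHERE {where_clause}"  # caller supplies the condition only
    if limit is not None:
        sql += f" LIMIT {limit}"
    return sql
```

Keeping the copy inside the server avoids pulling rows through the client, which matters for large tables.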
compare_query_results ¶
compare_query_results(prod_schema, test_schema, table, tolerance=1e-06, connection=None)
Compare query results between production and test schemas.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prod_schema | str | Production schema name | required |
| test_schema | str | Test schema name (_v20) | required |
| table | str | Table name to compare | required |
| tolerance | float | Tolerance for floating-point comparison. Default 1e-6. | 1e-06 |
| connection | Connection | Database connection. If None, uses default connection. | None |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including match and row_count. |
Examples:
>>> result = compare_query_results('my_pipeline', 'my_pipeline_v20', 'neuron')
>>> if result['match']:
...     print(f"✓ All {result['row_count']} rows match")
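Tolerance-based comparison of one row pair can be sketched as follows, assuming rows are dicts of scalars; rows_match is an illustration, not DataJoint's implementation.

```python
# Illustrative row comparison with an absolute tolerance for floats,
# matching the tolerance parameter documented above.
import math

def rows_match(a: dict, b: dict, tolerance: float = 1e-6) -> bool:
    """Compare two rows field by field; floats within tolerance count as equal."""
    if a.keys() != b.keys():
        return False
    for k in a:
        x, y = a[k], b[k]
        if isinstance(x, float) or isinstance(y, float):
            if not math.isclose(x, y, rel_tol=0, abs_tol=tolerance):
                return False
        elif x != y:
            return False
    return True
```

An absolute tolerance is appropriate here because serialization round-trips typically introduce small fixed-magnitude errors rather than relative ones.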
backup_schema ¶
backup_schema(schema, backup_name, connection=None)
Create full backup of a schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | str | Schema name to backup | required |
| backup_name | str | Backup schema name (e.g., 'my_pipeline_backup_20250114') | required |
| connection | Connection | Database connection. If None, uses default connection. | None |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including tables_backed_up. |
Examples:
>>> result = backup_schema('my_pipeline', 'my_pipeline_backup_20250114')
>>> print(f"Backed up {result['tables_backed_up']} tables")
restore_schema ¶
restore_schema(backup, dest, connection=None)
Restore schema from backup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| backup | str | Backup schema name | required |
| dest | str | Destination schema name | required |
| connection | Connection | Database connection. If None, uses default connection. | None |
Returns:
| Type | Description |
|---|---|
| dict | |
Examples:
>>> restore_schema('my_pipeline_backup_20250114', 'my_pipeline')
verify_schema_v20 ¶
verify_schema_v20(schema, connection=None)
Verify schema is fully migrated to DataJoint 2.0.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | str | Schema name to verify | required |
| connection | Connection | Database connection. If None, uses default connection. | None |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including compatible. |
Examples:
>>> result = verify_schema_v20('my_pipeline')
>>> if result['compatible']:
...     print("✓ Schema fully migrated to 2.0")
migrate_external_pointers_v2 ¶
migrate_external_pointers_v2(schema, table, attribute, source_store, dest_store, copy_files=False, connection=None)
Migrate external storage pointers from 0.14.6 to 2.0 format.
Converts BINARY(16) UUID references to JSON metadata format. Optionally copies blob files to new storage location.
This is useful when copying production data to _v2 schemas and you need to access external storage attributes but don't want to move the files yet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | str | Schema name (e.g., 'my_pipeline_v2') | required |
| table | str | Table name | required |
| attribute | str | External attribute name (e.g., 'signal') | required |
| source_store | str | 0.14.6 store name (e.g., 'external-raw') | required |
| dest_store | str | 2.0 store name (e.g., 'raw') | required |
| copy_files | bool | If True, copy blob files to new location. If False (default), JSON points to existing files. | False |
| connection | Connection | Database connection. If None, uses default connection. | None |
Returns:
| Type | Description |
|---|---|
| dict | Dict with keys including rows_migrated. |
Examples:
>>> # Migrate pointers without moving files
>>> result = migrate_external_pointers_v2(
... schema='my_pipeline_v2',
... table='recording',
... attribute='signal',
... source_store='external-raw',
... dest_store='raw',
... copy_files=False
... )
>>> print(f"Migrated {result['rows_migrated']} pointers")
Notes
This function:
1. Reads the BINARY(16) UUID from the table column
2. Looks up the file in the ~external_{source_store} table
3. Creates JSON metadata with the file path
4. Optionally copies the file to the new store location
5. Updates the column with the JSON metadata
The JSON format is: { "path": "schema/table/key_hash/file.ext", "size": 12345, "hash": null, "ext": ".dat", "is_dir": false, "timestamp": "2025-01-14T10:30:00+00:00" }
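Constructing such a metadata record can be sketched as follows; build_metadata is a hypothetical helper and the field values are illustrative.

```python
# Hypothetical sketch of building the 2.0 JSON metadata shown above;
# field names follow the documented format, values are illustrative.
import json
from datetime import datetime, timezone

def build_metadata(path: str, size: int, ext: str) -> str:
    """Serialize a storage reference into the documented JSON shape."""
    meta = {
        "path": path,          # e.g. "schema/table/key_hash/file.ext"
        "size": size,
        "hash": None,          # hash is not computed during pointer migration
        "ext": ext,
        "is_dir": False,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(meta)
```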