Manage a Pipeline Project¶
Organize multi-schema pipelines for team collaboration.
Overview¶
A production DataJoint pipeline typically involves:
- Multiple schemas – Organized by experimental modality or processing stage
- Team of users – With different roles and access levels
- Shared infrastructure – Database server, object storage, code repository
- Coordination – Between code, database, and storage permissions
This guide covers practical project organization. For conceptual background on pipeline architecture and the DAG structure, see Data Pipelines.
For a fully managed solution, request a DataJoint Platform account.
Project Structure¶
Use a modern Python project layout with source code under src/:
my_pipeline/
├── datajoint.json        # Shared settings (committed)
├── .secrets/             # Local credentials (gitignored)
│   ├── database.password
│   └── storage.credentials
├── .gitignore
├── pyproject.toml        # Package metadata and dependencies
├── README.md
├── src/
│   └── my_pipeline/
│       ├── __init__.py
│       ├── subject.py    # subject schema
│       ├── session.py    # session schema
│       ├── ephys.py      # ephys schema
│       ├── imaging.py    # imaging schema
│       ├── analysis.py   # analysis schema
│       └── utils/
│           └── __init__.py
├── tests/
│   ├── conftest.py
│   └── test_ephys.py
├── docs/
└── ...
One Module Per Schema¶
Each module defines and binds to its schema:
# src/my_pipeline/ephys.py
import datajoint as dj

from . import session  # Import dependency

schema = dj.Schema('ephys')


@schema
class Probe(dj.Lookup):
    definition = """
    probe_type : varchar(32)
    ---
    num_channels : int32
    """


@schema
class Recording(dj.Imported):
    definition = """
    -> session.Session
    -> Probe
    ---
    recording_path : varchar(255)
    """
Import Dependencies Mirror Foreign Keys¶
Module imports reflect the schema DAG:
# analysis.py depends on both ephys and imaging
from . import ephys
from . import imaging

schema = dj.Schema('analysis')


@schema
class MultiModalAnalysis(dj.Computed):
    definition = """
    -> ephys.Recording
    -> imaging.Scan
    ---
    correlation : float64
    """
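Because module imports mirror the foreign keys, the import graph forms the same DAG as the schemas. As a sketch, the module DAG from this project can be checked with Python's standard-library `graphlib`, which also catches accidental import cycles:

```python
from graphlib import TopologicalSorter

# Each module maps to the set of modules it imports (its FK dependencies)
deps = {
    "subject": set(),
    "session": {"subject"},
    "ephys": {"session"},
    "imaging": {"session"},
    "analysis": {"ephys", "imaging"},
}

# static_order() raises CycleError if an import cycle sneaks in
order = list(TopologicalSorter(deps).static_order())
print(order)  # subject first, analysis last
```

Any valid initialization order must list `subject` before `session` and `analysis` last, which is exactly what the topological sort produces.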
Repository Configuration¶
Shared Settings¶
Store non-secret configuration in datajoint.json at the project root:
datajoint.json (committed):
{
    "database": {
        "host": "db.example.com",
        "port": 3306
    },
    "stores": {
        "main": {
            "protocol": "s3",
            "endpoint": "s3.example.com",
            "bucket": "my-org-data",
            "location": "my_pipeline"
        }
    }
}
Credentials Management¶
Credentials are stored locally and never committed:
Option 1: .secrets/ directory
.secrets/
├── database.user
├── database.password
├── storage.access_key
└── storage.secret_key
Option 2: Environment variables
export DJ_USER=alice
export DJ_PASS=alice_password
export DJ_STORES__MAIN__ACCESS_KEY=...
export DJ_STORES__MAIN__SECRET_KEY=...
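For illustration, here is a hypothetical helper (not part of DataJoint) showing one way to read the `.secrets/` layout above into a plain dictionary, where file names such as `database.password` become config keys:

```python
from pathlib import Path


def load_secrets(secrets_dir=".secrets"):
    """Read each file in the secrets directory into a {name: value} dict.

    File names like 'database.password' become keys; file contents,
    stripped of trailing whitespace, become values.
    """
    return {
        f.name: f.read_text().strip()
        for f in Path(secrets_dir).iterdir()
        if f.is_file()
    }
```

Keeping this logic in one small function makes it easy to swap the `.secrets/` directory for environment variables without touching the rest of the codebase.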
Essential .gitignore¶
# Credentials
.secrets/
# Python
__pycache__/
*.pyc
*.egg-info/
dist/
build/
# Environment
.env
.venv/
# IDE
.idea/
.vscode/
pyproject.toml Example¶
[project]
name = "my-pipeline"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = [
    "datajoint>=2.0",
    "numpy",
]

[project.optional-dependencies]
dev = ["pytest", "pytest-cov"]

[tool.setuptools.packages.find]
where = ["src"]
Database Access Control¶
The Complexity¶
Multi-user database access requires:
- User accounts – Individual credentials per team member
- Schema permissions – Which users can access which schemas
- Operation permissions – SELECT, INSERT, UPDATE, DELETE, CREATE, DROP
- Role hierarchy – Admin, developer, analyst, viewer
- Audit trail – Who modified what and when
Basic MySQL Grants¶
-- Create user
CREATE USER 'alice'@'%' IDENTIFIED BY 'password';
-- Grant read-only on specific schema
GRANT SELECT ON ephys.* TO 'alice'@'%';
-- Grant read-write on specific schema
GRANT SELECT, INSERT, UPDATE, DELETE ON analysis.* TO 'alice'@'%';
-- Grant full access on all project schemas (developers)
-- Note: `_` and `%` are wildcards in database name patterns,
-- so literal underscores must be escaped
GRANT ALL PRIVILEGES ON `my\_pipeline\_%`.* TO 'bob'@'%';
Role-Based Access Patterns¶
| Role | Permissions | Typical Use |
|---|---|---|
| Viewer | SELECT | Browse data, run queries |
| Analyst | SELECT, INSERT on analysis | Add analysis results |
| Operator | SELECT, INSERT, DELETE on data schemas | Run pipeline |
| Developer | ALL on development schemas | Schema changes |
| Admin | ALL + GRANT | User management |
Considerations¶
- Users need SELECT on parent schemas to INSERT into child schemas (FK validation)
- Cascading deletes require DELETE on all dependent schemas
- Schema creation requires CREATE privilege
- Coordinating permissions across many schemas becomes complex
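Because the same grants must be repeated for every user-schema pair, generating them from the role table is less error-prone than writing them by hand. A minimal sketch (the role names and privilege sets below are illustrative, taken from the table above):

```python
# Privilege sets per role (illustrative; align with your own role table)
ROLE_PRIVILEGES = {
    "viewer": "SELECT",
    "analyst": "SELECT, INSERT",
    "operator": "SELECT, INSERT, DELETE",
    "developer": "ALL PRIVILEGES",
}


def grant_statements(user, role, schemas):
    """Emit one GRANT statement per schema for the given role."""
    privileges = ROLE_PRIVILEGES[role]
    return [
        f"GRANT {privileges} ON `{schema}`.* TO '{user}'@'%';"
        for schema in schemas
    ]
```

For example, `grant_statements("alice", "viewer", ["ephys", "imaging"])` yields two read-only grants, one per schema.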
Object Storage Access Control¶
The Complexity¶
Object storage permissions must align with database permissions:
- Bucket/prefix policies – Map to schema access
- Read vs write – Match SELECT vs INSERT/UPDATE
- Credential distribution – Per-user or shared service accounts
- Cross-schema objects – When computed tables reference multiple inputs
Hierarchical Storage Structure¶
A DataJoint project creates a structured storage pattern:
📁 project_name/
├── 📁 schema_name1/
├── 📁 schema_name2/
├── 📁 schema_name3/
│   ├── objects/
│   │   └── table1/
│   │       └── key1-value1/
│   └── fields/
│       └── table1-field1/
└── ...
S3/MinIO Policy Example¶
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::my-lab-data/datajoint/ephys/*"
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": "arn:aws:s3:::my-lab-data/datajoint/analysis/*"
        }
    ]
}
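Since each schema maps to a bucket prefix, policies like the one above can be generated rather than hand-written. A sketch (the function and its parameters are hypothetical, mirroring the bucket and prefix used in the example):

```python
import json


def schema_policy(bucket, prefix, read_schemas, write_schemas):
    """Build an S3-style policy granting per-schema read or read/write access."""

    def statement(actions, schema):
        return {
            "Effect": "Allow",
            "Action": actions,
            "Resource": f"arn:aws:s3:::{bucket}/{prefix}/{schema}/*",
        }

    statements = [statement(["s3:GetObject"], s) for s in read_schemas]
    statements += [
        statement(["s3:GetObject", "s3:PutObject"], s) for s in write_schemas
    ]
    return {"Version": "2012-10-17", "Statement": statements}


policy = schema_policy("my-lab-data", "datajoint", ["ephys"], ["analysis"])
print(json.dumps(policy, indent=2))
```

Generating policies this way keeps the storage-side permissions aligned with the database-side grants: both are derived from the same list of schemas per user.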
Considerations¶
- Object paths include schema name: `{project}/{schema}/{table}/...`
- Users need read access to fetch blobs from upstream schemas
- Content-addressed storage (`<blob@>`) shares objects across tables
- Garbage collection requires coordinated delete permissions
Pipeline Initialization¶
Schema Creation Order¶
Initialize schemas in dependency order:
# src/my_pipeline/__init__.py
from . import subject   # No dependencies
from . import session   # Depends on subject
from . import ephys     # Depends on session
from . import imaging   # Depends on session
from . import analysis  # Depends on ephys, imaging


def initialize():
    """Create all schemas in dependency order."""
    # Schemas are created when modules are imported
    # and tables are first accessed
    subject.Subject()
    session.Session()
    ephys.Recording()
    imaging.Scan()
    analysis.MultiModalAnalysis()
Version Coordination¶
Track schema versions with your code:
# src/my_pipeline/version.py
__version__ = "1.2.0"

SCHEMA_VERSIONS = {
    'subject': '1.0.0',
    'session': '1.1.0',
    'ephys': '1.2.0',
    'imaging': '1.2.0',
    'analysis': '1.2.0',
}
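A version map like this can be put to work at startup: compare the versions the code expects against what is deployed and flag mismatches before any computation runs. A minimal sketch, assuming semantic versioning where a major-version difference means incompatibility:

```python
def incompatible_schemas(code_versions, deployed_versions):
    """Return schemas whose deployed major version differs from the code's.

    Assumes semantic versioning: a different major version (or a missing
    deployed entry) is treated as incompatible.
    """
    mismatched = []
    for schema, version in code_versions.items():
        deployed = deployed_versions.get(schema)
        if deployed is None or deployed.split(".")[0] != version.split(".")[0]:
            mismatched.append(schema)
    return mismatched
```

Calling this in `initialize()` gives the team an early, explicit failure instead of subtle errors from a schema that drifted ahead of the code.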
Team Workflows¶
Development vs Production¶
┌─────────────────┐     ┌─────────────────┐
│   Development   │     │   Production    │
├─────────────────┤     ├─────────────────┤
│ dev_subject     │     │ subject         │
│ dev_session     │     │ session         │
│ dev_ephys       │     │ ephys           │
└─────────────────┘     └─────────────────┘
         │                       ▲
         │   Schema promotion    │
         └───────────────────────┘
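One way to implement this split is to derive schema names from an environment flag so the same code runs against `dev_`-prefixed schemas during development. The `DJ_MODE` variable and `dev_` prefix below are an assumed convention for this sketch, not a DataJoint setting:

```python
import os


def schema_name(base, mode=None):
    """Prefix schema names with 'dev_' outside of production.

    'DJ_MODE' is a hypothetical environment variable used as the
    mode switch in this sketch; 'dev_' is an assumed naming convention.
    """
    mode = mode or os.environ.get("DJ_MODE", "development")
    return base if mode == "production" else f"dev_{base}"
```

Modules would then bind with `dj.Schema(schema_name('ephys'))`, so promotion to production requires no code change, only a different environment setting.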
Branching Strategy¶
main ─────────────────────────────────────▶
      │                      │
      │ feature/             │ hotfix/
      ▼                      ▼
  ephys-v2             fix-recording
      │                      │
      └───────────┬──────────┘
                  └──▶ main
Summary of Complexities¶
Managing a team pipeline requires coordinating:
| Component | Challenges |
|---|---|
| Code | Module dependencies, version control, deployment |
| Database | User accounts, schema permissions, role hierarchy |
| Object Storage | Bucket policies, credential distribution, path alignment |
| Compute | Worker deployment, job distribution, resource allocation |
| Monitoring | Progress tracking, error alerting, audit logging |
These challenges grow with team size and pipeline complexity. The DataJoint Platform provides integrated management for all these concerns.
See Also¶
- Deploy to Production – Production mode and environment configuration
- Data Pipelines – Conceptual overview and architecture
- Configure Object Storage – Storage setup
- Distributed Computing – Multi-worker pipelines
- Model Relationships – Foreign key patterns