Skip to content

ingest.py

get_miniscope_root_data_dir()

Return root directory for miniscope from 'miniscope_root_data_dir' config as list

Returns:

Name Type Description
path any

List of path(s) if available or None

Source code in workflow_miniscope/paths.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def get_miniscope_root_data_dir() -> Union[list, None]:
    """Return the miniscope root data directories from the DataJoint config.

    Reads the 'miniscope_root_data_dir' entry of the 'custom' section of
    `dj.config` and normalizes it so that a single root is wrapped in a list.

    Returns:
        path (list or None): Root path(s), or None when the entry is missing
            or empty. A sequence configured by the user (e.g. list or tuple)
            is returned unchanged.
    """
    mini_root_dirs = dj.config.get("custom", {}).get("miniscope_root_data_dir")

    if not mini_root_dirs:
        return None

    # A lone string is itself a Sequence, so it has to be wrapped explicitly;
    # otherwise it would be handed back as a bare string instead of a list.
    if isinstance(mini_root_dirs, (str, bytes)):
        return [mini_root_dirs]

    if isinstance(mini_root_dirs, abc.Sequence):
        return mini_root_dirs

    try:
        # Other iterables (e.g. a set) are materialized into a list.
        return list(mini_root_dirs)
    except TypeError:
        # Non-iterable single value (e.g. a pathlib.Path): wrap it.
        return [mini_root_dirs]

Device

Bases: dj.Lookup

Table for managing lab Devices.

Attributes:

Name Type Description
device varchar(32)

Device short name.

modality varchar(64)

Modality for which this device is used.

description varchar(256)

Optional. Description of device.

Source code in workflow_miniscope/reference.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@schema
class Device(dj.Lookup):
    """Table for managing lab Devices.

    Attributes:
        device ( varchar(32) ): Device short name. Only attribute above the
            `---` divider in the definition.
        modality ( varchar(64) ): Modality for which this device is used.
        description ( varchar(256) ): Optional. Description of device.
            Defaults to an empty string.
    """

    # Table definition string; `device` sits above the `---` divider, the
    # remaining attributes below it. `description` declares '' as its default.
    definition = """
    device             : varchar(32)
    ---
    modality           : varchar(64)
    description=''     : varchar(256)
    """
    # Predefined rows, each given in definition order:
    # [device, modality, description].
    # NOTE(review): presumably loaded by DataJoint automatically for Lookup
    # tables -- confirm against the DataJoint documentation.
    contents = [
        ["Miniscope_V4_BNO", "Miniscope", "V4 Miniscope with head orientation sensor."],
    ]

ingest_subjects(subject_csv_path='./user_data/subjects.csv', skip_duplicates=True, verbose=True)

Ingest subjects listed in the subject column of ./user_data/subjects.csv

Parameters:

Name Type Description Default
subject_csv_path str

Relative path to subject csv. Defaults to "./user_data/subjects.csv".

'./user_data/subjects.csv'
skip_duplicates bool

See DataJoint insert function. Default True.

True
verbose bool

Print number inserted (i.e., table length change). Defaults to True.

True
Source code in workflow_miniscope/ingest.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def ingest_subjects(
    subject_csv_path: str = "./user_data/subjects.csv",
    skip_duplicates: bool = True,
    verbose: bool = True,
):
    """Load the subject CSV into the `subject.Subject` table.

    Thin wrapper that hands a single (csv, table) pairing to
    `ingest_csv_to_table`.

    Args:
        subject_csv_path (str, optional): Relative path to subject csv.
            Defaults to "./user_data/subjects.csv".
        skip_duplicates (bool, optional): See DataJoint `insert` function.
            Default True.
        verbose (bool, optional): Print number inserted (i.e., table length
            change). Defaults to True.
    """
    ingest_csv_to_table(
        [subject_csv_path],
        [subject.Subject()],
        skip_duplicates=skip_duplicates,
        verbose=verbose,
    )

ingest_sessions(session_csv_path='./user_data/sessions.csv', skip_duplicates=False, verbose=True)

Ingest session list from csv

Parameters:

Name Type Description Default
session_csv_path str

Relative path to CSV of sessions. Defaults to "./user_data/sessions.csv".

'./user_data/sessions.csv'
skip_duplicates bool

See DataJoint insert function. Default False.

False
verbose bool

Print number inserted. Defaults to True.

True

Raises:

Type Description
NotImplementedError

Only software Miniscope-DAQ-V3 or V4 implemented

FileNotFoundError

No .avi files found in session path

Source code in workflow_miniscope/ingest.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def ingest_sessions(
    session_csv_path: str = "./user_data/sessions.csv",
    skip_duplicates: bool = False,
    verbose: bool = True,
):
    """Ingest session list from csv

    For every row of the CSV, the session directory is resolved against the
    configured miniscope root directories, the `metaData.json` written by the
    acquisition software is read to obtain the device type, and new entries
    are collected for `Device`, `session.Session`, `session.SessionDirectory`
    and `miniscope.Recording`. All inserts happen after every row was read.

    Args:
        session_csv_path (str, optional): Relative path to CSV of sessions.
            Defaults to "./user_data/sessions.csv".
        skip_duplicates (bool, optional): See DataJoint `insert` function. Default False.
        verbose (bool, optional): Print number inserted. Defaults to True.

    Raises:
        NotImplementedError: Only software Miniscope-DAQ-V3 or V4 implemented
        FileNotFoundError: No .avi files found in session path, or no readable
            metaData.json containing `deviceType` found in session path
    """
    # If non verbose, set logging to warn; the previous level is restored in
    # the `finally` clause so that a failed ingestion does not leave the
    # logger silenced.
    prev_loglevel = logger.level
    if not verbose:
        logger.setLevel("WARN")

    try:
        logger.info("---- Insert new `Session` and `Recording` ----")

        with open(session_csv_path, newline="") as f:
            input_sessions = list(csv.DictReader(f, delimiter=","))

        session_list, session_dir_list, recording_list, hardware_list = [], [], [], []

        for single_session in input_sessions:
            acq_software = single_session["acq_software"]
            if acq_software not in ["Miniscope-DAQ-V3", "Miniscope-DAQ-V4"]:
                raise NotImplementedError(
                    f"Not implemented for acquisition software of "
                    f"type {acq_software}."
                )

            # Folder structure: root / subject / session / .avi (raw)
            session_dir = pathlib.Path(single_session["session_dir"])
            session_path = find_full_path(get_miniscope_root_data_dir(), session_dir)
            recording_filepaths = [
                file_path.as_posix() for file_path in session_path.glob("*.avi")
            ]
            if not recording_filepaths:
                raise FileNotFoundError(f"No .avi files found in {session_path}")

            # Read Miniscope DAQ *.json file. Both values are reset for every
            # session so that a missing or unreadable file can never reuse the
            # metadata of the previous row.
            recording_time, acquisition_hardware = None, None
            for metadata_filepath in session_path.glob("metaData.json"):
                try:
                    # NOTE(review): st_ctime is used as the recording time --
                    # its meaning is platform dependent; confirm this is intended.
                    file_time = datetime.fromtimestamp(
                        metadata_filepath.stat().st_ctime
                    )
                    with open(metadata_filepath) as json_file:
                        recording_metadata = json.load(json_file)
                except OSError:
                    logger.warning(
                        f"Could not read Miniscope-DAQ json: {metadata_filepath}"
                    )
                    continue

                device_type = recursive_search("deviceType", recording_metadata)
                if device_type is None:
                    logger.warning(
                        f"Could not find `deviceType` in Miniscope-DAQ json: "
                        f"{metadata_filepath}"
                    )
                    continue

                recording_time, acquisition_hardware = file_time, device_type
                break

            if acquisition_hardware is None:
                raise FileNotFoundError(
                    f"No readable metaData.json with `deviceType` found in "
                    f"{session_path}"
                )

            session_key = dict(
                subject=single_session["subject"], session_datetime=recording_time
            )
            if session_key not in session.Session():
                hardware_list.append(
                    dict(device=acquisition_hardware, modality="Miniscope")
                )

                session_list.append(session_key)

                session_dir_list.append(
                    dict(**session_key, session_dir=session_dir.as_posix())
                )

                recording_list.append(
                    dict(
                        **session_key,
                        recording_id=0,  # Assumes 1 recording per session
                        device=acquisition_hardware,
                        acq_software=acq_software,
                    )
                )

        # Number and table name are filled in lazily by the logger
        insert_string = "---- Inserting %s entry(s) into %s ----"

        prev_len = len(Device())
        # expect duplicates for Device
        Device.insert(hardware_list, skip_duplicates=True)
        added_len = len(Device()) - prev_len
        logger.info(insert_string, added_len, "reference.Device")

        prev_len = len(session.Session())
        session.Session.insert(session_list, skip_duplicates=skip_duplicates)
        session.SessionDirectory.insert(
            session_dir_list, skip_duplicates=skip_duplicates
        )
        added_len = len(session.Session()) - prev_len
        logger.info(insert_string, added_len, "session.Session")

        prev_len = len(miniscope.Recording())
        miniscope.Recording.insert(recording_list, skip_duplicates=skip_duplicates)
        added_len = len(miniscope.Recording()) - prev_len
        logger.info(insert_string, added_len, "miniscope.Recording")

        logger.info("---- Successfully completed ingest_sessions ----")
    finally:
        if not verbose:
            logger.setLevel(prev_loglevel)

ingest_events(recording_csv_path='./user_data/behavior_recordings.csv', block_csv_path='./user_data/blocks.csv', trial_csv_path='./user_data/trials.csv', event_csv_path='./user_data/events.csv', skip_duplicates=True, verbose=True)

Ingest each level of experiment hierarchy for element-trial

Ingestion hierarchy includes

recording, block (i.e., phases of trials), trials (repeated units), events (optionally 0-duration occurrences within trial).

Note: This ingestion function is duplicated across wf-array-ephys and wf-calcium-imaging

Parameters:

Name Type Description Default
recording_csv_path str

Relative path to recording csv. Defaults to "./user_data/behavior_recordings.csv".

'./user_data/behavior_recordings.csv'
block_csv_path str

Relative path to block csv. Defaults to "./user_data/blocks.csv".

'./user_data/blocks.csv'
trial_csv_path str

Relative path to trial csv. Defaults to "./user_data/trials.csv".

'./user_data/trials.csv'
event_csv_path str

Relative path to event csv. Defaults to "./user_data/events.csv".

'./user_data/events.csv'
skip_duplicates bool

See DataJoint insert function. Default True.

True
verbose bool

Print number inserted (i.e., table length change). Defaults to True.

True
Source code in workflow_miniscope/ingest.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def ingest_events(
    recording_csv_path: str = "./user_data/behavior_recordings.csv",
    block_csv_path: str = "./user_data/blocks.csv",
    trial_csv_path: str = "./user_data/trials.csv",
    event_csv_path: str = "./user_data/events.csv",
    skip_duplicates: bool = True,
    verbose: bool = True,
):
    """Load the element-trial experiment hierarchy from CSV files.

    Each CSV feeds one or more tables; the pairs are passed to
    `ingest_csv_to_table` in this order:
        recording -> BehaviorRecording, BehaviorRecording.File
        block     -> Block, Block.Attribute
        trial     -> TrialType, Trial, Trial.Attribute, BlockTrial
        event     -> EventType, Event, TrialEvent

    Note: This ingestion function is duplicated across wf-array-ephys and wf-calcium-imaging

    Args:
        recording_csv_path (str, optional): Relative path to recording csv.
            Defaults to "./user_data/behavior_recordings.csv".
        block_csv_path (str, optional): Relative path to block csv.
            Defaults to "./user_data/blocks.csv".
        trial_csv_path (str, optional): Relative path to trial csv.
            Defaults to "./user_data/trials.csv".
        event_csv_path (str, optional): Relative path to event csv.
            Defaults to "./user_data/events.csv".
        skip_duplicates (bool, optional): See DataJoint `insert` function. Default True.
        verbose (bool, optional): Print number inserted (i.e., table length change).
            Defaults to True.
    """
    # One (csv path, destination table) pair per ingestion step, in order.
    csv_table_pairs = [
        (recording_csv_path, event.BehaviorRecording()),
        (recording_csv_path, event.BehaviorRecording.File()),
        (block_csv_path, trial.Block()),
        (block_csv_path, trial.Block.Attribute()),
        (trial_csv_path, trial.TrialType()),
        (trial_csv_path, trial.Trial()),
        (trial_csv_path, trial.Trial.Attribute()),
        (trial_csv_path, trial.BlockTrial()),
        (event_csv_path, event.EventType()),
        (event_csv_path, event.Event()),
        (event_csv_path, trial.TrialEvent()),
    ]
    csvs = [csv_path for csv_path, _ in csv_table_pairs]
    tables = [table for _, table in csv_table_pairs]

    ingest_csv_to_table(
        csvs,
        tables,
        skip_duplicates=skip_duplicates,
        verbose=verbose,
        allow_direct_insert=True,
    )

ingest_alignment(alignment_csv_path='./user_data/alignments.csv', skip_duplicates=True, verbose=True)

Ingest event alignment data from local CSVs

Note: This is duplicated across wf-array-ephys and wf-calcium-imaging

Parameters:

Name Type Description Default
alignment_csv_path str

Relative path to event alignment csv. Defaults to "./user_data/alignments.csv".

'./user_data/alignments.csv'
skip_duplicates bool

See DataJoint insert function. Default True.

True
verbose bool

Print number inserted (i.e., table length change). Defaults to True.

True
Source code in workflow_miniscope/ingest.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def ingest_alignment(
    alignment_csv_path: str = "./user_data/alignments.csv",
    skip_duplicates: bool = True,
    verbose: bool = True,
):
    """Load the event alignment CSV into the `event.AlignmentEvent` table.

    Note: This is duplicated across wf-array-ephys and wf-calcium-imaging

    Args:
        alignment_csv_path (str, optional): Relative path to event alignment csv.
            Defaults to "./user_data/alignments.csv".
        skip_duplicates (bool, optional): See DataJoint `insert` function. Default True.
        verbose (bool, optional): Print number inserted (i.e., table length change).
            Defaults to True.
    """
    ingest_csv_to_table(
        [alignment_csv_path],
        [event.AlignmentEvent()],
        skip_duplicates=skip_duplicates,
        verbose=verbose,
    )

recursive_search(key, dictionary)

Return value for key in a nested dictionary

Search through a nested dictionary for a key and return its value. If more than one key with the same name exists at different depths, the algorithm returns the value of the least nested key.

Parameters:

Name Type Description Default
key str

Key used to search through a nested dictionary

required
dictionary dict

Nested dictionary

required

Returns:

Name Type Description
value any

value of the input argument key

Source code in workflow_miniscope/ingest.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def recursive_search(key, dictionary) -> object:
    """Return value for key in a nested dictionary

    Search through a nested dictionary for a key and return its value.  If more
    than one key with the same name exists at different depths, the value of the
    least nested key is returned. Among matches at the same depth, the one
    reached first in dictionary iteration order wins.

    The search walks the nesting one depth level at a time, so a shallow match
    is always found before a deeper one. Only values that are themselves
    dictionaries are descended into.

    Args:
        key (str): Key used to search through a nested dictionary
        dictionary (dict): Nested dictionary

    Returns:
        value (any): value of the input argument `key`, or None if the key is
            not present at any depth
    """
    current_level = [dictionary]
    visited = set()  # ids of dictionaries already inspected; guards against cycles

    while current_level:
        next_level = []
        for mapping in current_level:
            if id(mapping) in visited:
                continue
            visited.add(id(mapping))

            if key in mapping:
                return mapping[key]

            # Queue nested dictionaries for the next (deeper) pass.
            next_level.extend(
                value for value in mapping.values() if isinstance(value, dict)
            )
        current_level = next_level

    return None