Skip to content

vodex.experiment

This module contains the 'Experiment' class, which provides a summary of the information about an experiment. The class can initialise, save, and load the database, search for frames based on volumes or annotations, and load image data using the appropriate loader. To initialise the database, it integrates the information from the FileManager, FrameManager, VolumeManager, as well as Annotations, to create a database.

Experiment

The class can initialise, save, and load the database, search for frames based on volumes or annotations, and load image data using the appropriate loader. To initialise the database, it integrates the information from the File, Frame, and Volume managers, as well as Annotations, to create a database.

Parameters:

Name Type Description Default
db_reader DbReader

a DbReader object connected to the database with the experiment description.

required

Attributes:

Name Type Description
db

a DbReader object connected to the database with the experiment description.

loader ImageLoader

an ImageLoader object to load metadata and image data from files.

Source code in src/vodex/experiment.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
class Experiment:
    """
    The class can initialise, save, and load the database, search for frames based on volumes or annotations, and load image
    data using the appropriate loader. To initialise the database, it integrates the information from the File, Frame,
    and Volume managers, as well as Annotations, to create a database.

    Args:
        db_reader: a DbReader object connected to the database with the experiment description.

    Attributes:
        db: a DbReader object connected to the database with the experiment description.
        loader: an ImageLoader object to load metadata and image data from files.
    """

    def __init__(self, db_reader: DbReader):
        """
        Initialize the experiment with the given DbReader object.
        """

        assert isinstance(db_reader, DbReader), "Need DbReader to initialise the Experiment"

        self.db = db_reader
        # will add the loader the first time you are loading anything
        # in load_frames() or load_volumes()
        self.loader: ImageLoader

    @property
    def n_frames(self) -> int:
        """
        Returns the total number of frames in the experiment.
        """
        # TODO: cash this value when property is called for the first time
        return self.db.get_n_frames()

    @property
    def n_volumes(self) -> int:
        """
        Returns the total number of volumes in the experiment.
        This might include partial volumes at the beginning of the experiment (ID: -1)
        and at the end of the experiment (ID : -2).
        """
        # TODO: cash this value when property is called for the first time
        return len(self.db.get_volume_list())

    @property
    def n_full_volumes(self) -> int:
        """
        Returns the total number of full volumes in the experiment.
        """
        # TODO: cash this value when property is called for the first time
        options = self.db.get_options()
        return int(options['num_full_volumes'])

    @property
    def n_head_frames(self) -> int:
        """
        Returns the number of frames in the first partial volume,
        or 0 if there is no partial volume at the beginning.
        """
        # TODO: cash this value when property is called for the first time
        options = self.db.get_options()
        return int(options['num_head_frames'])

    @property
    def n_tail_frames(self) -> int:
        """
        Returns the number of frames in the last partial volume,
        or 0 if there is no partial volume at the end.
        """
        # TODO: cash this value when property is called for the first time
        options = self.db.get_options()
        return int(options['num_tail_frames'])

    @property
    def volumes(self) -> npt.NDArray:
        """
        Returns the list of volume IDs in the experiment.
        This might include partial volumes at the beginning of the experiment (ID: -1)
        and at the end of the experiment (ID : -2).
        """
        # TODO: cash this value when property is called for the first time
        volume_list = np.array(self.db.get_volume_list())
        if np.sum(volume_list == -1) > 0:
            warnings.warn(f"The are some frames at the beginning of the recording "
                          f"that don't correspond to a full volume.")
        if np.sum(volume_list == -2) > 0:
            warnings.warn(f"The are some frames at the end of the recording "
                          f"that don't correspond to a full volume.")
        return volume_list

    @property
    def full_volumes(self) -> npt.NDArray:
        """
        Returns the list of full volume IDs in the experiment.
        """
        volume_list = self.volumes
        return volume_list[volume_list >= 0]

    def batch_volumes(self, batch_size: int, overlap: int = 0,
                      volumes: Optional[Union[npt.NDArray, List[int]]] = None,
                      full_only: bool = True) -> List[List[int]]:
        """
        Returns a list of volume IDs that can be used to load batches of volumes.
        The batch size is given in number of volumes, and the overlap is given in number of volumes.
        If full_only is True, only full volumes are returned.

        Args:
            batch_size: the number of volumes in each batch.
            overlap: the number of volumes that overlap between batches.
            volumes: the list of volumes to be batched.
            full_only: if True, only full volumes are returned. If volumes is not None, this argument is ignored.

        Returns:
            A list of lists (n_batches x batch_size) of volume IDs that can be used to load batches of volumes.
        """
        if overlap >= batch_size:
            raise ValueError("Overlap must be smaller than batch size.")

        if volumes is not None:
            volume_list = volumes
        else:
            if full_only:
                volume_list = self.full_volumes
            else:
                volume_list = self.volumes

        # turn into a list if numpy array
        if isinstance(volume_list, np.ndarray):
            volume_list = volume_list.tolist()

        batch_list = []
        for i in range(0, len(volume_list), batch_size - overlap):
            batch_list.append(volume_list[i:i + batch_size])
        return batch_list

    @property
    def annotations(self) -> List[str]:
        """
        Returns the list of annotation names that have been added to the experiment.
        """
        return self.db.get_Names_from_AnnotationTypes()

    @property
    def labels(self) -> dict:
        """
        Returns a dict with annotation names, labels and label descriptions
        that have been added to the experiment.
        """
        annotation_names = self.annotations
        label_dict = {}
        for annotation_name in annotation_names:
            label_names, descriptions = self.db.get_Name_and_Description_from_AnnotationTypeLabels(annotation_name)
            label_dict[annotation_name] = {
                'labels': label_names,
                'descriptions': descriptions}
        return label_dict

    @property
    def labels_df(self) -> pd.DataFrame:
        """
        Returns a dataframe with annotation names, labels and label descriptions
        that have been added to the experiment.
        """
        annotation_names = self.annotations
        label_dict = {'annotation': [], 'label': [], 'description': []}
        for annotation_name in annotation_names:
            label_names, descriptions = self.db.get_Name_and_Description_from_AnnotationTypeLabels(annotation_name)
            label_dict['annotation'].extend([annotation_name] * len(label_names))
            label_dict['label'].extend(label_names)
            label_dict['description'].extend([descriptions[key] for key in label_names])
        return pd.DataFrame(label_dict)

    @property
    def cycles(self) -> List[str]:
        """
        Returns the list of cycle names that have been added to the experiment.
        """
        return self.db.get_cycle_names()

    @property
    def file_names(self) -> List[str]:
        """
        Returns the list of file names that have been added to the experiment.
        """
        # TODO: cash this value when property is called for the first time
        return self.db.get_file_names()

    @property
    def frames_per_file(self) -> List[int]:
        """
        Returns the list of frames per file that have been added to the experiment.
        """
        # TODO: cash this value when property is called for the first time
        return self.db.get_frames_per_file()

    @property
    def data_dir(self) -> str:
        """
        Returns the path to the data directory.
        """
        # TODO: cash this value when property is called for the first time
        return self.db.get_data_dir()

    @property
    def frames_per_volume(self) -> int:
        """
        Returns the number of frames per volume.
        """
        # TODO: cash this value when property is called for the first time
        return self.db.get_fpv()

    @property
    def starting_slice(self) -> int:
        """
        Returns the number of the first slice in the experiment.
        """
        # TODO: cash this value when property is called for the first time
        return self.db.get_fgf()  # fgf stands for first good frame

    @classmethod
    def create(cls, volume_manager: VolumeManager, annotations: List[Annotation], verbose: bool = False):
        """
        Creates a database instance from the core classes and initialises the experiment.

        Args:
            volume_manager: VolumeManager object that summarises the information about the image data.
            annotations: list of annotations to add to the experiment descriptions.
            verbose: whether to print the information about Filemanager, VolumeManager and Annotations on the screen.

        Returns:
            (Experiment): initialised experiment.
        """
        if verbose:
            print(volume_manager.file_manager)
            print(volume_manager)
            for annotation in annotations:
                print(annotation)
                if annotation.cycle is not None:
                    print(annotation.cycle_info())

        db = DbWriter.create()
        db.populate(volumes=volume_manager, annotations=annotations)
        db_reader = DbReader(db.connection)
        return cls(db_reader)

    @classmethod
    def from_dir(cls, dir_name: Union[Path, str], frames_per_volume: int,
                 starting_slice: int = 0, verbose: bool = False):
        """
        Creates a database instance from a directory and initialises the experiment.
        The directory should contain the image files.
        Annotations are not initialised, but can be added later.
        """
        # initialise volume manager
        volume_manager = VolumeManager.from_dir(dir_name, frames_per_volume, fgf=starting_slice)
        return cls.create(volume_manager, [], verbose)

    def save(self, file_name: Union[Path, str]):
        """
        Saves a database into a file.

        Args:
            file_name: full path to a file to save database.
                (Usually the filename would end with .db)
        """
        DbWriter(self.db.connection).save(file_name)

    def add_annotations(self, annotations: List[Annotation]):
        """
        Adds annotations to existing experiment.
        Does NOT save the changes to disc! run self.save() to save.

        Args:
            annotations: a list of annotations to add to the database.
        """
        DbWriter(self.db.connection).add_annotations(annotations)

    def add_annotations_from_df(self, annotation_df: pd.DataFrame,
                                cycles: Union[List[str], bool] = False,
                                timing_conversion: Optional[dict] = None,
                                groups: Optional[str] = None,
                                info: Optional[dict] = None):
        """
        Adds annotations to existing experiment from a data frame.
        Does NOT save the changes to disc! run self.save() to save.

        Args:
            annotation_df: a dataframe with the annotation information
            cycles: a list of the annotation names that are cycles or a boolean.
                If False, all annotations are assumed to be timelines.
                If True, all annotations are assumed to be cycles.
                Specified as {'cycles': }
            timing_conversion: a dictionary to convert the timing of the annotation.
                For example, if you want to convert the timing from frames to seconds,
                and you were recording at 30 frames per second, you can use
                timing_conversion = {'frames': 1, 'seconds': 1/30}
                You can list multiple units in the dictionary, and the timing will be converted to all of them,
                for example if there are also 10 frames per volume, you can use:
                timing_conversion = {'frames': 1, 'seconds': 1/30, 'volumes': 1/10}
                You must include 'frames' in the dictionary! The value of frames does not have to be 1,
                but it must be consistent with the other units. the rest of the values.
                for example this is valid for the example above:
                timing_conversion = {'frames': 10, 'seconds': 1/3, 'volumes': 1}.
                If timing_conversion is None, then the timing is not converted
                and 'duration_frames' must be provided in the dataframe.
            groups: the group of the annotation if there are multiple groups in the dataframe.
                If None, all groups are added.
            info: additional information about the annotation, dictionary with keys:
                'annotation name': information
        """
        n_frames = self.db.get_n_frames()

        if groups is None:
            groups = annotation_df['group'].unique()

        annotations = []
        for group in groups:
            group_df = annotation_df[annotation_df['group'] == group]

            if cycles is True or (isinstance(cycles, list) and group in cycles):
                is_cycle = True
            else:
                is_cycle = False

            if info is not None and group in info:
                group_info = info[group]
            else:
                group_info = None

            annotations.append(Annotation.from_df(n_frames, group_df,
                                                  timing_conversion, is_cycle, group_info))

        self.add_annotations(annotations)

    def delete_annotations(self, annotation_names: List[str]):
        """
        Deletes annotations from existing experiment.
        Does NOT save the changes to disc! run self.save() to save.

        Args:
            annotation_names: a list of annotation names to delete from the database.
        """
        for name in annotation_names:
            DbWriter(self.db.connection).delete_annotation(name)

    def close(self):
        """
        Close database connection.
        """
        self.db.connection.close()

    @classmethod
    def load(cls, file_name: Union[Path, str]):
        """
        Loads a database from a file and initialises an Experiment.

        Args:
            file_name: full path to a file to database.
        Return:
            (Experiment): initialised experiment.
        """
        db_reader = DbReader.load(file_name)
        return cls(db_reader)

    def choose_frames(self, conditions: Union[tuple, List[Tuple[str, str]]], logic: str = "and") -> List[int]:
        """
        Selects the frames that correspond to specified conditions;
        Uses "or" or "and" between the conditions depending on logic.
        To load the selected frames, use load_frames().

        Args:
            conditions: a list of conditions on the annotation labels
                in a form [(group, name),(group, name), ...] where group is a string for the annotation type
                and name is the name of the label of that annotation type. For example [('light', 'on'), ('shape','c')]
            logic: "and" or "or" , default is "and".
        Returns:
            list of frame ids that were chosen. Remember that frame numbers start at 1.
        """
        assert logic == "and" or logic == "or", \
            'between_group_logic should be equal to "and" or "or"'
        frames = []
        if logic == "and":
            frames = self.db.get_and_frames_per_annotations(conditions)
        elif logic == "or":
            frames = self.db.get_or_frames_per_annotations(conditions)

        return frames

    def choose_volumes(self, conditions: Union[tuple, List[Tuple[str, str]]], logic: str = "and",
                       verbose: bool = False) -> List[int]:
        """
        Selects only full volumes that correspond to specified conditions;
        Uses "or" or "and" between the conditions depending on logic.
        To load the selected volumes, use load_volumes()

        Args:
            verbose: Whether to print the information about how many frames were choose/ dropped
            conditions: a list of conditions on the annotation labels
                in a form [(group, name),(group, name), ...] where group is a string for the annotation type
                and name is the name of the label of that annotation type.
                For example [('light', 'on'), ('shape','c')]
            logic: "and" or "or" , default is "and".
        Returns:
            list of volumes that were chosen.
            Remember that frame numbers start at 1, but volumes start at 0.
        """
        # TODO : make all indices start at 1 ?

        assert isinstance(conditions, list) or isinstance(conditions, tuple), f"conditions must be a list or a tuple," \
                                                                              f" but got {type(conditions)} instead"
        if isinstance(conditions, tuple):
            conditions = [conditions]

        # get all the frames that correspond to the conditions
        frames = self.choose_frames(conditions, logic=logic)
        n_frames = len(frames)
        # leave only such frames that correspond to full volumes
        # TODO : not necessary to return the frames?
        volumes, frames = self.db.choose_full_volumes(frames)
        n_dropped = n_frames - len(frames)
        if verbose:
            print(f"Choosing only full volumes. "
                  f"Dropped {n_dropped} frames, kept {len(frames)}")

        return volumes

    def load_volumes(self, volumes: Union[npt.NDArray, List[int]], verbose: bool = False) -> npt.NDArray:
        """
        Load volumes. Will load the specified full volumes.
        All the returned volumes or slices should have the same number of frames in them.

        Args:
            volumes: the indexes of volumes to load.
            verbose: Whether to print the information about the loading
        Returns:
            4D array with the loaded volumes. TZYX order.
        """
        # if array convert to list of int
        if isinstance(volumes, np.ndarray):
            # make sure it is a 1D array
            assert len(volumes.shape) == 1, "volumes must be a 1D array"

            # make sure all the volumes can be safely converted to integers
            assert np.all(volumes.astype(int) == volumes), "All the volumes must be integers"
            volumes = volumes.astype(int).tolist()

        frames = self.db.get_frames_per_volumes(volumes)
        info = self.db.prepare_frames_for_loading(frames)

        # unpack
        data_dir, file_names, file_ids, frame_in_file, volumes_per_frame = info
        # get unique volumes and check that they are the same as the ones we asked for
        assert set(volumes_per_frame) == set(volumes), "Requested volumes" \
                                                       f" {set(volumes).difference(set(volumes_per_frame))} " \
                                                       "can not be found"
        # make full paths to files ( remember file ids start with 1 )
        files = [Path(data_dir, file_names[file_id - 1]) for file_id in file_ids]
        if not hasattr(self, "loader"):
            self.loader = ImageLoader(Path(data_dir, file_names[0]))
        volumes_img = self.loader.load_volumes(frame_in_file,
                                               files,
                                               volumes_per_frame,
                                               show_file_names=False,
                                               show_progress=verbose)
        return volumes_img

    def get_volume_annotations(self, volumes: Union[npt.NDArray, List[int]],
                               annotation_names: Optional[List[str]] = None) -> Dict[str, List[str]]:
        """
        Get annotations for volumes.
        Will get the labels for the specified full volumes from each available annotation.

        Args:
            volumes: the indexes of volumes to get annotation for. If a multidimensional array is passed,
                will flatten it and get annotations for all the volumes in it.
            annotation_names: the names of the annotations to get. If None, will get all the annotations.

        Returns:
            a dictionary with the annotations for each annotation type.
            The keys are the annotation types, the values are lists of labels for each volume.
            The last key is "volumes" and the value is a list of volumes.
        """
        # TODO: throw a warning if some volumes are not in the database

        # if array convert to list of int
        if isinstance(volumes, np.ndarray):
            # turn into a 1D array
            volumes = volumes.flatten()
            # make sure all the volumes can be safely converted to integers
            assert np.all(volumes.astype(int) == volumes), "All the volumes must be integers"
            volumes = volumes.astype(int).tolist()

        # get annotations for the volumes
        annotations = self.db.get_volume_annotations(volumes, annotation_names=annotation_names)

        # prepare dict for the annotations
        annotation = {key: [] for key in annotations.keys()}
        annotation["volumes"] = []

        # get a single label per volume
        for volume in volumes:
            for group, data in annotations.items():
                volume_ids = np.array(data["volume_ids"])
                labels = np.array(data["labels"])
                # check that the volume has the same labels
                labels_per_volume = set(labels[volume_ids == volume])
                if len(labels_per_volume) > 1:
                    raise ValueError(f"Volume {volume} has different labels ({labels_per_volume}) "
                                     f"for the same annotation {group}. Can't assign a single label to the volume.")
                # add the label to the dict
                annotation[group].append(list(labels_per_volume)[0])
            # add the volume to the dict
            annotation["volumes"].append(volume)

        return annotation

    def get_volume_annotation_df(self, volumes: Union[npt.NDArray, List[int]],
                                 annotation_names: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Get annotations for volumes.
        Will get the labels for the specified full volumes from each available annotation as pandas dataframe.
        Args:
            volumes: the indexes of volumes to get annotation for. If a multidimensional array is passed,
                will flatten it and get annotations for all the volumes in it.
            annotation_names: the names of the annotations to get. If None, will get all the annotations.

        Returns:
            a dataframe with the annotations for each annotation type.
            The columns are volumes and the annotation types with the corresponding labels for each volume.
        """
        # get annotations for the volumes
        annotations = self.get_volume_annotations(volumes, annotation_names=annotation_names)
        return pd.DataFrame(annotations)

    def add_annotations_from_volume_annotation_df(self, volume_annotation_df: pd.DataFrame,
                                                  annotation_names: Optional[List[str]] = None):
        """
        Add annotations from volume_annotation dataframe to the experiment.
        Use it if you have cropped the volumes from the original movie and
        want to add the annotations to the cropped movie.
        The format of the dataframe should be the same as the one returned by get_volume_annotation_df.
        The length of the volumes should be the same as the length of the experiment.
        Will only work for annotation types that are constant for the whole volume. If you have annotations that change
        within the volume, you will need to exclude them.

        Args:
            volume_annotation_df: the dataframe with the annotations.
            annotation_names: the names of the annotations to add. These must be the column names in the table.
                If None, will add all the annotations in the table and will
                assume that all the columns in the dataframe that are not "volumes" are the annotation names.
                ! If you have modified the table to add additional columns that are NOT annotations,
                you must specify the annotation names,
                otherwise vodex will attempt to add those columns as annotations!
        """
        # make a copy of the dataframe to avoid modifying the original
        volume_annotation_df = volume_annotation_df.copy()

        # get the columns in the dataframe that are not "
        if annotation_names is None:
            annotation_names = list(volume_annotation_df.columns)
            annotation_names.remove("volumes")

        # add duration column to the dataframe
        volume_annotation_df["duration"] = self.frames_per_volume

        # get index of the volume -1 (head)
        head_volume = volume_annotation_df[volume_annotation_df["volumes"] == -1].index
        # get index of the volume -2 (tail)
        tail_volume = volume_annotation_df[volume_annotation_df["volumes"] == -2].index
        # set the duration for partial volumes
        if len(head_volume) > 0:
            volume_annotation_df.loc[head_volume, "duration"] = self.n_head_frames
            # assign the row to the beginning of the dataframe
            volume_annotation_df = pd.concat([volume_annotation_df.loc[head_volume],
                                              volume_annotation_df.drop(head_volume)])
        if len(tail_volume) > 0:
            volume_annotation_df.loc[tail_volume, "duration"] = self.n_tail_frames
            # assign the row to the end of the dataframe
            volume_annotation_df = pd.concat([volume_annotation_df.drop(tail_volume),
                                              volume_annotation_df.loc[tail_volume]])

        # add the annotations to the experiment
        for annotation_name in annotation_names:
            # get the labels for the annotation
            annotation_df = pd.DataFrame({"group": np.repeat(annotation_name, len(volume_annotation_df)).astype(str),
                                          "name": volume_annotation_df[annotation_name].values,
                                          "duration_frames": volume_annotation_df["duration"].values})
            # add the annotation
            self.add_annotations_from_df(annotation_df)

    def load_slices(self, slices: List[int], volumes: List[int] = None,
                    skip_missing: bool = False, verbose: bool = False) -> npt.NDArray:
        """
        Load volumes. Will load the specified full volumes.
        All the returned volumes or slices should have the same number of frames in them.

        Args:
            slices: the indexes of slices in the volumes to load.
            volumes: the indexes of volumes to load slices for. If None, will load slices for all volumes.
            skip_missing: Whether to skip missing volumes.
                If False, will raise an error if a slice is missing for any volume.
            verbose: Whether to print the information about the loading
        Returns:
            4D array with the loaded slices for selected volumes. TZYX order.
        """
        if volumes is None:
            volumes = self.db.get_volume_list()

        frames = self.db.get_frames_per_volumes(volumes, slices=slices)
        info = self.db.prepare_frames_for_loading(frames)

        # unpack and load
        data_dir, file_names, file_ids, frame_in_file, volumes_per_frame = info

        # get unique volumes and check that they are the same as the ones we asked for
        if skip_missing:  # throw a warning
            if set(volumes_per_frame) != set(volumes):
                warnings.warn(f"Requested volumes {set(volumes).difference(set(volumes_per_frame))} " +
                              f"are not present in the slices {slices}. " +
                              f"Loaded slices for {set(volumes_per_frame)} volumes.")
        else:  # throw an error
            assert set(volumes_per_frame) == set(volumes), \
                f"Requested volumes {set(volumes).difference(set(volumes_per_frame))} " \
                f"are not present in the slices {slices}. "

        # make full paths to files ( remember file ids start with 1 )
        files = [Path(data_dir, file_names[file_id - 1]) for file_id in file_ids]
        if not hasattr(self, "loader"):
            self.loader = ImageLoader(Path(data_dir, file_names[0]))

        volumes_img = self.loader.load_volumes(frame_in_file,
                                               files,
                                               volumes_per_frame,
                                               show_file_names=False,
                                               show_progress=verbose)

        # if the z dimension is smaller than the number of slices, throw a warning
        if volumes_img.shape[1] < len(slices):
            warnings.warn(f"Some of the requested slices {slices} are not present in the volumes. " +
                          f"Loaded {volumes_img.shape[1]} slices instead of {len(slices)}")

        return volumes_img

    def list_volumes(self) -> npt.NDArray[int]:
        """
        Returns a list of all the volumes IDs in the experiment.
        If partial volumes are present: for "head" returns -1, for "tail" returns -2.

        Returns:
            list of volume IDs
        """
        # TODO : Remove this function and use volumes property instead
        warnings.warn(f"list_volumes will be removed in vodex 1.1.0 use volumes property instead.")

        return self.volumes

    def list_conditions_per_cycle(self, annotation_type: str, as_volumes: bool = True) -> Tuple[List[int], List[str]]:
        """
        Returns a list of conditions per cycle.

        Args:
            annotation_type: The name of the annotation for which to get the conditions list
            as_volumes: weather to return conditions per frame (default) or per volume.
                If as_volumes is true, it is expected that the conditions are not changing in the middle of the volume.
                Will throw an error if it happens.
        Returns:
            list of the condition ids ( condition per frame or per volume) and corresponding condition names.
        """

        # TODO : check if empty
        if as_volumes:
            _, condition_ids, count = self.db.get_conditionIds_per_cycle_per_volumes(annotation_type)
            fpv = self.db.get_fpv()
            assert np.all(np.array(count) == fpv), "Can't list_conditions_per_cycle with as_volumes=True: " \
                                                   "some conditions don't cover the whole volume." \
                                                   "You might want to get conditions per frame," \
                                                   " by setting as_volumes=False"
        else:
            _, condition_ids = self.db.get_conditionIds_per_cycle_per_frame(annotation_type)
        names = self.db._get_Names_from_AnnotationTypeLabels()

        return condition_ids, names

    def list_cycle_iterations(self, annotation_type: str, as_volumes: bool = True) -> List[int]:
        """
        Returns a list of cycle iterations for a specified annotation.
        The annotation must have been initialised from a cycle.

        Args:
            annotation_type: The name of the annotation for which to get the cycle iteratoins list
            as_volumes: weather to return cycle iteratoins per frame ( default) or per volume.
                If as_volumes is true, it is expected that the cycle iteratoins are not changing in the middle of the volume.
                Will throw an error if it happens.
            as_volumes: bool
        Returns:
            list of the condition ids (cycle iterations per frame or per volume)
        """

        if as_volumes:
            _, cycle_its, count = self.db.get_cycleIterations_per_volumes(annotation_type)
            fpv = self.db.get_fpv()
            assert np.all(np.array(count) == fpv), "Can't list_cycle_iterations with as_volumes=True: " \
                                                   "some iterations don't cover the whole volume." \
                                                   "You might want to get iterations per frame," \
                                                   " by setting as_volumes=False"
        else:
            _, cycle_its = self.db.get_cycleIterations_per_frame(annotation_type)

        return cycle_its

annotations: List[str] property

Returns the list of annotation names that have been added to the experiment.

cycles: List[str] property

Returns the list of cycle names that have been added to the experiment.

data_dir: str property

Returns the path to the data directory.

file_names: List[str] property

Returns the list of file names that have been added to the experiment.

frames_per_file: List[int] property

Returns the list of frames per file that have been added to the experiment.

frames_per_volume: int property

Returns the number of frames per volume.

full_volumes: npt.NDArray property

Returns the list of full volume IDs in the experiment.

labels: dict property

Returns a dict with annotation names, labels and label descriptions that have been added to the experiment.

labels_df: pd.DataFrame property

Returns a dataframe with annotation names, labels and label descriptions that have been added to the experiment.

n_frames: int property

Returns the total number of frames in the experiment.

n_full_volumes: int property

Returns the total number of full volumes in the experiment.

n_head_frames: int property

Returns the number of frames in the first partial volume, or 0 if there is no partial volume at the beginning.

n_tail_frames: int property

Returns the number of frames in the last partial volume, or 0 if there is no partial volume at the end.

n_volumes: int property

Returns the total number of volumes in the experiment. This might include partial volumes at the beginning of the experiment (ID: -1) and at the end of the experiment (ID : -2).

starting_slice: int property

Returns the number of the first slice in the experiment.

volumes: npt.NDArray property

Returns the list of volume IDs in the experiment. This might include partial volumes at the beginning of the experiment (ID: -1) and at the end of the experiment (ID : -2).

__init__(db_reader)

Initialize the experiment with the given DbReader object.

Source code in src/vodex/experiment.py
def __init__(self, db_reader: DbReader):
    """
    Initialize the experiment with the given DbReader object.
    """

    assert isinstance(db_reader, DbReader), "Need DbReader to initialise the Experiment"

    self.db = db_reader
    # will add the loader the first time you are loading anything
    # in load_frames() or load_volumes()
    self.loader: ImageLoader

add_annotations(annotations)

Adds annotations to existing experiment. Does NOT save the changes to disc! run self.save() to save.

Parameters:

Name Type Description Default
annotations List[Annotation]

a list of annotations to add to the database.

required
Source code in src/vodex/experiment.py
def add_annotations(self, annotations: List[Annotation]):
    """
    Adds annotations to existing experiment.
    Does NOT save the changes to disc! run self.save() to save.

    Args:
        annotations: a list of annotations to add to the database.
    """
    DbWriter(self.db.connection).add_annotations(annotations)

add_annotations_from_df(annotation_df, cycles=False, timing_conversion=None, groups=None, info=None)

Adds annotations to existing experiment from a data frame. Does NOT save the changes to disc! run self.save() to save.

Parameters:

Name Type Description Default
annotation_df DataFrame

a dataframe with the annotation information

required
cycles Union[List[str], bool]

a list of the annotation names that are cycles or a boolean. If False, all annotations are assumed to be timelines. If True, all annotations are assumed to be cycles. Specified as {'cycles': }

False
timing_conversion Optional[dict]

a dictionary to convert the timing of the annotation. For example, if you want to convert the timing from frames to seconds, and you were recording at 30 frames per second, you can use timing_conversion = {'frames': 1, 'seconds': 1/30} You can list multiple units in the dictionary, and the timing will be converted to all of them, for example if there are also 10 frames per volume, you can use: timing_conversion = {'frames': 1, 'seconds': 1/30, 'volumes': 1/10} You must include 'frames' in the dictionary! The value of frames does not have to be 1, but it must be consistent with the other units. the rest of the values. for example this is valid for the example above: timing_conversion = {'frames': 10, 'seconds': 1/3, 'volumes': 1}. If timing_conversion is None, then the timing is not converted and 'duration_frames' must be provided in the dataframe.

None
groups Optional[str]

the group of the annotation if there are multiple groups in the dataframe. If None, all groups are added.

None
info Optional[dict]

additional information about the annotation, dictionary with keys: 'annotation name': information

None
Source code in src/vodex/experiment.py
def add_annotations_from_df(self, annotation_df: pd.DataFrame,
                            cycles: Union[List[str], bool] = False,
                            timing_conversion: Optional[dict] = None,
                            groups: Optional[str] = None,
                            info: Optional[dict] = None):
    """
    Adds annotations to existing experiment from a data frame.
    Does NOT save the changes to disc! run self.save() to save.

    Args:
        annotation_df: a dataframe with the annotation information
        cycles: a list of the annotation names that are cycles or a boolean.
            If False, all annotations are assumed to be timelines.
            If True, all annotations are assumed to be cycles.
            Specified as {'cycles': }
        timing_conversion: a dictionary to convert the timing of the annotation.
            For example, if you want to convert the timing from frames to seconds,
            and you were recording at 30 frames per second, you can use
            timing_conversion = {'frames': 1, 'seconds': 1/30}
            You can list multiple units in the dictionary, and the timing will be converted to all of them,
            for example if there are also 10 frames per volume, you can use:
            timing_conversion = {'frames': 1, 'seconds': 1/30, 'volumes': 1/10}
            You must include 'frames' in the dictionary! The value of frames does not have to be 1,
            but it must be consistent with the other units. the rest of the values.
            for example this is valid for the example above:
            timing_conversion = {'frames': 10, 'seconds': 1/3, 'volumes': 1}.
            If timing_conversion is None, then the timing is not converted
            and 'duration_frames' must be provided in the dataframe.
        groups: the group of the annotation if there are multiple groups in the dataframe.
            If None, all groups are added.
        info: additional information about the annotation, dictionary with keys:
            'annotation name': information
    """
    n_frames = self.db.get_n_frames()

    if groups is None:
        groups = annotation_df['group'].unique()

    annotations = []
    for group in groups:
        group_df = annotation_df[annotation_df['group'] == group]

        if cycles is True or (isinstance(cycles, list) and group in cycles):
            is_cycle = True
        else:
            is_cycle = False

        if info is not None and group in info:
            group_info = info[group]
        else:
            group_info = None

        annotations.append(Annotation.from_df(n_frames, group_df,
                                              timing_conversion, is_cycle, group_info))

    self.add_annotations(annotations)

add_annotations_from_volume_annotation_df(volume_annotation_df, annotation_names=None)

Add annotations from volume_annotation dataframe to the experiment. Use it if you have cropped the volumes from the original movie and want to add the annotations to the cropped movie. The format of the dataframe should be the same as the one returned by get_volume_annotation_df. The length of the volumes should be the same as the length of the experiment. Will only work for annotation types that are constant for the whole volume. If you have annotations that change within the volume, you will need to exclude them.

Parameters:

Name Type Description Default
volume_annotation_df DataFrame

the dataframe with the annotations.

required
annotation_names Optional[List[str]]

the names of the annotations to add. These must be the column names in the table. If None, will add all the annotations in the table and will assume that all the columns in the dataframe that are not "volumes" are the annotation names. ! If you have modified the table to add additional columns that are NOT annotations, you must specify the annotation names, otherwise vodex will attempt to add those columns as annotations!

None
Source code in src/vodex/experiment.py
def add_annotations_from_volume_annotation_df(self, volume_annotation_df: pd.DataFrame,
                                              annotation_names: Optional[List[str]] = None):
    """
    Add annotations from volume_annotation dataframe to the experiment.
    Use it if you have cropped the volumes from the original movie and
    want to add the annotations to the cropped movie.
    The format of the dataframe should be the same as the one returned by get_volume_annotation_df.
    The length of the volumes should be the same as the length of the experiment.
    Will only work for annotation types that are constant for the whole volume. If you have annotations that change
    within the volume, you will need to exclude them.

    Args:
        volume_annotation_df: the dataframe with the annotations.
        annotation_names: the names of the annotations to add. These must be the column names in the table.
            If None, will add all the annotations in the table and will
            assume that all the columns in the dataframe that are not "volumes" are the annotation names.
            ! If you have modified the table to add additional columns that are NOT annotations,
            you must specify the annotation names,
            otherwise vodex will attempt to add those columns as annotations!
    """
    # make a copy of the dataframe to avoid modifying the original
    volume_annotation_df = volume_annotation_df.copy()

    # get the columns in the dataframe that are not "
    if annotation_names is None:
        annotation_names = list(volume_annotation_df.columns)
        annotation_names.remove("volumes")

    # add duration column to the dataframe
    volume_annotation_df["duration"] = self.frames_per_volume

    # get index of the volume -1 (head)
    head_volume = volume_annotation_df[volume_annotation_df["volumes"] == -1].index
    # get index of the volume -2 (tail)
    tail_volume = volume_annotation_df[volume_annotation_df["volumes"] == -2].index
    # set the duration for partial volumes
    if len(head_volume) > 0:
        volume_annotation_df.loc[head_volume, "duration"] = self.n_head_frames
        # assign the row to the beginning of the dataframe
        volume_annotation_df = pd.concat([volume_annotation_df.loc[head_volume],
                                          volume_annotation_df.drop(head_volume)])
    if len(tail_volume) > 0:
        volume_annotation_df.loc[tail_volume, "duration"] = self.n_tail_frames
        # assign the row to the end of the dataframe
        volume_annotation_df = pd.concat([volume_annotation_df.drop(tail_volume),
                                          volume_annotation_df.loc[tail_volume]])

    # add the annotations to the experiment
    for annotation_name in annotation_names:
        # get the labels for the annotation
        annotation_df = pd.DataFrame({"group": np.repeat(annotation_name, len(volume_annotation_df)).astype(str),
                                      "name": volume_annotation_df[annotation_name].values,
                                      "duration_frames": volume_annotation_df["duration"].values})
        # add the annotation
        self.add_annotations_from_df(annotation_df)

batch_volumes(batch_size, overlap=0, volumes=None, full_only=True)

Returns a list of volume IDs that can be used to load batches of volumes. The batch size is given in number of volumes, and the overlap is given in number of volumes. If full_only is True, only full volumes are returned.

Parameters:

Name Type Description Default
batch_size int

the number of volumes in each batch.

required
overlap int

the number of volumes that overlap between batches.

0
volumes Optional[Union[NDArray, List[int]]]

the list of volumes to be batched.

None
full_only bool

if True, only full volumes are returned. If volumes is not None, this argument is ignored.

True

Returns:

Type Description
List[List[int]]

A list of lists (n_batches x batch_size) of volume IDs that can be used to load batches of volumes.

Source code in src/vodex/experiment.py
def batch_volumes(self, batch_size: int, overlap: int = 0,
                  volumes: Optional[Union[npt.NDArray, List[int]]] = None,
                  full_only: bool = True) -> List[List[int]]:
    """
    Returns a list of volume IDs that can be used to load batches of volumes.
    The batch size is given in number of volumes, and the overlap is given in number of volumes.
    If full_only is True, only full volumes are returned.

    Args:
        batch_size: the number of volumes in each batch.
        overlap: the number of volumes that overlap between batches.
        volumes: the list of volumes to be batched.
        full_only: if True, only full volumes are returned. If volumes is not None, this argument is ignored.

    Returns:
        A list of lists (n_batches x batch_size) of volume IDs that can be used to load batches of volumes.
    """
    if overlap >= batch_size:
        raise ValueError("Overlap must be smaller than batch size.")

    if volumes is not None:
        volume_list = volumes
    else:
        if full_only:
            volume_list = self.full_volumes
        else:
            volume_list = self.volumes

    # turn into a list if numpy array
    if isinstance(volume_list, np.ndarray):
        volume_list = volume_list.tolist()

    batch_list = []
    for i in range(0, len(volume_list), batch_size - overlap):
        batch_list.append(volume_list[i:i + batch_size])
    return batch_list

choose_frames(conditions, logic='and')

Selects the frames that correspond to specified conditions; Uses "or" or "and" between the conditions depending on logic. To load the selected frames, use load_frames().

Parameters:

Name Type Description Default
conditions Union[tuple, List[Tuple[str, str]]]

a list of conditions on the annotation labels in a form [(group, name),(group, name), ...] where group is a string for the annotation type and name is the name of the label of that annotation type. For example [('light', 'on'), ('shape','c')]

required
logic str

"and" or "or" , default is "and".

'and'

Returns: list of frame ids that were chosen. Remember that frame numbers start at 1.

Source code in src/vodex/experiment.py
def choose_frames(self, conditions: Union[tuple, List[Tuple[str, str]]], logic: str = "and") -> List[int]:
    """
    Selects the frames that correspond to specified conditions;
    Uses "or" or "and" between the conditions depending on logic.
    To load the selected frames, use load_frames().

    Args:
        conditions: a list of conditions on the annotation labels
            in a form [(group, name),(group, name), ...] where group is a string for the annotation type
            and name is the name of the label of that annotation type. For example [('light', 'on'), ('shape','c')]
        logic: "and" or "or" , default is "and".
    Returns:
        list of frame ids that were chosen. Remember that frame numbers start at 1.
    """
    assert logic == "and" or logic == "or", \
        'between_group_logic should be equal to "and" or "or"'
    frames = []
    if logic == "and":
        frames = self.db.get_and_frames_per_annotations(conditions)
    elif logic == "or":
        frames = self.db.get_or_frames_per_annotations(conditions)

    return frames

choose_volumes(conditions, logic='and', verbose=False)

Selects only full volumes that correspond to specified conditions; Uses "or" or "and" between the conditions depending on logic. To load the selected volumes, use load_volumes()

Parameters:

Name Type Description Default
verbose bool

Whether to print the information about how many frames were choose/ dropped

False
conditions Union[tuple, List[Tuple[str, str]]]

a list of conditions on the annotation labels in a form [(group, name),(group, name), ...] where group is a string for the annotation type and name is the name of the label of that annotation type. For example [('light', 'on'), ('shape','c')]

required
logic str

"and" or "or" , default is "and".

'and'

Returns: list of volumes that were chosen. Remember that frame numbers start at 1, but volumes start at 0.

Source code in src/vodex/experiment.py
def choose_volumes(self, conditions: Union[tuple, List[Tuple[str, str]]], logic: str = "and",
                   verbose: bool = False) -> List[int]:
    """
    Selects only full volumes that correspond to specified conditions;
    Uses "or" or "and" between the conditions depending on logic.
    To load the selected volumes, use load_volumes()

    Args:
        verbose: Whether to print the information about how many frames were choose/ dropped
        conditions: a list of conditions on the annotation labels
            in a form [(group, name),(group, name), ...] where group is a string for the annotation type
            and name is the name of the label of that annotation type.
            For example [('light', 'on'), ('shape','c')]
        logic: "and" or "or" , default is "and".
    Returns:
        list of volumes that were chosen.
        Remember that frame numbers start at 1, but volumes start at 0.
    """
    # TODO : make all indices start at 1 ?

    assert isinstance(conditions, list) or isinstance(conditions, tuple), f"conditions must be a list or a tuple," \
                                                                          f" but got {type(conditions)} instead"
    if isinstance(conditions, tuple):
        conditions = [conditions]

    # get all the frames that correspond to the conditions
    frames = self.choose_frames(conditions, logic=logic)
    n_frames = len(frames)
    # leave only such frames that correspond to full volumes
    # TODO : not necessary to return the frames?
    volumes, frames = self.db.choose_full_volumes(frames)
    n_dropped = n_frames - len(frames)
    if verbose:
        print(f"Choosing only full volumes. "
              f"Dropped {n_dropped} frames, kept {len(frames)}")

    return volumes

close()

Close database connection.

Source code in src/vodex/experiment.py
def close(self):
    """
    Close database connection.
    """
    self.db.connection.close()

create(volume_manager, annotations, verbose=False) classmethod

Creates a database instance from the core classes and initialises the experiment.

Parameters:

Name Type Description Default
volume_manager VolumeManager

VolumeManager object that summarises the information about the image data.

required
annotations List[Annotation]

list of annotations to add to the experiment descriptions.

required
verbose bool

whether to print the information about Filemanager, VolumeManager and Annotations on the screen.

False

Returns:

Type Description
Experiment

initialised experiment.

Source code in src/vodex/experiment.py
@classmethod
def create(cls, volume_manager: VolumeManager, annotations: List[Annotation], verbose: bool = False):
    """
    Creates a database instance from the core classes and initialises the experiment.

    Args:
        volume_manager: VolumeManager object that summarises the information about the image data.
        annotations: list of annotations to add to the experiment descriptions.
        verbose: whether to print the information about Filemanager, VolumeManager and Annotations on the screen.

    Returns:
        (Experiment): initialised experiment.
    """
    if verbose:
        print(volume_manager.file_manager)
        print(volume_manager)
        for annotation in annotations:
            print(annotation)
            if annotation.cycle is not None:
                print(annotation.cycle_info())

    db = DbWriter.create()
    db.populate(volumes=volume_manager, annotations=annotations)
    db_reader = DbReader(db.connection)
    return cls(db_reader)

delete_annotations(annotation_names)

Deletes annotations from existing experiment. Does NOT save the changes to disc! run self.save() to save.

Parameters:

Name Type Description Default
annotation_names List[str]

a list of annotation names to delete from the database.

required
Source code in src/vodex/experiment.py
def delete_annotations(self, annotation_names: List[str]):
    """
    Deletes annotations from existing experiment.
    Does NOT save the changes to disc! run self.save() to save.

    Args:
        annotation_names: a list of annotation names to delete from the database.
    """
    for name in annotation_names:
        DbWriter(self.db.connection).delete_annotation(name)

from_dir(dir_name, frames_per_volume, starting_slice=0, verbose=False) classmethod

Creates a database instance from a directory and initialises the experiment. The directory should contain the image files. Annotations are not initialised, but can be added later.

Source code in src/vodex/experiment.py
@classmethod
def from_dir(cls, dir_name: Union[Path, str], frames_per_volume: int,
             starting_slice: int = 0, verbose: bool = False):
    """
    Creates a database instance from a directory and initialises the experiment.
    The directory should contain the image files.
    Annotations are not initialised, but can be added later.
    """
    # initialise volume manager
    volume_manager = VolumeManager.from_dir(dir_name, frames_per_volume, fgf=starting_slice)
    return cls.create(volume_manager, [], verbose)

get_volume_annotation_df(volumes, annotation_names=None)

Get annotations for volumes. Will get the labels for the specified full volumes from each available annotation as pandas dataframe. Args: volumes: the indexes of volumes to get annotation for. If a multidimensional array is passed, will flatten it and get annotations for all the volumes in it. annotation_names: the names of the annotations to get. If None, will get all the annotations.

Returns:

Type Description
DataFrame

a dataframe with the annotations for each annotation type.

DataFrame

The columns are volumes and the annotation types with the corresponding labels for each volume.

Source code in src/vodex/experiment.py
def get_volume_annotation_df(self, volumes: Union[npt.NDArray, List[int]],
                             annotation_names: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Get annotations for volumes.
    Will get the labels for the specified full volumes from each available annotation as pandas dataframe.
    Args:
        volumes: the indexes of volumes to get annotation for. If a multidimensional array is passed,
            will flatten it and get annotations for all the volumes in it.
        annotation_names: the names of the annotations to get. If None, will get all the annotations.

    Returns:
        a dataframe with the annotations for each annotation type.
        The columns are volumes and the annotation types with the corresponding labels for each volume.
    """
    # get annotations for the volumes
    annotations = self.get_volume_annotations(volumes, annotation_names=annotation_names)
    return pd.DataFrame(annotations)

get_volume_annotations(volumes, annotation_names=None)

Get annotations for volumes. Will get the labels for the specified full volumes from each available annotation.

Parameters:

Name Type Description Default
volumes Union[NDArray, List[int]]

the indexes of volumes to get annotation for. If a multidimensional array is passed, will flatten it and get annotations for all the volumes in it.

required
annotation_names Optional[List[str]]

the names of the annotations to get. If None, will get all the annotations.

None

Returns:

Type Description
Dict[str, List[str]]

a dictionary with the annotations for each annotation type.

Dict[str, List[str]]

The keys are the annotation types, the values are lists of labels for each volume.

Dict[str, List[str]]

The last key is "volumes" and the value is a list of volumes.

Source code in src/vodex/experiment.py
def get_volume_annotations(self, volumes: Union[npt.NDArray, List[int]],
                           annotation_names: Optional[List[str]] = None) -> Dict[str, List[str]]:
    """
    Get annotations for volumes.
    Will get the labels for the specified full volumes from each available annotation.

    Args:
        volumes: the indexes of volumes to get annotation for. If a multidimensional array is passed,
            will flatten it and get annotations for all the volumes in it.
        annotation_names: the names of the annotations to get. If None, will get all the annotations.

    Returns:
        a dictionary with the annotations for each annotation type.
        The keys are the annotation types, the values are lists of labels for each volume.
        The last key is "volumes" and the value is a list of volumes.
    """
    # TODO: throw a warning if some volumes are not in the database

    # if array convert to list of int
    if isinstance(volumes, np.ndarray):
        # turn into a 1D array
        volumes = volumes.flatten()
        # make sure all the volumes can be safely converted to integers
        assert np.all(volumes.astype(int) == volumes), "All the volumes must be integers"
        volumes = volumes.astype(int).tolist()

    # get annotations for the volumes
    annotations = self.db.get_volume_annotations(volumes, annotation_names=annotation_names)

    # prepare dict for the annotations
    annotation = {key: [] for key in annotations.keys()}
    annotation["volumes"] = []

    # get a single label per volume
    for volume in volumes:
        for group, data in annotations.items():
            volume_ids = np.array(data["volume_ids"])
            labels = np.array(data["labels"])
            # check that the volume has the same labels
            labels_per_volume = set(labels[volume_ids == volume])
            if len(labels_per_volume) > 1:
                raise ValueError(f"Volume {volume} has different labels ({labels_per_volume}) "
                                 f"for the same annotation {group}. Can't assign a single label to the volume.")
            # add the label to the dict
            annotation[group].append(list(labels_per_volume)[0])
        # add the volume to the dict
        annotation["volumes"].append(volume)

    return annotation

list_conditions_per_cycle(annotation_type, as_volumes=True)

Returns a list of conditions per cycle.

Parameters:

Name Type Description Default
annotation_type str

The name of the annotation for which to get the conditions list

required
as_volumes bool

weather to return conditions per frame (default) or per volume. If as_volumes is true, it is expected that the conditions are not changing in the middle of the volume. Will throw an error if it happens.

True

Returns: list of the condition ids ( condition per frame or per volume) and corresponding condition names.

Source code in src/vodex/experiment.py
def list_conditions_per_cycle(self, annotation_type: str, as_volumes: bool = True) -> Tuple[List[int], List[str]]:
    """
    Returns a list of conditions per cycle.

    Args:
        annotation_type: The name of the annotation for which to get the conditions list
        as_volumes: weather to return conditions per frame (default) or per volume.
            If as_volumes is true, it is expected that the conditions are not changing in the middle of the volume.
            Will throw an error if it happens.
    Returns:
        list of the condition ids ( condition per frame or per volume) and corresponding condition names.
    """

    # TODO : check if empty
    if as_volumes:
        _, condition_ids, count = self.db.get_conditionIds_per_cycle_per_volumes(annotation_type)
        fpv = self.db.get_fpv()
        assert np.all(np.array(count) == fpv), "Can't list_conditions_per_cycle with as_volumes=True: " \
                                               "some conditions don't cover the whole volume." \
                                               "You might want to get conditions per frame," \
                                               " by setting as_volumes=False"
    else:
        _, condition_ids = self.db.get_conditionIds_per_cycle_per_frame(annotation_type)
    names = self.db._get_Names_from_AnnotationTypeLabels()

    return condition_ids, names

list_cycle_iterations(annotation_type, as_volumes=True)

Returns a list of cycle iterations for a specified annotation. The annotation must have been initialised from a cycle.

Parameters:

Name Type Description Default
annotation_type str

The name of the annotation for which to get the cycle iteratoins list

required
as_volumes bool

weather to return cycle iteratoins per frame ( default) or per volume. If as_volumes is true, it is expected that the cycle iteratoins are not changing in the middle of the volume. Will throw an error if it happens.

True
as_volumes bool

bool

True

Returns: list of the condition ids (cycle iterations per frame or per volume)

Source code in src/vodex/experiment.py
def list_cycle_iterations(self, annotation_type: str, as_volumes: bool = True) -> List[int]:
    """
    Returns a list of cycle iterations for a specified annotation.
    The annotation must have been initialised from a cycle.

    Args:
        annotation_type: The name of the annotation for which to get the cycle iteratoins list
        as_volumes: weather to return cycle iteratoins per frame ( default) or per volume.
            If as_volumes is true, it is expected that the cycle iteratoins are not changing in the middle of the volume.
            Will throw an error if it happens.
        as_volumes: bool
    Returns:
        list of the condition ids (cycle iterations per frame or per volume)
    """

    if as_volumes:
        _, cycle_its, count = self.db.get_cycleIterations_per_volumes(annotation_type)
        fpv = self.db.get_fpv()
        assert np.all(np.array(count) == fpv), "Can't list_cycle_iterations with as_volumes=True: " \
                                               "some iterations don't cover the whole volume." \
                                               "You might want to get iterations per frame," \
                                               " by setting as_volumes=False"
    else:
        _, cycle_its = self.db.get_cycleIterations_per_frame(annotation_type)

    return cycle_its

list_volumes()

Returns a list of all the volumes IDs in the experiment. If partial volumes are present: for "head" returns -1, for "tail" returns -2.

Returns:

Type Description
NDArray[int]

list of volume IDs

Source code in src/vodex/experiment.py
def list_volumes(self) -> npt.NDArray[int]:
    """
    Returns a list of all the volumes IDs in the experiment.
    If partial volumes are present: for "head" returns -1, for "tail" returns -2.

    Returns:
        list of volume IDs
    """
    # TODO : Remove this function and use volumes property instead
    warnings.warn(f"list_volumes will be removed in vodex 1.1.0 use volumes property instead.")

    return self.volumes

load(file_name) classmethod

Loads a database from a file and initialises an Experiment.

Parameters:

Name Type Description Default
file_name Union[Path, str]

full path to a file to database.

required

Return: (Experiment): initialised experiment.

Source code in src/vodex/experiment.py
@classmethod
def load(cls, file_name: Union[Path, str]):
    """
    Loads a database from a file and initialises an Experiment.

    Args:
        file_name: full path to a file to database.
    Return:
        (Experiment): initialised experiment.
    """
    db_reader = DbReader.load(file_name)
    return cls(db_reader)

load_slices(slices, volumes=None, skip_missing=False, verbose=False)

Load volumes. Will load the specified full volumes. All the returned volumes or slices should have the same number of frames in them.

Parameters:

Name Type Description Default
slices List[int]

the indexes of slices in the volumes to load.

required
volumes List[int]

the indexes of volumes to load slices for. If None, will load slices for all volumes.

None
skip_missing bool

Whether to skip missing volumes. If False, will raise an error if a slice is missing for any volume.

False
verbose bool

Whether to print the information about the loading

False

Returns: 4D array with the loaded slices for selected volumes. TZYX order.

Source code in src/vodex/experiment.py
def load_slices(self, slices: List[int], volumes: List[int] = None,
                skip_missing: bool = False, verbose: bool = False) -> npt.NDArray:
    """
    Load volumes. Will load the specified full volumes.
    All the returned volumes or slices should have the same number of frames in them.

    Args:
        slices: the indexes of slices in the volumes to load.
        volumes: the indexes of volumes to load slices for. If None, will load slices for all volumes.
        skip_missing: Whether to skip missing volumes.
            If False, will raise an error if a slice is missing for any volume.
        verbose: Whether to print the information about the loading
    Returns:
        4D array with the loaded slices for selected volumes. TZYX order.
    """
    if volumes is None:
        volumes = self.db.get_volume_list()

    frames = self.db.get_frames_per_volumes(volumes, slices=slices)
    info = self.db.prepare_frames_for_loading(frames)

    # unpack and load
    data_dir, file_names, file_ids, frame_in_file, volumes_per_frame = info

    # get unique volumes and check that they are the same as the ones we asked for
    if skip_missing:  # throw a warning
        if set(volumes_per_frame) != set(volumes):
            warnings.warn(f"Requested volumes {set(volumes).difference(set(volumes_per_frame))} " +
                          f"are not present in the slices {slices}. " +
                          f"Loaded slices for {set(volumes_per_frame)} volumes.")
    else:  # throw an error
        assert set(volumes_per_frame) == set(volumes), \
            f"Requested volumes {set(volumes).difference(set(volumes_per_frame))} " \
            f"are not present in the slices {slices}. "

    # make full paths to files ( remember file ids start with 1 )
    files = [Path(data_dir, file_names[file_id - 1]) for file_id in file_ids]
    if not hasattr(self, "loader"):
        self.loader = ImageLoader(Path(data_dir, file_names[0]))

    volumes_img = self.loader.load_volumes(frame_in_file,
                                           files,
                                           volumes_per_frame,
                                           show_file_names=False,
                                           show_progress=verbose)

    # if the z dimension is smaller than the number of slices, throw a warning
    if volumes_img.shape[1] < len(slices):
        warnings.warn(f"Some of the requested slices {slices} are not present in the volumes. " +
                      f"Loaded {volumes_img.shape[1]} slices instead of {len(slices)}")

    return volumes_img

load_volumes(volumes, verbose=False)

Load volumes. Will load the specified full volumes. All the returned volumes or slices should have the same number of frames in them.

Parameters:

Name Type Description Default
volumes Union[NDArray, List[int]]

the indexes of volumes to load.

required
verbose bool

Whether to print the information about the loading

False

Returns: 4D array with the loaded volumes. TZYX order.

Source code in src/vodex/experiment.py
def load_volumes(self, volumes: Union[npt.NDArray, List[int]], verbose: bool = False) -> npt.NDArray:
    """
    Load volumes. Will load the specified full volumes.
    All the returned volumes or slices should have the same number of frames in them.

    Args:
        volumes: the indexes of volumes to load.
        verbose: Whether to print the information about the loading
    Returns:
        4D array with the loaded volumes. TZYX order.
    """
    # if array convert to list of int
    if isinstance(volumes, np.ndarray):
        # make sure it is a 1D array
        assert len(volumes.shape) == 1, "volumes must be a 1D array"

        # make sure all the volumes can be safely converted to integers
        assert np.all(volumes.astype(int) == volumes), "All the volumes must be integers"
        volumes = volumes.astype(int).tolist()

    frames = self.db.get_frames_per_volumes(volumes)
    info = self.db.prepare_frames_for_loading(frames)

    # unpack
    data_dir, file_names, file_ids, frame_in_file, volumes_per_frame = info
    # get unique volumes and check that they are the same as the ones we asked for
    assert set(volumes_per_frame) == set(volumes), "Requested volumes" \
                                                   f" {set(volumes).difference(set(volumes_per_frame))} " \
                                                   "can not be found"
    # make full paths to files ( remember file ids start with 1 )
    files = [Path(data_dir, file_names[file_id - 1]) for file_id in file_ids]
    if not hasattr(self, "loader"):
        self.loader = ImageLoader(Path(data_dir, file_names[0]))
    volumes_img = self.loader.load_volumes(frame_in_file,
                                           files,
                                           volumes_per_frame,
                                           show_file_names=False,
                                           show_progress=verbose)
    return volumes_img

save(file_name)

Saves a database into a file.

Parameters:

Name Type Description Default
file_name Union[Path, str]

full path to a file to save database. (Usually the filename would end with .db)

required
Source code in src/vodex/experiment.py
def save(self, file_name: Union[Path, str]):
    """
    Saves a database into a file.

    Args:
        file_name: full path to a file to save database.
            (Usually the filename would end with .db)
    """
    DbWriter(self.db.connection).save(file_name)