model

Defines data and processing structures.

🚀 Welcome to the Model module of neuralactivitycubic!

If you’re here, chances are you’re interested in understanding or extending the Model class that sits at the heart of neuralactivitycubic. Whether you’re a researcher, a contributor, or just curious, this notebook is for you!

Before we dive into the code, let’s take a moment to talk about how this piece fits into the big picture.

🧠 What is the Model?

Think of the Model as the brain of neuralactivitycubic. It’s the hub that:

  • Manages and validates configuration settings

  • Creates analysis jobs based on user input or directory structures

  • Executes the core analysis pipeline

  • Communicates with the graphical user interface (GUI), if used

  • Saves logs, plots, and results for each analysis

It can run standalone or be accessed via the GUI. It handles batch jobs, logs everything, and orchestrates the creation of structured outputs, including NWB files. Crucially, as the brain of neuralactivitycubic, the Model sends commands to the downstream modules, which in turn implement the logic that gets executed. For instance, when creating analysis jobs, the Model leverages the RecordingLoader implemented in the input module to load your recording data. The Model thus also represents the highest level of abstraction in neuralactivitycubic and enables you to run a holistic analysis with only a few commands.

Here’s what a minimal programmatic workflow might look like:

from neuralactivitycubic.model import Model

model = Model('path/to/my/recording.avi')
model.create_analysis_jobs()
model.run_analysis()

Boom, that's it! Your data is analyzed, and the results are saved. By the way, we made this even more accessible in the api module - if you're planning to use neuralactivitycubic programmatically, we highly recommend you check it out.

🧩 Literate Programming in Action

Before we start with the actual source code of this module, here are some quick notes for those of you who are new to the concept of Literate Programming, which we use throughout neuralactivitycubic. Why? Our goal is to make the logic transparent and easy to understand - because research software deserves to be as readable as the papers that use it! Therefore, all notebooks of neuralactivitycubic interleave documentation, explanations, usage examples, and tests with the actual source code. We hope you like it!

We're using a literate programming framework called nbdev for the development, testing, documentation, and dissemination of neuralactivitycubic. We believe the concept of rich annotations and usage examples intermixed directly with the source code (read more about the concept of literate programming for instance here) holds great value and potential, especially in the context of research software, as it makes it easier for others to understand, (re-)use, and ideally even adapt or contribute to the code. That being said, some things in here might look a bit confusing to you - perhaps especially if you are an experienced developer used to “regular” source code.

For instance, you'll regularly find the implementation of a class, like the Model here, interrupted by markdown text and perhaps some additional code cells that serve as usage examples or implement tests, only for the subsequent class methods to continue in code cells below, literally patched onto the class using the @patch decorator. Just be aware that this may not match your expectations of what conventional source code typically looks like - and maybe you'll discover a valuable takeaway for yourself along the way, too!

One final note before you head on, though: we are still experimenting a lot with this concept and how to use it to best effect, and we are more than happy to engage in discussion or hear your feedback on this topic. Feel free to drop us a message via GitHub!

🛠️ Import all dependencies

Everybody needs some help - and we're standing on the shoulders of some giants here. Let's start by importing all the dependencies we need to make neuralactivitycubic work:

Exported source
# External functional dependencies:
from pathlib import Path
from datetime import datetime, timezone
from matplotlib.pyplot import show
import ipywidgets as w
import multiprocessing
from fastcore.basics import patch
import gc

# External dependencies for type hints:
from typing import Callable
from matplotlib.figure import Figure
from matplotlib.axes._axes import Axes

# Internal dependencies
from neuralactivitycubic.datamodels import Config
from neuralactivitycubic.processing import AnalysisJob
from neuralactivitycubic.input import RecordingLoaderFactory, ROILoaderFactory, RecordingLoader, ROILoader, get_filepaths_with_supported_extension_in_dirpath, FocusAreaPathRestrictions

📘 Start of the source code

Ready? Grab a coffee and enjoy the tour through the module! Let’s jump into the first class, the Logger:

Before any jobs are created or analysis is run, we need a reliable way to record what happens and when - and that's exactly what the Logger does. It logs messages with precise UTC timestamps and can:

  • Store messages in memory for programmatic access

  • Print logs to the console (or GUI)

  • Save logs to disk in a simple text file

  • Clear logs when switching contexts or rerunning jobs


source

Logger


def Logger(
    
):

A simple logging utility class that captures log messages with UTC timestamps, allows retrieval and clearing of logs, and supports saving them to a file.

Exported source
class Logger:

    """
    A simple logging utility class that captures log messages with UTC timestamps,
    allows retrieval and clearing of logs, and supports saving them to a file.
    """
    
    def __init__(self):
        self.logs = []

    def add_new_log(self, message: str) -> None:
        """
        Add a new log message with a UTC timestamp prefix. 
        The timestamp is formatted as 'dd-mm-yy HH:MM:SS.ffffff (UTC)'.
        """
        time_prefix_in_utc = datetime.now(timezone.utc).strftime('%d-%m-%y %H:%M:%S.%f')
        self.logs.append(f'{time_prefix_in_utc} (UTC): {message}')
        print(f'{time_prefix_in_utc} (UTC): {message}')

    def get_logs(self) -> list[str]:
        return self.logs

    def clear_logs(self) -> None:
        self.logs = []

    def save_current_logs(self, save_dir: Path) -> None:
        filepath = save_dir.joinpath('logs.txt')
        with open(filepath , 'w+') as logs_file:
            for log_message in self.logs:
                logs_file.write(f'{log_message}\n')
        print(f'Logs saved to {filepath}')

Its usage is simple, yet invaluable for debugging and traceability. Here's a short example of how to interact with the Logger:

# Create a logger instance
logger = Logger()

# Simulate logging workflow
logger.add_new_log('Starting simulated analysis...')
logger.add_new_log('Loading data...')
logger.add_new_log('Data successfully loaded.')
10-02-26 09:20:31.319811 (UTC): Starting simulated analysis...
10-02-26 09:20:31.319900 (UTC): Loading data...
10-02-26 09:20:31.319957 (UTC): Data successfully loaded.
# Access what has been logged so far:
logger.get_logs()
['10-02-26 09:20:31.319811 (UTC): Starting simulated analysis...',
 '10-02-26 09:20:31.319900 (UTC): Loading data...',
 '10-02-26 09:20:31.319957 (UTC): Data successfully loaded.']
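As a side note, the timestamp prefix you see above comes straight from datetime.strftime. Here is a minimal sketch of the same pattern the Logger uses, in case you want to reproduce or parse it:

```python
from datetime import datetime, timezone

# The same strftime pattern the Logger uses for its prefix:
# day-month-year, then time with microsecond precision.
ts = datetime.now(timezone.utc).strftime('%d-%m-%y %H:%M:%S.%f')
print(f'{ts} (UTC): my message')
```

Note that the two-digit year comes first in day-month-year order, so these prefixes are meant for human readability rather than lexicographic sorting.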

And if you'd like to save the logs, you can do so just as easily - be aware, though, that save_current_logs() expects a pathlib.Path object that defines the directory in which the logs.txt file should be saved! Once all logs are saved, you might want to clear them before starting your next analysis:

# Create a Path:
destination_directory = Path('path/to/my/directory/of/choice')

# Pass the Path to the Logger to save the logs.txt
logger.save_current_logs(destination_directory)
# Clean up all previously logged messages and you're off to a fresh start:
logger.clear_logs()

Trust, but verify: you can confirm that the list of logged messages is indeed empty by calling the get_logs() method again:

# Confirm the logs were actually cleaned:
logger.get_logs()
[]

Awesome — that’s exactly the behavior we expected! 🎯

Sure, this might be a simple example, but it’s a perfect introduction to the idea of testing your code. If you know what your code should do — and maybe just as importantly, what it shouldn’t do — you can write a test to check for that.

Even better: those tests are just regular Python code. That means they can be executed automatically, for example, every time you make changes — helping you catch bugs early and ensure nothing breaks by accident.

And since we’re using the literate programming framework nbdev, we can write these tests right here, directly next to the source code they validate. That’s not just tidy — it’s powerful.

So let’s put that into practice and write a test for the Logger class that automates the check we just did manually: confirming that all log messages are properly cleared when calling clear_logs().

The literate programming framework we are using here, nbdev, essentially uses any assert statements you define in the notebook for testing (see for instance their official documentation on how they envision literate test implementation).

To add some more background: an assert statement only passes if its condition is True; otherwise, it raises an AssertionError. For instance:

assert 1 == 1

would pass, while:

assert 1 == 2

would raise an AssertionError, and nbdev's automated testing pipelines would catch and raise that error, notifying you that something is wrong with your code.

For this reason, whenever we define custom functions for testing, we always need to make sure they return True when the check passes, so that the corresponding assert that calls the function also passes.

def test_log_clean_up():
    logger = Logger()
    logger.add_new_log('test')
    logger.clear_logs()
    logs_count = len(logger.get_logs())
    return logs_count == 0
assert test_log_clean_up()

And just like that — we’ve got automated tests running! 🎉

Every time we push a new version of the code to GitHub, this test (along with all the others we’ve written) runs automatically. If something breaks or behaves unexpectedly, we find out right away and can fix it before it becomes a problem.

Important

Think about how powerful that is: we don’t have to wait for a sharp-eyed user to notice something’s off, track us down, and explain the issue. We catch bugs early — long before they can silently affect someone’s work.

In the context of research software, that’s not just convenient — it’s critical. A bug that slips through unnoticed could end up influencing someone’s results, and possibly even get baked into a publication. That’s a scenario none of us want.

Testing is always important — but in science and research, the stakes are higher. That’s why we take it seriously.

Next up is the Model, which sits right at the core of neuralactivitycubic:


source

Model


def Model(
    config:neuralactivitycubic.datamodels.Config | str
)->None:

Initialize self. See help(type(self)) for accurate signature.

Exported source
class Model:

    def __init__(self, 
                 config: Config | str
                ) -> None:
        self.num_processes = multiprocessing.cpu_count()
        self.analysis_job_queue = []
        self.result_directories = []
        self.logs = Logger()
        if isinstance(config, str):
            config = Config(data_source_path=config)
        self.config = config
        self.nwb_metadata = None
        self.gui_enabled = False
        self.callback_view_update_infos = None
        self.callback_view_show_output_screen = None
        self.view_output = None
        self.pixel_conversion = None

This constructor sets up everything needed to use na3 in either programmatic, batch, or GUI mode.

  • The config parameter can either be a Config object or just a path string to the data source. If it’s a string, the constructor wraps it in a Config.

  • A Logger is created to track what’s happening and report status.

  • GUI-related hooks are initialized but left inactive until explicitly connected.

This object can now create and run jobs—but wait, what’s a job in this context? That’s what we’ll get into next 👇

Once the Model is initialized, the first thing you do is call create_analysis_jobs(). This method determines what data should be analyzed and sets up jobs accordingly.

It supports both single-file and batch-directory analysis and uses a set of private helpers to figure out how to match recordings, ROI masks, and focus area files.

Let’s take a look:


source

Model.create_analysis_jobs


def create_analysis_jobs(
    
)->None:
Exported source
@patch
def create_analysis_jobs(self: Model
                        ) -> None:
    self._ensure_data_from_previous_jobs_was_removed()
    self.add_info_to_logs('Basic configurations for data import validated. Starting creation of analysis job(s)...', True)
    if self.config.batch_mode:
        all_subdir_paths_with_rec_file = self._get_all_subdir_paths_with_rec_file(self.config.data_source_path)
        all_subdir_paths_with_rec_file.sort()
        for idx, subdir_path in enumerate(all_subdir_paths_with_rec_file):
            if self.config.results_filepath:
                result_path = self.config.results_filepath / subdir_path.name
            else:
                result_path = subdir_path
            self._create_analysis_jobs_for_single_rec(subdir_path, result_path)
    else:
        self._create_analysis_jobs_for_single_rec()
    self.add_info_to_logs('All job creation(s) completed.', True, 100.0)

This method acts like a scout: it ventures into the provided folder(s), searches for files that NA³ knows how to handle, and sets up one or more analysis jobs based on what it finds.

Whether it creates one job or many depends on the configuration you’ve selected. The only setup that guarantees a single job is the following:

batch mode: OFF
focus area: OFF
ROI mode: Grid

For any other combination, the number of analysis jobs depends entirely on how your source data is structured—e.g. how many recordings are detected, how many ROI masks are found, or how many focus area regions exist.
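To illustrate the batch case, here is a self-contained sketch of a directory layout batch mode can work with, together with a simplified re-implementation of the discovery logic. The real version, _get_all_roi_loaders' sibling _get_all_subdir_paths_with_rec_file(), queries the RecordingLoaderFactory for the supported extensions; we simply assume '.avi' is among them here:

```python
from pathlib import Path
import tempfile

# Build a hypothetical batch layout: one subdirectory per recording.
root = Path(tempfile.mkdtemp())
for session in ('session_01', 'session_02'):
    subdir = root / session
    subdir.mkdir()
    (subdir / 'recording.avi').touch()
(root / 'notes.txt').touch()       # loose top-level files are not picked up
(root / '.hidden_dir').mkdir()     # hidden entries are skipped as well

supported_extensions = {'.avi'}    # assumption; the real set comes from the factory
found = sorted(
    subdir for subdir in root.iterdir()
    if subdir.is_dir()
    and not subdir.name.startswith('.')
    and any(f.suffix in supported_extensions for f in subdir.iterdir())
)
print([subdir.name for subdir in found])  # ['session_01', 'session_02']
```

Each of the two discovered subdirectories would then get its own call to _create_analysis_jobs_for_single_rec().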

The actual logic for locating those files and assembling jobs doesn’t live directly in create_analysis_jobs() — instead, it’s delegated to a series of helper methods. These methods do the real legwork and are marked as private by convention, meaning they’re not intended to be called from outside the Model class. Each isolates a specific aspect of the analysis job creation, like locating recordings or mapping ROI files, making it easier to extend or debug individual steps.

For your convenience, we’ve bundled all of them together in the next code block so you can collapse them as a group, skim through them if you like, and then jump back into the higher-level functionality.

Exported source
@patch
def _ensure_data_from_previous_jobs_was_removed(self: Model
                                               ) -> None:
    self.add_info_to_logs('Loading of new source data. All previously created jobs & logs will be deleted.', True)
    self.analysis_job_queue = []
    self.logs.clear_logs()


@patch
def _get_all_subdir_paths_with_rec_file(self: Model, 
                                        top_level_dir_path: Path
                                       ) -> list[Path]:
    rec_loader_factory = RecordingLoaderFactory()
    supported_extensions_for_recordings = rec_loader_factory.all_supported_extensions
    all_subdir_paths_that_contain_a_supported_recording_file = []
    for elem in top_level_dir_path.iterdir():
        if not elem.name.startswith('.'):
            if elem.is_dir():
                supported_recording_filepaths = [elem_2 for elem_2 in elem.iterdir() if elem_2.suffix in supported_extensions_for_recordings]
                if len(supported_recording_filepaths) > 0:
                    all_subdir_paths_that_contain_a_supported_recording_file.append(elem)
    return all_subdir_paths_that_contain_a_supported_recording_file



@patch
def _create_analysis_jobs_for_single_rec(self: Model, 
                                         recording_path: Path = None,
                                         result_path: Path = None
                                        ) -> None:
    if recording_path is None:
        if self.config.recording_filepath:
            recording_path = self.config.recording_filepath
        else:
            recording_path = self.config.data_source_path
    self.add_info_to_logs(f'Starting with Job creation(s) for {str(recording_path)}', True)
    recording_loader = self._get_recording_loader(recording_path)


    if self.config.roi_filepath:
        roi_filepath = self.config.roi_filepath
    else:
        roi_filepath = recording_loader.filepath.parent

    if self.config.roi_mode == 'file':
        roi_loaders = self._get_all_roi_loaders(roi_filepath)
    else:
        roi_loaders = None

    if self.config.focus_area_filepath:
        focus_area_filepath = self.config.focus_area_filepath
    else:
        focus_area_filepath = recording_loader.filepath.parent

    if result_path is None:
        if self.config.results_filepath:
            result_path = self.config.results_filepath
        else:
            result_path = recording_loader.filepath.parent

    if self.config.focus_area_enabled:
        focus_area_dir_path = self._get_focus_area_dir_path(focus_area_filepath)
        if focus_area_dir_path is None:
            analysis_job = self._create_single_analysis_job(recording_loader, roi_loaders, result_filepath=result_path)
            self.analysis_job_queue.append(analysis_job)
            self.add_info_to_logs(f'Successfully created a single job for {focus_area_filepath} at queue position: #{len(self.analysis_job_queue)}.', True)
        else:
            all_focus_area_loaders = self._get_all_roi_loaders(focus_area_dir_path)
            assert len(all_focus_area_loaders) > 0, f'Focus Area analysis enabled, but no focus area ROIs could be found. Please revisit your source data and retry!'
            for idx, focus_area_loader in enumerate(all_focus_area_loaders):
                if result_path is None:
                    if self.config.results_filepath:
                        result_path = self.config.results_filepath / focus_area_loader.filepath.stem
                analysis_job_with_focus_area = self._create_single_analysis_job(recording_loader, roi_loaders, focus_area_loader, result_filepath=result_path)
                self.analysis_job_queue.append(analysis_job_with_focus_area)
                job_creation_message = (f'Successfully created {idx + 1} out of {len(all_focus_area_loaders)} job(s) for {recording_path} '
                                        f'at queue position: #{len(self.analysis_job_queue)}.')
                self.add_info_to_logs(job_creation_message, True)
    else:
        analysis_job = self._create_single_analysis_job(recording_loader, roi_loaders, result_filepath=result_path)
        self.analysis_job_queue.append(analysis_job)
        self.add_info_to_logs(f'Successfully created a single job for {recording_path} at queue position: #{len(self.analysis_job_queue)}.', True)
    self.add_info_to_logs(f'Finished Job creation(s) for {recording_path}!', True)


@patch
def _get_recording_loader(self: Model, 
                          source_path: Path
                         ) -> RecordingLoader:
    rec_loader_factory = RecordingLoaderFactory()
    if source_path.is_dir():
        self.add_info_to_logs(f'Looking for a valid recording file in {source_path}...', True)
        valid_filepaths = get_filepaths_with_supported_extension_in_dirpath(source_path, rec_loader_factory.all_supported_extensions, 1)
        if len(valid_filepaths) == 0:
            self.add_info_to_logs(f'Could not find any recording files of supported type at {source_path}!', True)
        elif len(valid_filepaths) >  1:
            filepath = valid_filepaths[0]
            too_many_files_message = (f'Found more than a single recording file of supported type at {source_path}, i.e.: {valid_filepaths}. '
                                      f'However, only a single file was expected. NA3 continues with {filepath} and will ignore the other files.')
            self.add_info_to_logs(too_many_files_message, True)
        else:
            filepath = valid_filepaths[0]
            self.add_info_to_logs(f'Found recording file of supported type at: {filepath}.', True)
    else:
        filepath = source_path
        self.add_info_to_logs(f'Found recording file of supported type at: {filepath}.', True)
    recording_loader = rec_loader_factory.get_loader(filepath)
    return recording_loader


@patch
def _get_all_roi_loaders(self: Model, 
                         data_source_path: Path
                        ) -> list[ROILoader]:
    assert data_source_path.is_dir(), f'You must provide a directory as source data when using ROI mode or enabling Focus Areas. Please revisit your input data and retry.'
    roi_loader_factory = ROILoaderFactory()
    all_filepaths_with_supported_filetype_extensions = get_filepaths_with_supported_extension_in_dirpath(data_source_path, roi_loader_factory.all_supported_extensions)
    all_roi_loaders = [roi_loader_factory.get_loader(roi_filepath) for roi_filepath in all_filepaths_with_supported_filetype_extensions]
    return all_roi_loaders


@patch
def _get_focus_area_dir_path(self: Model, 
                             source_path: Path
                            ) -> Path:
    focus_area_path_restrictions = FocusAreaPathRestrictions()
    supported_dir_names = focus_area_path_restrictions.supported_dir_names
    if source_path.is_dir():
        source_dir_path = source_path
    else:
        source_dir_path = source_path.parent
    dirs_with_valid_name = [elem for elem in source_dir_path.iterdir() if (elem.name in supported_dir_names) & (elem.is_dir() == True)]
    if len(dirs_with_valid_name) == 0:
        no_dir_found_message = (f'You enabled Focus Area but a correspondingly named directory could not be found in {source_dir_path}. '
                                f'Please use one of the following for the name of the directory that contains the Focus Area ROIs: {supported_dir_names}. '
                                'In absence of such a directory, analysis is continued without using the Focus Area mode for this data.')
        self.add_info_to_logs(no_dir_found_message, True)
        focus_area_dir_path = None
    elif len(dirs_with_valid_name) > 1:
        too_many_dirs = (f'More than a single Focus Area directory was found in the following parent directory: {source_dir_path}, i.e.: '
                     f'{dirs_with_valid_name}. However, only the use of a single one that contains all your Focus Area ROIs is '
                         f'currently supported. {dirs_with_valid_name[0]} will be used for this analysis, while the other(s): {dirs_with_valid_name[1:]} '
                         'will be ignored to continue processing.')
        self.add_info_to_logs(too_many_dirs, True)
        focus_area_dir_path = dirs_with_valid_name[0]
    else:
        focus_area_dir_path = dirs_with_valid_name[0]
    return focus_area_dir_path


@patch
def _create_single_analysis_job(self: Model,
                                recording_loader: RecordingLoader,
                                roi_loaders: list[ROILoader] | None,
                                focus_area_loader: ROILoader = None,
                                result_filepath: Path = None
                               ) -> AnalysisJob:
    data_loaders = {'recording': recording_loader}
    if roi_loaders is not None:
        data_loaders['rois'] = roi_loaders
    if focus_area_loader is not None:
        data_loaders['focus_area'] = focus_area_loader
    return AnalysisJob(self.num_processes, data_loaders, result_filepath)

Once the analysis jobs have been created, it’s time to actually process them! ✨

To kick off the entire analysis pipeline, you simply call the other main method of the Model:

run_analysis()

This method takes care of everything:

  • executing each analysis job in turn,

  • logging the progress and any issues along the way,

  • generating results and saving them to disk,

  • and even updating the GUI if you’re running in interactive mode.
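One behavioral detail worth knowing: run_analysis() consumes the job queue destructively, popping each job from the front as it is processed. Stripped of the actual analysis work, the pattern boils down to this sketch:

```python
# Stand-ins for AnalysisJob objects; run_analysis() pops jobs from the
# front of the queue, so the queue is empty once the loop finishes.
analysis_job_queue = ['job_a', 'job_b', 'job_c']
processed = []
for job_idx in range(len(analysis_job_queue)):
    job = analysis_job_queue.pop(0)
    processed.append(job)

print(processed)            # ['job_a', 'job_b', 'job_c']
print(analysis_job_queue)   # []
```

In other words, calling run_analysis() a second time without re-running create_analysis_jobs() will simply iterate over an empty queue.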


source

Model.run_analysis


def run_analysis(
    
)->None:
Exported source
@patch
def run_analysis(self: Model
                ) -> None:
    self._display_configs()
    self.add_info_to_logs('Starting analysis...', True)
    for job_idx in range(len(self.analysis_job_queue)):
        analysis_job = self.analysis_job_queue.pop(0)
        self.add_info_to_logs(f'Starting to process analysis job with index #{job_idx}.')
        analysis_job.run_analysis(self.config)
        self.add_info_to_logs('Analysis successfully completed. Continuing with creation of results...')
        analysis_job.create_results(self.config, self.nwb_metadata)
        self.add_info_to_logs(f'Results successfully created at: {analysis_job.results_dir_path}')
        if self.gui_enabled:
            self.callback_view_show_output_screen()
            with self.view_output:
                activity_overview_fig = analysis_job.activity_overview_plot[0]
                activity_overview_fig.set_figheight(400 * self.pixel_conversion)
                activity_overview_fig.tight_layout()
                show(activity_overview_fig)
        self._save_user_settings_as_json(analysis_job)
        self.result_directories.append(analysis_job.results_dir_path)
        self.add_info_to_logs('Updating all log files to contain all logs as the final step. All valid log files will end with this message.')
        self.logs.save_current_logs(analysis_job.results_dir_path)
    else:
        gc.collect()
Note

Analysis jobs are currently processed sequentially—one after another. However, each job itself is internally parallelized: the ROI traces are processed using all available CPU cores.

That means there’s definitely room for future optimization at the job level. So if you’re feeling adventurous and want to dive into parallel job execution… we’d love to see your pull request! 🔧🚀

The private helper methods that run_analysis() uses are significantly less complex. As before, they’re grouped below for convenience to enable optional collapsing. 👇

Exported source
@patch
def _display_configs(self: Model
                    ) -> None:
    self.add_info_to_logs('Configurations for Analysis Settings and Result Creation validated successfully.', True)
    self.add_info_to_logs(f'Analysis Settings are:')
    for line in self.config.display_all_attributes():
        self.add_info_to_logs(line)


@patch
def _save_user_settings_as_json(self: Model, 
                                analysis_job: AnalysisJob
                               ) -> None:
    filepath = analysis_job.results_dir_path.joinpath('user_settings.json')
    self.config.recording_filepath = analysis_job.recording.filepath
    if analysis_job.focus_area_enabled:
        self.config.focus_area_filepath = analysis_job.focus_area.filepath
    else:
        self.config.focus_area_filepath = None
    if analysis_job.rois_source == 'file':
        self.config.roi_filepath = [roi.filepath for roi in analysis_job.all_rois]
    with open(filepath, 'w+') as user_settings_json:
        user_settings_json.write(self.config.to_json())

The Model is also designed to cooperate closely with na3’s graphical user interface (GUI) — without ever depending on it directly. This is thanks to a clear separation of concerns, following the Model-View-Controller (MVC) design pattern.

The idea is simple:

  • The Model knows how to run analyses

  • The View handles the display and user interaction

  • The controller (in this case, the App class) wires the two together

To support this structure, the Model exposes two public methods that let the controller hook up GUI elements like progress messages and result previews. Let’s look at them:


source

Model.setup_connection_to_update_infos_in_view


def setup_connection_to_update_infos_in_view(
    update_infos:Callable
)->None:

Allows to configure the widget in the GUI that is used to display the logs.

Exported source
@patch
def setup_connection_to_update_infos_in_view(self: Model, 
                                             update_infos: Callable
                                            ) -> None:
    """ Allows to configure the widget in the GUI that is used to display the logs. """
    self.callback_view_update_infos = update_infos
    self.gui_enabled = True

This method registers a callback that will be used to send updates (logs or progress percentages) from the model to the GUI’s log display widget. Once this is connected, log messages generated during analysis can be sent live to the GUI’s log panel.
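Since the hook is just a Callable, anything with the right signature works - including a plain function, which is handy for testing without a GUI. A hypothetical stand-in might look like this (the two-argument signature matches how add_info_to_logs() invokes the registered callback):

```python
# A plain function standing in for the GUI's log widget; the Model calls
# the registered hook with (message, progress_in_percent).
received = []

def update_infos(message, progress_in_percent=None):
    received.append((message, progress_in_percent))

# With a real Model you would register it like this:
# model.setup_connection_to_update_infos_in_view(update_infos)

# Simulate the calls the Model would forward during a run:
update_infos('Starting analysis...', None)
update_infos('All job creation(s) completed.', 100.0)
print(received)
```

Swapping in a recording function like this also makes it easy to assert in a test which messages reached the "GUI".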


source

Model.setup_connection_to_display_results


def setup_connection_to_display_results(
    show_output_screen:Callable, output:Output, pixel_conversion:float
)->None:

Allows to configure the widget in the GUI that is used to display images, plots, and figures.

Exported source
@patch
def setup_connection_to_display_results(self: Model, 
                                        show_output_screen: Callable, 
                                        output: w.Output, 
                                        pixel_conversion: float
                                       ) -> None:
    """ Allows to configure the widget in the GUI that is used to display images, plots, and figures. """
    self.callback_view_show_output_screen = show_output_screen
    self.view_output = output
    self.pixel_conversion = pixel_conversion
    self.gui_enabled = True

This second method connects the model to GUI elements responsible for showing figures and images — specifically the results of an analysis job. Once wired up, na3 can render the activity overview plots or other visualizations directly into the notebook interface. This makes for a much smoother and more intuitive analysis experience — especially for exploratory workflows.

The next method was also designed to smooth your experience with na3's GUI when you're trying to figure out which grid size to use:


source

Model.preview_window_size


def preview_window_size(
    grid_size
)->tuple:
Exported source
@patch
def preview_window_size(self: Model, 
                        grid_size
                       ) -> tuple[Figure, Axes]:
    job_for_preview = self.analysis_job_queue[0]
    preview_fig, preview_ax = job_for_preview.preview_window_size(grid_size)
    return preview_fig, preview_ax

This method shows you how the selected grid_size will divide your image into analysis windows. It uses the first job in the queue (since you haven’t run any yet) and overlays the grid onto the corresponding image. It integrates directly with the GUI output, so users can click a button and immediately see how their current settings affect the analysis layout.

At the very beginning of this notebook, we introduced the humble Logger — a simple but powerful tool for capturing status updates throughout the analysis pipeline.

Well, guess what: we’re coming full circle now. The Model uses its own method, add_info_to_logs(), to centralize how messages are handled — whether you’re running headless, in a notebook, or in the GUI.


source

Model.add_info_to_logs


def add_info_to_logs(
    message:str, display_in_gui:bool=False, progress_in_percent:float | None=None
)->None:
Exported source
@patch
def add_info_to_logs(self: Model, 
                     message: str, 
                     display_in_gui: bool = False, 
                     progress_in_percent: float | None = None
                    ) -> None:
    self.logs.add_new_log(message)
    if (display_in_gui == True) and (self.gui_enabled == True): 
        self.callback_view_update_infos(message, progress_in_percent)

This method serves as a bridge between the raw logging system and the user interface. It records the message in the internal logs and, if GUI mode is active, pushes the update (together with an optional progress value) to the GUI via the connected callback.

It’s small, but it plays a central role in making sure everything you do with NA³ is traceable, debuggable, and visible — no matter how you interact with the tool.

So if you’re seeing beautifully time-stamped updates in the GUI while a job runs, now you know who’s responsible! 🙌

And that concludes our tour through the source code of the Model — the central brain of na3!

🧪 Testing, testing, testing ...

We already emphasized above how important testing your research software is. Therefore, we also implemented some more tests here that check parts of the Model's behavior.

from shutil import rmtree
import os
from re import compile

import pandas as pd
from pandas.testing import assert_frame_equal

from neuralactivitycubic.view import WidgetsInterface

test00_filepath = Path('../test_data/00')
test01_filepath = Path('../test_data/01')
parent_test_filepath = Path('../test_data')
example_results_dir = Path('../test_data/00/example_test_results_for_spiking_neuron')
results_filepath = Path('../test_data/00/results_directory')
# each test case writes to its own results directory to avoid concurrency issues
results_case01_filepath = Path('../test_data/results/case_01/')
results_case02_filepath = Path('../test_data/results/case_02/')
results_case03_filepath = Path('../test_data/results/case_03/')
results_case04_filepath = Path('../test_data/results/case_04/')
results_case05_filepath = Path('../test_data/results/case_05/')
results_case06_filepath = Path('../test_data/results/case_06/')

def test_correct_model_run():
    correct_config = WidgetsInterface().export_user_settings()
    correct_config.data_source_path = test00_filepath / 'spiking_neuron.avi'
    correct_config.save_single_trace_results = True
    model = Model(correct_config)
    model.create_analysis_jobs()
    model.run_analysis()
    return model.result_directories

def test_correct_model_run_with_custom_results_dir():
    correct_config = WidgetsInterface().export_user_settings()
    correct_config.data_source_path = test00_filepath / 'spiking_neuron.avi'
    correct_config.results_filepath = results_filepath
    model = Model(correct_config)
    model.create_analysis_jobs()
    model.run_analysis()
    return correct_config.results_filepath

# tests with different modes of running analysis

def test_run_grid_focus_mode():
    """
    Test run_analysis function with focus area enabled.
    """
    grid_focus_mode_config = WidgetsInterface().export_user_settings()
    grid_focus_mode_config.data_source_path = test01_filepath
    grid_focus_mode_config.focus_area_enabled = True
    grid_focus_mode_config.results_filepath = results_case01_filepath
    model = Model(grid_focus_mode_config)
    model.create_analysis_jobs()
    model.run_analysis()

    return grid_focus_mode_config.results_filepath

def test_run_file_focus_mode():
    """
    Test run_analysis function with focus area enabled and roi_mode set to 'file'.
    """
    file_focus_mode_config = WidgetsInterface().export_user_settings()
    file_focus_mode_config.data_source_path = test01_filepath
    file_focus_mode_config.focus_area_enabled = True
    file_focus_mode_config.roi_mode = 'file'
    file_focus_mode_config.results_filepath = results_case01_filepath
    model = Model(file_focus_mode_config)
    model.create_analysis_jobs()
    model.run_analysis()

    return file_focus_mode_config.results_filepath

def test_run_grid_batch_mode():
    """
    Test run_analysis function with batch mode enabled.
    """
    grid_batch_mode_config = WidgetsInterface().export_user_settings()
    grid_batch_mode_config.data_source_path = parent_test_filepath
    grid_batch_mode_config.batch_mode = True
    grid_batch_mode_config.results_filepath = results_case02_filepath
    model = Model(grid_batch_mode_config)
    model.create_analysis_jobs()
    model.run_analysis()

    return grid_batch_mode_config.results_filepath

def test_run_file_batch_mode():
    """
    Test run_analysis function with batch mode enabled and roi_mode set to 'file'.
    """
    file_batch_mode_config = WidgetsInterface().export_user_settings()
    file_batch_mode_config.data_source_path = parent_test_filepath
    file_batch_mode_config.batch_mode = True
    file_batch_mode_config.roi_mode = 'file'
    file_batch_mode_config.results_filepath = results_case02_filepath
    model = Model(file_batch_mode_config)
    model.create_analysis_jobs()
    model.run_analysis()

    return file_batch_mode_config.results_filepath

def test_run_grid_focus_batch_mode():
    """
    Test run_analysis function with focus and batch mode enabled.
    """
    grid_focus_batch_mode_config = WidgetsInterface().export_user_settings()
    grid_focus_batch_mode_config.data_source_path = parent_test_filepath
    grid_focus_batch_mode_config.batch_mode = True
    grid_focus_batch_mode_config.focus_area_enabled = True
    grid_focus_batch_mode_config.results_filepath = results_case03_filepath
    model = Model(grid_focus_batch_mode_config)
    model.create_analysis_jobs()
    model.run_analysis()

    return grid_focus_batch_mode_config.results_filepath

def test_run_file_focus_batch_mode():
    """
    Test run_analysis function with focus and batch mode enabled and roi_mode set to 'file'.
    """
    file_focus_batch_mode_config = WidgetsInterface().export_user_settings()
    file_focus_batch_mode_config.data_source_path = parent_test_filepath
    file_focus_batch_mode_config.batch_mode = True
    file_focus_batch_mode_config.focus_area_enabled = True
    file_focus_batch_mode_config.roi_mode = 'file'
    file_focus_batch_mode_config.results_filepath = results_case03_filepath
    model = Model(file_focus_batch_mode_config)
    model.create_analysis_jobs()
    model.run_analysis()

    return file_focus_batch_mode_config.results_filepath

def _test_csv_files(relative_filepath_to_csv: str, results_dir: Path) -> bool:
    filepath = results_dir / relative_filepath_to_csv
    # confirm results have been created:
    if not filepath.is_file():
        return False
    # confirm computational consistency of results, allowing for minor numerical tolerance;
    # assert_frame_equal raises an AssertionError on mismatch instead of returning a value,
    # so we catch the exception and translate it into a boolean result:
    df_test = pd.read_csv(filepath)
    df_validation = pd.read_csv(example_results_dir / relative_filepath_to_csv)
    try:
        assert_frame_equal(df_test, df_validation)
    except AssertionError:
        return False
    return True

def test_all_peak_results(results_dir):
    return _test_csv_files('all_peak_results.csv', results_dir)

def test_amplitude_and_df_over_f_results(results_dir):
    return _test_csv_files('Amplitude_and_dF_over_F_results.csv', results_dir)

def test_auc_results(results_dir):
    return _test_csv_files('AUC_results.csv', results_dir)

def test_variance_area_results(results_dir):
    return _test_csv_files('Variance_area_results.csv', results_dir)

def test_representative_single_trace_results(results_dir):
    return _test_csv_files('single_traces/data_of_ROI_7-10.csv', results_dir)

def test_activity_overview_png(results_dir):
    filepath = results_dir / 'activity_overview.png'
    return filepath.is_file()

def test_roi_label_ids_overview_png(results_dir):
    filepath = results_dir / 'ROI_label_IDs_overview.png'
    return filepath.is_file()

def test_individual_traces_with_identified_events_pdf(results_dir):
    filepath = results_dir / 'Individual_traces_with_identified_events.pdf'
    return filepath.is_file()

def test_logs_txt(results_dir):
    filepath = results_dir / 'logs.txt'
    return filepath.is_file()

def test_user_settings_json(results_dir):
    filepath = results_dir / 'user_settings.json'
    return filepath.is_file()

def test_nwb_export(results_dir):
    filepath = results_dir / 'autogenerated_nwb_file.nwb'
    return filepath.is_file()

def test_all_correct_files_created(results_dir: Path, test_csv: bool = True, single_trace: bool = False) -> bool:
    """
    Check if all expected files have been created in the results directory.
    """
    # confirm all csv files have been created and are correct:
    if test_csv:
        assert test_all_peak_results(results_dir), 'There is an issue with the "all_peak_results.csv" file!'
        assert test_amplitude_and_df_over_f_results(results_dir)
        assert test_auc_results(results_dir)
        assert test_variance_area_results(results_dir)
        if single_trace:
            assert test_representative_single_trace_results(results_dir)

    # confirm all other result files have been created:
    assert test_activity_overview_png(results_dir)
    assert test_roi_label_ids_overview_png(results_dir)
    assert test_logs_txt(results_dir)
    assert test_user_settings_json(results_dir)
    if single_trace:
        assert test_individual_traces_with_identified_events_pdf(results_dir)

    return True


def find_directories_after_test(base_path):
    """
    Find directories after test with timestamps.
    """
    pattern = compile(r'\d{4}_\d{2}_\d{2}_\d{2}-\d{2}-\d{2}_.+')

    matching_dirs = [
        str(base_path) + d for d in os.listdir(base_path)
        if os.path.isdir(os.path.join(base_path, d)) and pattern.fullmatch(d)
    ]

    return matching_dirs

def delete_directories_after_test(base_path):
    """
    Delete all timestamped result directories found under the given base path.
    """
    for res_dir in find_directories_after_test(base_path):
        rmtree(res_dir)
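To make the cleanup behavior concrete, here is a quick, self-contained check of the timestamp pattern used by these helpers (the timestamped directory name is a made-up example, not real output):

```python
from re import compile

# same pattern as in find_directories_after_test above
pattern = compile(r'\d{4}_\d{2}_\d{2}_\d{2}-\d{2}-\d{2}_.+')

# a (hypothetical) timestamped result directory name matches the pattern ...
assert pattern.fullmatch('2024_01_31_12-30-05_spiking_neuron') is not None
# ... while static directories like the example results do not, so they survive cleanup
assert pattern.fullmatch('example_test_results_for_spiking_neuron') is None
```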

Run tests:

# confirm that model can be executed:
result_directories = test_correct_model_run()

for directory in result_directories:
    assert directory.exists()
    assert test_all_correct_files_created(directory)
    # cleanup
    rmtree(directory)
# confirm that model can be executed with custom result directory:
results_directory = test_correct_model_run_with_custom_results_dir()

assert results_directory.exists()
# only one directory with analysis files should be created, as there is only one focus area:
assert len(list(results_directory.iterdir())) == 1

for directory in results_directory.iterdir():
    assert test_all_correct_files_created(directory)

# cleanup
rmtree(results_directory)
# confirm that model can be executed with focus mode:
results_directory = test_run_grid_focus_mode()

assert results_directory.exists()

# two directories should be created, as this test case contains two focus areas:
assert len(list(results_directory.iterdir())) == 2
for directory in results_directory.iterdir():
    assert test_all_correct_files_created(directory, test_csv=False)

# cleanup
rmtree(results_directory)
# confirm that model can be executed with batch mode:
results_directory = test_run_grid_batch_mode()

assert results_directory.exists()

assert len(list(results_directory.iterdir())) == 3
for directory in results_directory.iterdir():
    for subdirectory in directory.iterdir():
        assert test_all_correct_files_created(subdirectory, test_csv=False)

# cleanup
rmtree(results_directory)
# confirm that model can be executed with batch and focus mode:
results_directory = test_run_grid_focus_batch_mode()

assert results_directory.exists()

assert len(list(results_directory.iterdir())) == 3
for directory in results_directory.iterdir():
    # test case with multiple focus areas
    if '01' in directory.name:
        assert len(list(directory.iterdir())) == 2
    for subdirectory in directory.iterdir():
        assert test_all_correct_files_created(subdirectory, test_csv=False)

# cleanup
rmtree(results_directory)
# confirm that model can be executed with focus mode:
results_directory = test_run_file_focus_mode()

assert results_directory.exists()

# two directories should be created, as this test case contains two focus areas:
assert len(list(results_directory.iterdir())) == 2
for directory in results_directory.iterdir():
    assert test_all_correct_files_created(directory, test_csv=False)

# cleanup
rmtree(results_directory)
# confirm that model can be executed with batch mode:
results_directory = test_run_file_batch_mode()

assert results_directory.exists()

assert len(list(results_directory.iterdir())) == 3
for directory in results_directory.iterdir():
    for subdirectory in directory.iterdir():
        assert test_all_correct_files_created(subdirectory, test_csv=False)

# cleanup
rmtree(results_directory)
# confirm that model can be executed with batch and focus mode:
results_directory = test_run_file_focus_batch_mode()

assert results_directory.exists()

assert len(list(results_directory.iterdir())) == 3
for directory in results_directory.iterdir():
    # test case with multiple focus areas
    if '01' in directory.name:
        assert len(list(directory.iterdir())) == 2
    for subdirectory in directory.iterdir():
        assert test_all_correct_files_created(subdirectory, test_csv=False)

# cleanup
rmtree(results_directory)