Module qudi_hira_analysis.io_handler

Expand source code
import ast
import contextlib
import datetime
import inspect
import itertools
import pickle
from functools import wraps
from pathlib import Path
from typing import Callable, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pySPM


class IOHandler:
    """Handle all read and write operations.

    Paths handed to the ``read_*`` methods are resolved against
    ``base_read_path`` and paths handed to the ``save_*`` methods against
    ``base_write_path`` (when these are set). Methods guarded by an extension
    check append the expected extension when the path has none and raise
    ``OSError`` when the path carries a different one.

    Args:
        base_read_path: Directory prepended to all paths that are read.
        base_write_path: Directory prepended to all paths that are written.
    """

    def __init__(
            self,
            base_read_path: Optional[Path] = None,
            base_write_path: Optional[Path] = None
    ):
        super().__init__()
        self.base_read_path = base_read_path
        self.base_write_path = base_write_path

    @staticmethod
    def _add_base_read_path(func: Callable) -> Callable:
        """
        Decorator to add the `base_read_path` to the filepath if it is not None

        Args:
            func: Function to be decorated, taking the filepath as its first
                argument after `self`

        Returns:
            Decorated function
        """

        @wraps(func)
        def wrapper(self, filepath: Path, *args, **kwargs):
            if self.base_read_path:
                # Path() also lets a plain string be used as the base directory
                filepath = Path(self.base_read_path) / filepath
            # Forward positional arguments too, not only keyword arguments
            return func(self, filepath, *args, **kwargs)

        return wrapper

    @staticmethod
    def _add_base_write_path(func: Callable) -> Callable:
        """
        Decorator to add the `base_write_path` to the filepath if it is not None

        The parent directory of the resulting path is created (including any
        missing intermediate directories) before the function is called.

        Args:
            func: Function to be decorated, taking the filepath as its first
                argument after `self`

        Returns:
            Decorated function
        """

        @wraps(func)
        def wrapper(self, filepath: Path, *args, **kwargs):
            # Accept plain strings as well as Path objects
            filepath = Path(filepath)
            if self.base_write_path:
                filepath = Path(self.base_write_path) / filepath
            # parents=True so that nested output folders can be created in one go
            filepath.parent.mkdir(parents=True, exist_ok=True)
            return func(self, filepath, *args, **kwargs)

        return wrapper

    @staticmethod
    def _check_extension(ext: str) -> Callable:
        """
        Decorator to check the extension of the filepath is correct

        A path without any extension gets `ext` appended; a path with a
        different extension raises an `OSError`.

        Args:
            ext: Extension to check for (including the leading dot)

        Returns:
            Decorator that wraps a function with the extension check
        """

        def decorator(func: Callable) -> Callable:
            @wraps(func)
            def wrapper(self, filepath: Path, *args, **kwargs):
                # Accept plain strings as well as Path objects
                filepath = Path(filepath)
                if filepath.suffix == ext:
                    return func(self, filepath, *args, **kwargs)
                if filepath.suffix == "":
                    return func(self, filepath.with_suffix(ext), *args, **kwargs)
                raise OSError(
                    f"Invalid extension '{filepath.suffix}' in '{filepath}', "
                    f"extension should be '{ext}'")

            return wrapper

        return decorator

    @staticmethod
    def _parse_qudi_parameters(filepath: Path) -> dict:
        """Parse the header of a qudi dat file from an already resolved path.

        Kept separate from `read_qudi_parameters` so that other readers can
        reuse the parser without the base read path being applied twice.

        Args:
            filepath: Final path to the qudi .dat file

        Returns:
            Dictionary of parameters
        """
        params = {}
        with open(filepath) as file:
            for line in file:
                if line == '#=====\n':
                    # End of the parameter section
                    break
                # Remove # from beginning of lines
                line = line[1:]
                num_colons = line.count(":")
                # Best-effort parsing: lines whose value cannot be interpreted
                # are skipped instead of aborting the whole read
                with contextlib.suppress(ValueError, TypeError, SyntaxError,
                                         MemoryError, RecursionError):
                    if num_colons == 1:
                        # Add params to dictionary
                        label, value = line.split(":")
                        if value != "\n":
                            params[label] = ast.literal_eval(
                                inspect.cleandoc(value))
                    elif num_colons == 3:
                        # Handle files with timestamps in them
                        label, *time_parts = line.split(":")
                        timestamp_str = "".join(time_parts).strip()
                        params[label] = datetime.datetime.strptime(
                            timestamp_str,
                            "%d.%m.%Y %Hh%Mmin%Ss"
                        ).replace(tzinfo=datetime.timezone.utc)
        return params

    @_add_base_read_path
    @_check_extension(".dat")
    def read_qudi_parameters(self, filepath: Path) -> dict:
        """Extract parameters from a qudi dat file.

        Header lines are read until the '#=====' separator. Lines of the form
        '#label: value' are evaluated as Python literals, lines containing
        three colons are parsed as timestamps (tagged as UTC). Lines that
        cannot be parsed are skipped.

        Args:
            filepath: Path to the qudi .dat file

        Returns:
            Dictionary of parameters
        """
        return self._parse_qudi_parameters(filepath)

    @_add_base_read_path
    @_check_extension(".dat")
    def read_into_dataframe(self, filepath: Path) -> pd.DataFrame:
        """Read a qudi data file into a pandas DataFrame for analysis.

        The column names are taken from the last line of the leading block of
        '#' comment lines.

        Args:
            filepath: Path to the qudi data file

        Returns:
            DataFrame containing the data from the qudi data file

        Raises:
            ValueError: If the file does not start with a '#' header line.
        """
        with open(filepath) as handle:
            # Generate column names for DataFrame by parsing the file
            header = list(
                itertools.takewhile(lambda line: line.startswith('#'), handle))
        if not header:
            raise ValueError(
                f"No '#' header line with column names found in '{filepath}'")
        names = header[-1][1:].strip().split("\t")
        return pd.read_csv(filepath, names=names, comment="#", sep="\t")

    @_add_base_read_path
    def read_csv(self, filepath: Path, **kwargs) -> pd.DataFrame:
        """ Read a csv file into a pandas DataFrame. """
        return pd.read_csv(filepath, **kwargs)

    @_add_base_read_path
    def read_excel(self, filepath: Path, **kwargs) -> pd.DataFrame:
        """ Read an Excel file into a pandas DataFrame. """
        return pd.read_excel(filepath, **kwargs)

    @_add_base_read_path
    @_check_extension(".dat")
    def read_confocal_into_dataframe(self, filepath: Path) -> pd.DataFrame:
        """Read a qudi confocal data file into a pandas DataFrame for analysis.

        Args:
            filepath: Path to the qudi confocal .dat file

        Returns:
            DataFrame of the image data, indexed by the X range (rows, sorted
            descending) with the Y range as columns.
        """
        # `filepath` is already fully resolved by the decorators, so read it
        # directly instead of going through the decorated readers, which would
        # prepend a relative base read path a second time
        confocal_params = self._parse_qudi_parameters(filepath)
        data = np.genfromtxt(filepath, delimiter="\t")
        # Use the confocal parameters to generate the index & columns for the DataFrame
        index = np.linspace(
            confocal_params['X image min (m)'],
            confocal_params['X image max (m)'],
            data.shape[0]
        )
        columns = np.linspace(
            confocal_params['Y image min'],
            confocal_params['Y image max'],
            data.shape[1]
        )
        df = pd.DataFrame(data, index=index, columns=columns)
        # Sort the index to get origin (0, 0) in the lower left corner of the DataFrame
        df.sort_index(axis=0, ascending=False, inplace=True)
        return df

    @_add_base_read_path
    def read_into_ndarray(self, filepath: Path, **kwargs) -> np.ndarray:
        """ Read a file into a numpy ndarray. """
        return np.genfromtxt(filepath, **kwargs)

    @_add_base_read_path
    def read_into_ndarray_transposed(self, filepath: Path, **kwargs) -> np.ndarray:
        """ Read a file into a transposed numpy ndarray. """
        return np.genfromtxt(filepath, **kwargs).T

    @_add_base_read_path
    @_check_extension(".pys")
    def read_pys(self, filepath: Path) -> dict:
        """Read raw .pys data files into a dictionary.

        NOTE: the file is unpickled (`allow_pickle=True`), which can execute
        arbitrary code. Only read files from trusted sources.
        """
        byte_dict = np.load(str(filepath), encoding="bytes", allow_pickle=True)
        # Convert byte string keys to normal strings
        return {key.decode('utf8'): byte_dict.get(key) for key in byte_dict}

    @_add_base_read_path
    @_check_extension(".pkl")
    def read_pkl(self, filepath: Path) -> dict:
        """Read pickle files into a dictionary.

        NOTE: unpickling can execute arbitrary code. Only read files from
        trusted sources.
        """
        with open(filepath, 'rb') as f:
            file = pickle.load(f)
        return file

    @_add_base_read_path
    @_check_extension(".dat")
    def read_nanonis_data(self, filepath: Path) -> pd.DataFrame:
        """Read data from a Nanonis .dat file.

        Args:
            filepath: Path to the Nanonis .dat file.

        Returns:
            DataFrame of data.
        """
        skip_rows = 0
        with open(filepath) as dat_file:
            for num, line in enumerate(dat_file, 1):
                # The data table starts after the first marker line; everything
                # up to and including that line is skipped when extracting data
                if "[DATA]" in line or "#=====" in line:
                    skip_rows = num
                    break
        df = pd.read_table(filepath, sep="\t", skiprows=skip_rows)
        return df

    @_add_base_read_path
    @_check_extension(".dat")
    def read_nanonis_parameters(self, filepath: Path) -> dict:
        """Read parameters from a Nanonis .dat file.

        Tab-separated 'label<TAB>value<TAB>' lines are read until the first
        empty line. Values are converted to float where possible.

        Args:
            filepath: Path to the Nanonis .dat file.

        Returns:
            Dictionary of parameters.
        """
        parameters = {}
        with open(filepath) as dat_file:
            for line in dat_file:
                if line == "\n":
                    # Break when reaching empty line
                    break
                if "User" in line or line.split("\t")[0] == "":
                    # Cleanup excess parameters and skip lines without a label
                    continue
                label, value, _ = line.split("\t")
                # Keep the raw string when the value is not numeric
                with contextlib.suppress(ValueError):
                    value = float(value)
                if "Oscillation Control>" in label:
                    label = label.replace("Oscillation Control>", "")
                parameters[label] = value
        return parameters

    @_add_base_read_path
    @_check_extension(".sxm")
    def read_nanonis_spm_data(self, filepath: Path) -> pySPM.SXM:
        """Read a Nanonis .sxm data file.

        Args:
            filepath: Path to the .sxm file.

        Returns:
            pySPM.SXM object containing the data.
        """
        return pySPM.SXM(filepath)

    @_add_base_read_path
    @_check_extension(".001")
    def read_bruker_spm_data(self, filepath: Path) -> pySPM.Bruker:
        """Read a Bruker SPM data file.

        Args:
            filepath: Path to the .001 file.

        Returns:
            pySPM.Bruker object containing the data.
        """
        return pySPM.Bruker(filepath)

    @_add_base_read_path
    @_check_extension(".txt")
    def read_pfeiffer_data(self, filepath: Path) -> pd.DataFrame:
        """Read data stored by Pfeiffer vacuum monitoring software.

        Args:
            filepath: Path to the text file.

        Returns:
            DataFrame containing the data, indexed by datetime.
        """
        # Extract rows including the header
        df = pd.read_csv(filepath, sep="\t", skiprows=[0, 2, 3, 4])
        # Combine data and time columns together
        df["Date"] = df["Date"] + " " + df["Time"]
        df = df.drop("Time", axis=1)
        # Infer datetime format and convert to datetime objects
        # NOTE(review): `infer_datetime_format` is deprecated since pandas 2.0;
        # drop it once older pandas versions no longer need to be supported
        df["Date"] = pd.to_datetime(df["Date"], infer_datetime_format=True)
        # Set datetime as index
        df = df.set_index("Date", drop=True)
        return df

    @_add_base_read_path
    @_check_extension(".xls")
    def read_lakeshore_data(self, filepath: Path) -> pd.DataFrame:
        """Read data stored by Lakeshore temperature monitor software.

        Args:
            filepath: Path to the Excel file.

        Returns:
            DataFrame containing the data, indexed by datetime.
        """
        # Extract only the origin timestamp
        origin = pd.read_excel(
            filepath, skiprows=1, nrows=1, usecols=[1], header=None
        )[1][0]
        # Remove any tzinfo to prevent future exceptions in pandas
        origin = origin.replace("CET", "")
        # Parse datetime object from timestamp
        origin = pd.to_datetime(origin)
        # Create DataFrame and drop empty cols
        df = pd.read_excel(filepath, skiprows=3)
        df = df.dropna(axis=1, how="all")
        # Add datetimes to DataFrame ("Time" is taken as ms relative to origin)
        df["Datetime"] = pd.to_datetime(df["Time"], unit="ms", origin=origin)
        df = df.drop("Time", axis=1)
        # Set datetime as index
        df = df.set_index("Datetime", drop=True)
        return df

    @_add_base_read_path
    @_check_extension(".txt")
    def read_oceanoptics_data(self, filepath: Path) -> pd.DataFrame:
        """Read spectrometer data from OceanOptics spectrometer.

        Args:
            filepath: Path to the data file.

        Returns:
            DataFrame containing the wavelength and intensity data.
        """
        # The first 14 lines are skipped as file header
        df = pd.read_csv(filepath, sep="\t", skiprows=14,
                         names=["wavelength", "intensity"])
        return df

    @staticmethod
    def __get_forward_backward_counts(count_rates, num_pixels):
        """Split interleaved scan lines into forward and backward images.

        Args:
            count_rates: Flat sequence of count rates, alternating between
                one forward and one backward scan line.
            num_pixels: Number of scan lines per direction.

        Returns:
            Tuple of stacked forward and (flipped) backward scan arrays.
        """
        split_array = np.split(count_rates, 2 * num_pixels)
        # Extract forward scan array as every second element
        forward_counts = np.stack(split_array[::2])
        # Extract backward scan array as every shifted second element
        # Flip scan so that backward and forward scans represent the same data
        backward_counts = np.flip(np.stack(split_array[1::2]), axis=1)
        return forward_counts, backward_counts

    def read_pixelscanner_data(self, filepath: Path) -> tuple[
            pySPM.SPM_image, pySPM.SPM_image]:
        """ Read data from a PixelScanner measurement.

        Args:
            filepath: Path to the data file.

        Returns:
            Forward and backward scan data.

        Raises:
            ValueError: If half the number of rows is not a perfect square.
        """
        df = self.read_into_dataframe(filepath)
        num_pixels = int(np.sqrt(len(df) // 2))

        if num_pixels ** 2 != len(df) // 2:
            raise ValueError("Number of pixels does not match data length.")

        # Try the known column names in turn, newest formats first
        try:
            fwd, bwd = self.__get_forward_backward_counts(df["count_rates"], num_pixels)
        except KeyError:
            try:
                fwd, bwd = self.__get_forward_backward_counts(df["Count Rates (cps)"],
                                                              num_pixels)
            except KeyError:
                # Support old data format
                fwd = df["forward (cps)"].to_numpy().reshape(num_pixels, num_pixels)
                bwd = df["backward (cps)"].to_numpy().reshape(num_pixels, num_pixels)

        fwd = pySPM.SPM_image(fwd, channel="Forward", _type="NV-PL")
        bwd = pySPM.SPM_image(bwd, channel="Backward", _type="NV-PL")
        return fwd, bwd

    @_add_base_write_path
    @_check_extension(".pkl")
    def save_pkl(self, filepath: Path, obj: object):
        """Saves pickle files.

        Args:
            filepath: Path to the data file.
            obj: Object to be saved.
        """
        with open(filepath, 'wb') as f:
            pickle.dump(obj, f)

    @_add_base_write_path
    @_check_extension(".pys")
    def save_pys(self, filepath: Path, dictionary: dict):
        """Saves .pys files.

        Args:
            filepath: Path to the data file.
            dictionary: Dictionary to be saved.
        """
        with open(filepath, 'wb') as f:
            # Pickle protocol 1 is used for the .pys format
            pickle.dump(dictionary, f, 1)

    # NOTE(review): the ".pys" extension check looks copied from `save_pys`;
    # the file written here is tab-separated text. Kept unchanged so existing
    # callers keep working - confirm the intended extension.
    @_add_base_write_path
    @_check_extension(".pys")
    def save_df(self, filepath: Path, df: pd.DataFrame):
        """Save DataFrame as a tab-separated, UTF-8 encoded text file.

        Args:
            filepath: Path to the data file.
            df: DataFrame to be saved.
        """
        df.to_csv(filepath, sep='\t', encoding='utf-8')

    @_add_base_write_path
    def save_figures(
            self,
            filepath: Path,
            fig: plt.Figure,
            *,
            only_jpg: bool = False,
            only_pdf: bool = False,
            **kwargs
    ):
        """Saves figures from matplotlib plot data.

        By default, saves as jpg, png, pdf and svg.

        Args:
            filepath: Name of figure to save, the extension is replaced by
                the extension of each output format.
            fig: Matplotlib figure to save.
            only_jpg: If True, only save as jpg (default: False).
            only_pdf: If True, only save as pdf (default: False). Ignored
                when `only_jpg` is True.
            **kwargs: Keyword arguments passed to `fig.savefig()`. `dpi`
                defaults to 200 when not given.
        """
        if only_jpg:
            extensions = [".jpg"]
        elif only_pdf:
            extensions = [".pdf"]
        else:
            extensions = [".jpg", ".pdf", ".svg", ".png"]

        # Allow callers to override the default resolution
        kwargs.setdefault("dpi", 200)
        for ext in extensions:
            fig.savefig(filepath.with_suffix(ext), **kwargs)

Classes

class IOHandler (base_read_path: Optional[pathlib.Path] = None, base_write_path: Optional[pathlib.Path] = None)

Handle all read and write operations.

Expand source code
class IOHandler:
    """Handle all read and write operations.

    Paths handed to the ``read_*`` methods are resolved against
    ``base_read_path`` and paths handed to the ``save_*`` methods against
    ``base_write_path`` (when these are set). Methods guarded by an extension
    check append the expected extension when the path has none and raise
    ``OSError`` when the path carries a different one.

    Args:
        base_read_path: Directory prepended to all paths that are read.
        base_write_path: Directory prepended to all paths that are written.
    """

    def __init__(
            self,
            base_read_path: Optional[Path] = None,
            base_write_path: Optional[Path] = None
    ):
        super().__init__()
        self.base_read_path = base_read_path
        self.base_write_path = base_write_path

    @staticmethod
    def _add_base_read_path(func: Callable) -> Callable:
        """
        Decorator to add the `base_read_path` to the filepath if it is not None

        Args:
            func: Function to be decorated, taking the filepath as its first
                argument after `self`

        Returns:
            Decorated function
        """

        @wraps(func)
        def wrapper(self, filepath: Path, *args, **kwargs):
            if self.base_read_path:
                # Path() also lets a plain string be used as the base directory
                filepath = Path(self.base_read_path) / filepath
            # Forward positional arguments too, not only keyword arguments
            return func(self, filepath, *args, **kwargs)

        return wrapper

    @staticmethod
    def _add_base_write_path(func: Callable) -> Callable:
        """
        Decorator to add the `base_write_path` to the filepath if it is not None

        The parent directory of the resulting path is created (including any
        missing intermediate directories) before the function is called.

        Args:
            func: Function to be decorated, taking the filepath as its first
                argument after `self`

        Returns:
            Decorated function
        """

        @wraps(func)
        def wrapper(self, filepath: Path, *args, **kwargs):
            # Accept plain strings as well as Path objects
            filepath = Path(filepath)
            if self.base_write_path:
                filepath = Path(self.base_write_path) / filepath
            # parents=True so that nested output folders can be created in one go
            filepath.parent.mkdir(parents=True, exist_ok=True)
            return func(self, filepath, *args, **kwargs)

        return wrapper

    @staticmethod
    def _check_extension(ext: str) -> Callable:
        """
        Decorator to check the extension of the filepath is correct

        A path without any extension gets `ext` appended; a path with a
        different extension raises an `OSError`.

        Args:
            ext: Extension to check for (including the leading dot)

        Returns:
            Decorator that wraps a function with the extension check
        """

        def decorator(func: Callable) -> Callable:
            @wraps(func)
            def wrapper(self, filepath: Path, *args, **kwargs):
                # Accept plain strings as well as Path objects
                filepath = Path(filepath)
                if filepath.suffix == ext:
                    return func(self, filepath, *args, **kwargs)
                if filepath.suffix == "":
                    return func(self, filepath.with_suffix(ext), *args, **kwargs)
                raise OSError(
                    f"Invalid extension '{filepath.suffix}' in '{filepath}', "
                    f"extension should be '{ext}'")

            return wrapper

        return decorator

    @staticmethod
    def _parse_qudi_parameters(filepath: Path) -> dict:
        """Parse the header of a qudi dat file from an already resolved path.

        Kept separate from `read_qudi_parameters` so that other readers can
        reuse the parser without the base read path being applied twice.

        Args:
            filepath: Final path to the qudi .dat file

        Returns:
            Dictionary of parameters
        """
        params = {}
        with open(filepath) as file:
            for line in file:
                if line == '#=====\n':
                    # End of the parameter section
                    break
                # Remove # from beginning of lines
                line = line[1:]
                num_colons = line.count(":")
                # Best-effort parsing: lines whose value cannot be interpreted
                # are skipped instead of aborting the whole read
                with contextlib.suppress(ValueError, TypeError, SyntaxError,
                                         MemoryError, RecursionError):
                    if num_colons == 1:
                        # Add params to dictionary
                        label, value = line.split(":")
                        if value != "\n":
                            params[label] = ast.literal_eval(
                                inspect.cleandoc(value))
                    elif num_colons == 3:
                        # Handle files with timestamps in them
                        label, *time_parts = line.split(":")
                        timestamp_str = "".join(time_parts).strip()
                        params[label] = datetime.datetime.strptime(
                            timestamp_str,
                            "%d.%m.%Y %Hh%Mmin%Ss"
                        ).replace(tzinfo=datetime.timezone.utc)
        return params

    @_add_base_read_path
    @_check_extension(".dat")
    def read_qudi_parameters(self, filepath: Path) -> dict:
        """Extract parameters from a qudi dat file.

        Header lines are read until the '#=====' separator. Lines of the form
        '#label: value' are evaluated as Python literals, lines containing
        three colons are parsed as timestamps (tagged as UTC). Lines that
        cannot be parsed are skipped.

        Args:
            filepath: Path to the qudi .dat file

        Returns:
            Dictionary of parameters
        """
        return self._parse_qudi_parameters(filepath)

    @_add_base_read_path
    @_check_extension(".dat")
    def read_into_dataframe(self, filepath: Path) -> pd.DataFrame:
        """Read a qudi data file into a pandas DataFrame for analysis.

        The column names are taken from the last line of the leading block of
        '#' comment lines.

        Args:
            filepath: Path to the qudi data file

        Returns:
            DataFrame containing the data from the qudi data file

        Raises:
            ValueError: If the file does not start with a '#' header line.
        """
        with open(filepath) as handle:
            # Generate column names for DataFrame by parsing the file
            header = list(
                itertools.takewhile(lambda line: line.startswith('#'), handle))
        if not header:
            raise ValueError(
                f"No '#' header line with column names found in '{filepath}'")
        names = header[-1][1:].strip().split("\t")
        return pd.read_csv(filepath, names=names, comment="#", sep="\t")

    @_add_base_read_path
    def read_csv(self, filepath: Path, **kwargs) -> pd.DataFrame:
        """ Read a csv file into a pandas DataFrame. """
        return pd.read_csv(filepath, **kwargs)

    @_add_base_read_path
    def read_excel(self, filepath: Path, **kwargs) -> pd.DataFrame:
        """ Read an Excel file into a pandas DataFrame. """
        return pd.read_excel(filepath, **kwargs)

    @_add_base_read_path
    @_check_extension(".dat")
    def read_confocal_into_dataframe(self, filepath: Path) -> pd.DataFrame:
        """Read a qudi confocal data file into a pandas DataFrame for analysis.

        Args:
            filepath: Path to the qudi confocal .dat file

        Returns:
            DataFrame of the image data, indexed by the X range (rows, sorted
            descending) with the Y range as columns.
        """
        # `filepath` is already fully resolved by the decorators, so read it
        # directly instead of going through the decorated readers, which would
        # prepend a relative base read path a second time
        confocal_params = self._parse_qudi_parameters(filepath)
        data = np.genfromtxt(filepath, delimiter="\t")
        # Use the confocal parameters to generate the index & columns for the DataFrame
        index = np.linspace(
            confocal_params['X image min (m)'],
            confocal_params['X image max (m)'],
            data.shape[0]
        )
        columns = np.linspace(
            confocal_params['Y image min'],
            confocal_params['Y image max'],
            data.shape[1]
        )
        df = pd.DataFrame(data, index=index, columns=columns)
        # Sort the index to get origin (0, 0) in the lower left corner of the DataFrame
        df.sort_index(axis=0, ascending=False, inplace=True)
        return df

    @_add_base_read_path
    def read_into_ndarray(self, filepath: Path, **kwargs) -> np.ndarray:
        """ Read a file into a numpy ndarray. """
        return np.genfromtxt(filepath, **kwargs)

    @_add_base_read_path
    def read_into_ndarray_transposed(self, filepath: Path, **kwargs) -> np.ndarray:
        """ Read a file into a transposed numpy ndarray. """
        return np.genfromtxt(filepath, **kwargs).T

    @_add_base_read_path
    @_check_extension(".pys")
    def read_pys(self, filepath: Path) -> dict:
        """Read raw .pys data files into a dictionary.

        NOTE: the file is unpickled (`allow_pickle=True`), which can execute
        arbitrary code. Only read files from trusted sources.
        """
        byte_dict = np.load(str(filepath), encoding="bytes", allow_pickle=True)
        # Convert byte string keys to normal strings
        return {key.decode('utf8'): byte_dict.get(key) for key in byte_dict}

    @_add_base_read_path
    @_check_extension(".pkl")
    def read_pkl(self, filepath: Path) -> dict:
        """Read pickle files into a dictionary.

        NOTE: unpickling can execute arbitrary code. Only read files from
        trusted sources.
        """
        with open(filepath, 'rb') as f:
            file = pickle.load(f)
        return file

    @_add_base_read_path
    @_check_extension(".dat")
    def read_nanonis_data(self, filepath: Path) -> pd.DataFrame:
        """Read data from a Nanonis .dat file.

        Args:
            filepath: Path to the Nanonis .dat file.

        Returns:
            DataFrame of data.
        """
        skip_rows = 0
        with open(filepath) as dat_file:
            for num, line in enumerate(dat_file, 1):
                # The data table starts after the first marker line; everything
                # up to and including that line is skipped when extracting data
                if "[DATA]" in line or "#=====" in line:
                    skip_rows = num
                    break
        df = pd.read_table(filepath, sep="\t", skiprows=skip_rows)
        return df

    @_add_base_read_path
    @_check_extension(".dat")
    def read_nanonis_parameters(self, filepath: Path) -> dict:
        """Read parameters from a Nanonis .dat file.

        Tab-separated 'label<TAB>value<TAB>' lines are read until the first
        empty line. Values are converted to float where possible.

        Args:
            filepath: Path to the Nanonis .dat file.

        Returns:
            Dictionary of parameters.
        """
        parameters = {}
        with open(filepath) as dat_file:
            for line in dat_file:
                if line == "\n":
                    # Break when reaching empty line
                    break
                if "User" in line or line.split("\t")[0] == "":
                    # Cleanup excess parameters and skip lines without a label
                    continue
                label, value, _ = line.split("\t")
                # Keep the raw string when the value is not numeric
                with contextlib.suppress(ValueError):
                    value = float(value)
                if "Oscillation Control>" in label:
                    label = label.replace("Oscillation Control>", "")
                parameters[label] = value
        return parameters

    @_add_base_read_path
    @_check_extension(".sxm")
    def read_nanonis_spm_data(self, filepath: Path) -> pySPM.SXM:
        """Read a Nanonis .sxm data file.

        Args:
            filepath: Path to the .sxm file.

        Returns:
            pySPM.SXM object containing the data.
        """
        return pySPM.SXM(filepath)

    @_add_base_read_path
    @_check_extension(".001")
    def read_bruker_spm_data(self, filepath: Path) -> pySPM.Bruker:
        """Read a Bruker SPM data file.

        Args:
            filepath: Path to the .001 file.

        Returns:
            pySPM.Bruker object containing the data.
        """
        return pySPM.Bruker(filepath)

    @_add_base_read_path
    @_check_extension(".txt")
    def read_pfeiffer_data(self, filepath: Path) -> pd.DataFrame:
        """Read data stored by Pfeiffer vacuum monitoring software.

        Args:
            filepath: Path to the text file.

        Returns:
            DataFrame containing the data, indexed by datetime.
        """
        # Extract rows including the header
        df = pd.read_csv(filepath, sep="\t", skiprows=[0, 2, 3, 4])
        # Combine data and time columns together
        df["Date"] = df["Date"] + " " + df["Time"]
        df = df.drop("Time", axis=1)
        # Infer datetime format and convert to datetime objects
        # NOTE(review): `infer_datetime_format` is deprecated since pandas 2.0;
        # drop it once older pandas versions no longer need to be supported
        df["Date"] = pd.to_datetime(df["Date"], infer_datetime_format=True)
        # Set datetime as index
        df = df.set_index("Date", drop=True)
        return df

    @_add_base_read_path
    @_check_extension(".xls")
    def read_lakeshore_data(self, filepath: Path) -> pd.DataFrame:
        """Read data stored by Lakeshore temperature monitor software.

        Args:
            filepath: Path to the Excel file.

        Returns:
            DataFrame containing the data, indexed by datetime.
        """
        # Extract only the origin timestamp
        origin = pd.read_excel(
            filepath, skiprows=1, nrows=1, usecols=[1], header=None
        )[1][0]
        # Remove any tzinfo to prevent future exceptions in pandas
        origin = origin.replace("CET", "")
        # Parse datetime object from timestamp
        origin = pd.to_datetime(origin)
        # Create DataFrame and drop empty cols
        df = pd.read_excel(filepath, skiprows=3)
        df = df.dropna(axis=1, how="all")
        # Add datetimes to DataFrame ("Time" is taken as ms relative to origin)
        df["Datetime"] = pd.to_datetime(df["Time"], unit="ms", origin=origin)
        df = df.drop("Time", axis=1)
        # Set datetime as index
        df = df.set_index("Datetime", drop=True)
        return df

    @_add_base_read_path
    @_check_extension(".txt")
    def read_oceanoptics_data(self, filepath: Path) -> pd.DataFrame:
        """Read spectrometer data from OceanOptics spectrometer.

        Args:
            filepath: Path to the data file.

        Returns:
            DataFrame containing the wavelength and intensity data.
        """
        # The first 14 lines are skipped as file header
        df = pd.read_csv(filepath, sep="\t", skiprows=14,
                         names=["wavelength", "intensity"])
        return df

    @staticmethod
    def __get_forward_backward_counts(count_rates, num_pixels):
        """Split interleaved scan lines into forward and backward images.

        Args:
            count_rates: Flat sequence of count rates, alternating between
                one forward and one backward scan line.
            num_pixels: Number of scan lines per direction.

        Returns:
            Tuple of stacked forward and (flipped) backward scan arrays.
        """
        split_array = np.split(count_rates, 2 * num_pixels)
        # Extract forward scan array as every second element
        forward_counts = np.stack(split_array[::2])
        # Extract backward scan array as every shifted second element
        # Flip scan so that backward and forward scans represent the same data
        backward_counts = np.flip(np.stack(split_array[1::2]), axis=1)
        return forward_counts, backward_counts

    def read_pixelscanner_data(self, filepath: Path) -> tuple[
            pySPM.SPM_image, pySPM.SPM_image]:
        """ Read data from a PixelScanner measurement.

        Args:
            filepath: Path to the data file.

        Returns:
            Forward and backward scan data.

        Raises:
            ValueError: If half the number of rows is not a perfect square.
        """
        df = self.read_into_dataframe(filepath)
        num_pixels = int(np.sqrt(len(df) // 2))

        if num_pixels ** 2 != len(df) // 2:
            raise ValueError("Number of pixels does not match data length.")

        # Try the known column names in turn, newest formats first
        try:
            fwd, bwd = self.__get_forward_backward_counts(df["count_rates"], num_pixels)
        except KeyError:
            try:
                fwd, bwd = self.__get_forward_backward_counts(df["Count Rates (cps)"],
                                                              num_pixels)
            except KeyError:
                # Support old data format
                fwd = df["forward (cps)"].to_numpy().reshape(num_pixels, num_pixels)
                bwd = df["backward (cps)"].to_numpy().reshape(num_pixels, num_pixels)

        fwd = pySPM.SPM_image(fwd, channel="Forward", _type="NV-PL")
        bwd = pySPM.SPM_image(bwd, channel="Backward", _type="NV-PL")
        return fwd, bwd

    @_add_base_write_path
    @_check_extension(".pkl")
    def save_pkl(self, filepath: Path, obj: object):
        """Saves pickle files.

        Args:
            filepath: Path to the data file.
            obj: Object to be saved.
        """
        with open(filepath, 'wb') as f:
            pickle.dump(obj, f)

    @_add_base_write_path
    @_check_extension(".pys")
    def save_pys(self, filepath: Path, dictionary: dict):
        """Saves .pys files.

        Args:
            filepath: Path to the data file.
            dictionary: Dictionary to be saved.
        """
        with open(filepath, 'wb') as f:
            # Pickle protocol 1 is used for the .pys format
            pickle.dump(dictionary, f, 1)

    # NOTE(review): the ".pys" extension check looks copied from `save_pys`;
    # the file written here is tab-separated text. Kept unchanged so existing
    # callers keep working - confirm the intended extension.
    @_add_base_write_path
    @_check_extension(".pys")
    def save_df(self, filepath: Path, df: pd.DataFrame):
        """Save DataFrame as a tab-separated, UTF-8 encoded text file.

        Args:
            filepath: Path to the data file.
            df: DataFrame to be saved.
        """
        df.to_csv(filepath, sep='\t', encoding='utf-8')

    @_add_base_write_path
    def save_figures(
            self,
            filepath: Path,
            fig: plt.Figure,
            *,
            only_jpg: bool = False,
            only_pdf: bool = False,
            **kwargs
    ):
        """Saves figures from matplotlib plot data.

        By default, saves as jpg, png, pdf and svg.

        Args:
            filepath: Name of figure to save, the extension is replaced by
                the extension of each output format.
            fig: Matplotlib figure to save.
            only_jpg: If True, only save as jpg (default: False).
            only_pdf: If True, only save as pdf (default: False). Ignored
                when `only_jpg` is True.
            **kwargs: Keyword arguments passed to `fig.savefig()`. `dpi`
                defaults to 200 when not given.
        """
        if only_jpg:
            extensions = [".jpg"]
        elif only_pdf:
            extensions = [".pdf"]
        else:
            extensions = [".jpg", ".pdf", ".svg", ".png"]

        # Allow callers to override the default resolution
        kwargs.setdefault("dpi", 200)
        for ext in extensions:
            fig.savefig(filepath.with_suffix(ext), **kwargs)

Subclasses

Methods

def read_bruker_spm_data(self, filepath: pathlib.Path) ‑> pySPM.Bruker.Bruker

Read a Bruker SPM data file.

Args

filepath
Path to the .001 file.

Returns

pySPM.Bruker object containing the data.

Expand source code
@_add_base_read_path
@_check_extension(".001")
def read_bruker_spm_data(self, filepath: Path) -> pySPM.Bruker:
    """Load a Bruker SPM measurement.

    Args:
        filepath: Location of the Bruker .001 file.

    Returns:
        The `pySPM.Bruker` instance constructed from `filepath`.
    """
    bruker_scan = pySPM.Bruker(filepath)
    return bruker_scan
def read_confocal_into_dataframe(self, filepath: pathlib.Path) ‑> pandas.core.frame.DataFrame

Read a qudi confocal data file into a pandas DataFrame for analysis.

Expand source code
@_add_base_read_path
@_check_extension(".dat")
def read_confocal_into_dataframe(self, filepath: Path) -> pd.DataFrame:
    """Load a qudi confocal scan as a DataFrame labelled with scan positions.

    Row labels are spaced linearly between the 'X image min (m)' and
    'X image max (m)' parameters of the file, column labels between
    'Y image min' and 'Y image max'.

    Args:
        filepath: Path to the qudi confocal .dat file.

    Returns:
        DataFrame of the scan data with rows in descending index order.
    """
    params = self.read_qudi_parameters(filepath)
    image = self.read_into_ndarray(filepath, delimiter="\t")
    # One label per row / column, spanning the scan range from the header
    row_labels = np.linspace(
        params['X image min (m)'], params['X image max (m)'], image.shape[0]
    )
    column_labels = np.linspace(
        params['Y image min'], params['Y image max'], image.shape[1]
    )
    frame = pd.DataFrame(image, index=row_labels, columns=column_labels)
    # Descending row order puts the origin (0, 0) in the lower left corner
    return frame.sort_index(axis=0, ascending=False)
def read_csv(self, filepath: pathlib.Path, **kwargs) ‑> pandas.core.frame.DataFrame

Read a csv file into a pandas DataFrame.

Expand source code
@_add_base_read_path
def read_csv(self, filepath: Path, **kwargs) -> pd.DataFrame:
    """Load a csv file as a pandas DataFrame.

    Args:
        filepath: Path to the csv file.
        **kwargs: Forwarded unchanged to `pandas.read_csv`.

    Returns:
        DataFrame parsed from the file.
    """
    frame = pd.read_csv(filepath, **kwargs)
    return frame
def read_excel(self, filepath: pathlib.Path, **kwargs) ‑> pandas.core.frame.DataFrame

Read an Excel file into a pandas DataFrame.

Expand source code
@_add_base_read_path
def read_excel(self, filepath: Path, **kwargs) -> pd.DataFrame:
    """Load an Excel file as a pandas DataFrame.

    Args:
        filepath: Path to the Excel file.
        **kwargs: Forwarded unchanged to `pandas.read_excel`.

    Returns:
        DataFrame parsed from the file.
    """
    frame = pd.read_excel(filepath, **kwargs)
    return frame
def read_into_dataframe(self, filepath: pathlib.Path) ‑> pandas.core.frame.DataFrame

Read a qudi data file into a pandas DataFrame for analysis.

Args

filepath
Path to the qudi data file

Returns

DataFrame containing the data from the qudi data file

Expand source code
@_add_base_read_path
@_check_extension(".dat")
def read_into_dataframe(self, filepath: Path) -> pd.DataFrame:
    """Read a qudi data file into a pandas DataFrame for analysis.

    The column names are taken from the last line of the leading block of
    '#' comment lines, split on tabs.

    Args:
        filepath: Path to the qudi data file

    Returns:
        DataFrame containing the data from the qudi data file

    Raises:
        ValueError: If the file does not start with a '#' comment header.
    """
    header_line = None
    with open(filepath) as handle:
        # Only the last header line carries the column names, so remember
        # just that one instead of collecting the whole header.
        for line in handle:
            if not line.startswith('#'):
                break
            header_line = line
    if header_line is None:
        raise ValueError(
            f"No '#' comment header with column names found in {filepath}"
        )
    names = header_line[1:].strip().split("\t")
    return pd.read_csv(filepath, names=names, comment="#", sep="\t")
def read_into_ndarray(self, filepath: pathlib.Path, **kwargs) ‑> numpy.ndarray

Read a file into a numpy ndarray.

Expand source code
@_add_base_read_path
def read_into_ndarray(self, filepath: Path, **kwargs) -> np.ndarray:
    """Load a text file as a numpy ndarray.

    Args:
        filepath: Path to the data file.
        **kwargs: Forwarded unchanged to `numpy.genfromtxt`.

    Returns:
        Array parsed from the file.
    """
    loaded = np.genfromtxt(filepath, **kwargs)
    return loaded
def read_into_ndarray_transposed(self, filepath: pathlib.Path, **kwargs) ‑> numpy.ndarray

Read a file into a transposed numpy ndarray.

Expand source code
@_add_base_read_path
def read_into_ndarray_transposed(self, filepath: Path, **kwargs) -> np.ndarray:
    """Load a text file as a numpy ndarray and return its transpose.

    Args:
        filepath: Path to the data file.
        **kwargs: Forwarded unchanged to `numpy.genfromtxt`.

    Returns:
        Transposed array parsed from the file.
    """
    loaded = np.genfromtxt(filepath, **kwargs)
    return loaded.T
def read_lakeshore_data(self, filepath: pathlib.Path) ‑> pandas.core.frame.DataFrame

Read data stored by Lakeshore temperature monitor software.

Args

filepath
Path to the Excel file.

Returns

DataFrame containing the data.

Expand source code
@_add_base_read_path
@_check_extension(".xls")
def read_lakeshore_data(self, filepath: Path) -> pd.DataFrame:
    """Load a log written by the Lakeshore temperature monitor software.

    Args:
        filepath: Path to the Excel file.

    Returns:
        DataFrame of the logged data, indexed by a "Datetime" column built
        from the "Time" column and the origin timestamp stored in the file.
    """
    # The origin timestamp sits in the second row, second column
    raw_origin = pd.read_excel(
        filepath, skiprows=1, nrows=1, usecols=[1], header=None
    )[1][0]
    # Strip the "CET" marker before parsing to prevent future exceptions
    # in pandas
    start_time = pd.to_datetime(raw_origin.replace("CET", ""))

    # Data table starts after three rows; discard columns that are all empty
    table = pd.read_excel(filepath, skiprows=3).dropna(axis=1, how="all")
    # "Time" is treated as milliseconds elapsed since the origin timestamp
    table["Datetime"] = pd.to_datetime(
        table["Time"], unit="ms", origin=start_time
    )
    return table.drop("Time", axis=1).set_index("Datetime", drop=True)
def read_nanonis_data(self, filepath: pathlib.Path) ‑> pandas.core.frame.DataFrame

Read data from a Nanonis .dat file.

Args

filepath
Path to the Nanonis .dat file.

Returns

DataFrame of data.

Expand source code
@_add_base_read_path
@_check_extension(".dat")
def read_nanonis_data(self, filepath: Path) -> pd.DataFrame:
    """Load the data table of a Nanonis .dat file.

    Everything up to and including the first line containing "[DATA]" or
    "#=====" is skipped; if no such line exists nothing is skipped.

    Args:
        filepath: Path to the Nanonis .dat file.

    Returns:
        DataFrame of data.
    """
    markers = ("[DATA]", "#=====")
    with open(filepath) as dat_file:
        # 1-based number of the first marker line, or 0 when there is none
        header_length = next(
            (
                line_number
                for line_number, text in enumerate(dat_file, 1)
                if any(marker in text for marker in markers)
            ),
            0,
        )
    return pd.read_table(filepath, sep="\t", skiprows=header_length)
def read_nanonis_parameters(self, filepath: pathlib.Path) ‑> dict

Read parameters from a Nanonis .dat file.

Args

filepath
Path to the Nanonis .dat file.

Returns

Dictionary of parameters.

Expand source code
@_add_base_read_path
@_check_extension(".dat")
def read_nanonis_parameters(self, filepath: Path) -> dict:
    """Collect the header parameters of a Nanonis .dat file.

    Lines are read until the first empty line. Each kept line is split on
    tabs into a label and a value; values are converted to float where
    possible and otherwise kept as strings.

    Args:
        filepath: Path to the Nanonis .dat file.

    Returns:
        Dictionary of parameters.
    """
    parameters = {}
    with open(filepath) as dat_file:
        for line in dat_file:
            if line == "\n":
                # The first empty line ends the parameter section
                break
            fields = line.split("\t")
            if "User" in line or fields[0] == "":
                # Ignore "User" entries and lines without a label
                continue
            label, value, _ = fields
            try:
                value = float(value)
            except ValueError:
                # Non-numeric values stay as strings
                pass
            # Drop the "Oscillation Control>" prefix from labels
            parameters[label.replace("Oscillation Control>", "")] = value
    return parameters
def read_nanonis_spm_data(self, filepath: pathlib.Path) ‑> pySPM.SXM.SXM

Read a Nanonis .sxm data file.

Args

filepath
Path to the .sxm file.

Returns

pySPM.SXM object containing the data.

Expand source code
@_add_base_read_path
@_check_extension(".sxm")
def read_nanonis_spm_data(self, filepath: Path) -> pySPM.SXM:
    """Load a Nanonis .sxm measurement.

    Args:
        filepath: Location of the .sxm file.

    Returns:
        The `pySPM.SXM` instance constructed from `filepath`.
    """
    sxm_scan = pySPM.SXM(filepath)
    return sxm_scan
def read_oceanoptics_data(self, filepath: str) ‑> pandas.core.frame.DataFrame

Read spectrometer data from OceanOptics spectrometer.

Args

filepath
Path to the data file.

Returns

DataFrame containing the wavelength and intensity data.

Expand source code
@_add_base_read_path
@_check_extension(".txt")
def read_oceanoptics_data(self, filepath: str) -> pd.DataFrame:
    """Load a spectrum recorded with an OceanOptics spectrometer.

    The first 14 lines of the file are skipped and the remaining
    tab-separated columns are named "wavelength" and "intensity".

    Args:
        filepath: Path to the data file.

    Returns:
        DataFrame containing the wavelength and intensity data.
    """
    column_names = ["wavelength", "intensity"]
    return pd.read_csv(filepath, sep="\t", skiprows=14, names=column_names)
def read_pfeiffer_data(self, filepath: pathlib.Path) ‑> pandas.core.frame.DataFrame

Read data stored by Pfeiffer vacuum monitoring software.

Args

filepath
Path to the text file.

Returns

DataFrame containing the data.

Expand source code
@_add_base_read_path
@_check_extension(".txt")
def read_pfeiffer_data(self, filepath: Path) -> pd.DataFrame:
    """Read data stored by Pfeiffer vacuum monitoring software.

    The "Date" and "Time" columns are merged into a single datetime column
    that becomes the index of the returned DataFrame.

    Args:
        filepath: Path to the text file.

    Returns:
        DataFrame containing the data, indexed by "Date".
    """
    # Keep the header (second line) but skip the lines around it
    df = pd.read_csv(filepath, sep="\t", skiprows=[0, 2, 3, 4])
    # Combine date and time columns and convert to datetime objects.
    # `infer_datetime_format` is deliberately not passed: it is deprecated
    # since pandas 2.0, where format inference is the default behaviour.
    df["Date"] = pd.to_datetime(df["Date"] + " " + df["Time"])
    df = df.drop("Time", axis=1)
    # Set datetime as index
    return df.set_index("Date", drop=True)
def read_pixelscanner_data(self, filepath: pathlib.Path) ‑> ()

Read data from a PixelScanner measurement.

Args

filepath
Path to the data file.

Returns

Forward and backward scan data.

Expand source code
def read_pixelscanner_data(self, filepath: Path) -> (
        pySPM.SPM_image, pySPM.SPM_image):
    """Load the forward and backward images of a PixelScanner measurement.

    Args:
        filepath: Path to the data file.

    Returns:
        Forward and backward scan data.

    Raises:
        ValueError: If half the number of data rows is not a perfect square.
    """
    df = self.read_into_dataframe(filepath)
    half_length = len(df) // 2
    num_pixels = int(np.sqrt(half_length))
    if num_pixels ** 2 != half_length:
        raise ValueError("Number of pixels does not match data length.")

    # Try the known count-rate column names in order
    for column in ("count_rates", "Count Rates (cps)"):
        try:
            fwd, bwd = self.__get_forward_backward_counts(df[column], num_pixels)
        except KeyError:
            continue
        break
    else:
        # Support old data format
        fwd = df["forward (cps)"].to_numpy().reshape(num_pixels, num_pixels)
        bwd = df["backward (cps)"].to_numpy().reshape(num_pixels, num_pixels)

    forward_image = pySPM.SPM_image(fwd, channel="Forward", _type="NV-PL")
    backward_image = pySPM.SPM_image(bwd, channel="Backward", _type="NV-PL")
    return forward_image, backward_image
def read_pkl(self, filepath: pathlib.Path) ‑> dict

Read pickle files into a dictionary.

Expand source code
@_add_base_read_path
@_check_extension(".pkl")
def read_pkl(self, filepath: Path) -> dict:
    """Load the object stored in a pickle file.

    Note: unpickling can execute arbitrary code, so only read files from
    trusted sources.

    Args:
        filepath: Path to the .pkl file.

    Returns:
        The unpickled object.
    """
    with open(filepath, mode='rb') as handle:
        return pickle.load(handle)
def read_pys(self, filepath: pathlib.Path) ‑> dict

Read raw .pys data files into a dictionary.

Expand source code
@_add_base_read_path
@_check_extension(".pys")
def read_pys(self, filepath: Path) -> dict:
    """Read raw .pys data files into a dictionary.

    Byte-string keys are decoded as UTF-8; keys that are already `str` are
    kept unchanged. Values are returned as loaded.

    Note: the file is unpickled (`allow_pickle=True`), which can execute
    arbitrary code, so only read files from trusted sources.

    Args:
        filepath: Path to the .pys file.

    Returns:
        Dictionary with string keys.
    """
    byte_dict = np.load(str(filepath), encoding="bytes", allow_pickle=True)
    # Convert byte string keys to normal strings; leave str keys untouched
    # instead of failing on their missing `.decode`.
    return {
        key.decode('utf8') if isinstance(key, bytes) else key: value
        for key, value in byte_dict.items()
    }
def read_qudi_parameters(self, filepath: pathlib.Path) ‑> dict

Extract parameters from a qudi dat file.

Args

filepath
Path to the qudi .dat file

Returns

Dictionary of parameters

Expand source code
@_add_base_read_path
@_check_extension(".dat")
def read_qudi_parameters(self, filepath: Path) -> dict:
    """Collect the header parameters of a qudi .dat file.

    Header lines are read until a line equal to '#=====' is reached. Lines
    that cannot be parsed are silently ignored.

    Args:
        filepath: Path to the qudi .dat file

    Returns:
        Dictionary of parameters
    """
    params = {}
    with open(filepath) as file:
        for raw_line in file:
            if raw_line == '#=====\n':
                break
            # Parsing is best effort: any failure just skips the line
            with contextlib.suppress(Exception):
                # Drop the leading '#' and split into colon-separated pieces
                pieces = raw_line[1:].split(":")
                if len(pieces) == 2:
                    # Plain "label: value" entry
                    label, value = pieces
                    if value != "\n":
                        params[label] = ast.literal_eval(
                            inspect.cleandoc(value))
                elif len(pieces) == 4:
                    # Entry whose value is a timestamp containing colons
                    timestamp_str = "".join(pieces[1:]).strip()
                    parsed = datetime.datetime.strptime(
                        timestamp_str,
                        "%d.%m.%Y %Hh%Mmin%Ss"
                    )
                    params[pieces[0]] = parsed.replace(
                        tzinfo=datetime.timezone.utc)
    return params
def save_df(self, filepath: pathlib.Path, df: pandas.core.frame.DataFrame)

Save Dataframe as csv.

Expand source code
@_add_base_write_path
# The file written here is delimited text, so the expected extension is
# ".csv" (not ".pys", which belongs to the pickle-based save_pys).
@_check_extension(".csv")
def save_df(self, filepath: Path, df: pd.DataFrame):
    """Save a DataFrame as a tab-separated, UTF-8 encoded csv file.

    Args:
        filepath: Path to the output csv file.
        df: DataFrame to be saved.
    """
    df.to_csv(filepath, sep='\t', encoding='utf-8')
def save_figures(self, filepath: pathlib.Path, fig: matplotlib.figure.Figure, **kwargs)

Saves figures from matplotlib plot data.

By default, saves as jpg, png, pdf and svg.

Args

fig
Matplotlib figure to save.
filepath
Name of figure to save.
only_jpg
If True, only save as jpg (default: False).
only_pdf
If True, only save as pdf (default: False).
**kwargs
Keyword arguments passed to matplotlib.pyplot.savefig().
Expand source code
@_add_base_write_path
def save_figures(self, filepath: Path, fig: plt.Figure, **kwargs):
    """Save a matplotlib figure in one or more file formats.

    By default, saves as jpg, pdf, svg and png. The suffix of `filepath`
    is replaced by the extension of each format.

    Args:
        fig: Matplotlib figure to save.
        filepath: Name of figure to save.
        only_jpg: If True, only save as jpg (default: False).
        only_pdf: If True, only save as pdf (default: False). Ignored
            when `only_jpg` is also True.
        **kwargs: Keyword arguments passed to `fig.savefig()`. `dpi`
            defaults to 200 when not given.
    """
    # Always remove both flags so neither one is forwarded to savefig().
    only_jpg = kwargs.pop("only_jpg", False)
    only_pdf = kwargs.pop("only_pdf", False)
    # Let callers override the resolution instead of colliding with it.
    kwargs.setdefault("dpi", 200)

    if only_jpg:
        extensions = [".jpg"]
    elif only_pdf:
        extensions = [".pdf"]
    else:
        # Also reached when a flag is explicitly passed as False.
        extensions = [".jpg", ".pdf", ".svg", ".png"]

    for ext in extensions:
        fig.savefig(filepath.with_suffix(ext), **kwargs)
def save_pkl(self, filepath: pathlib.Path, obj: object)

Saves pickle files.

Args

filepath
Path to the data file.
obj
Object to be saved.
Expand source code
@_add_base_write_path
@_check_extension(".pkl")
def save_pkl(self, filepath: Path, obj: object):
    """Pickle an object into a file.

    Args:
        filepath: Destination of the pickle file.
        obj: Object to serialize.
    """
    with open(filepath, mode='wb') as handle:
        pickle.dump(obj, handle)
def save_pys(self, filepath: pathlib.Path, dictionary: dict)

Saves .pys files.

Args

filepath
Path to the data file.
dictionary
Dictionary to be saved.
Expand source code
@_add_base_write_path
@_check_extension(".pys")
def save_pys(self, filepath: Path, dictionary: dict):
    """Pickle a dictionary into a .pys file.

    The dictionary is serialized with pickle protocol 1.

    Args:
        filepath: Destination of the .pys file.
        dictionary: Dictionary to serialize.
    """
    with open(filepath, mode='wb') as handle:
        pickle.dump(dictionary, handle, protocol=1)