Source code for blissoda.utils.directories

import os
from typing import Tuple

from ..bliss_globals import current_session


[docs] def get_dataset_dir(dataset_filename: str) -> str: return os.path.dirname(os.path.abspath(dataset_filename))
[docs] def get_collection_dir(dataset_filename: str) -> str: return _abs_join(get_dataset_dir(dataset_filename), "..")
[docs] def get_raw_dir(dataset_filename: str) -> str: return _abs_join(get_collection_dir(dataset_filename), "..")
[docs] def get_proposal_dir(dataset_filename: str) -> str: dirname = get_raw_dir(dataset_filename) if os.path.basename(dirname) == "raw": # version 2 return _abs_join(dirname, "..") # version 1: proposal == raw return dirname
[docs] def get_processed_dir(dataset_filename: str) -> str: version, dirname = get_directory_version(dataset_filename) if version == 3: return _abs_join(dirname, "..", "PROCESSED_DATA") if version == 2: return _abs_join(dirname, "..", "processed") return _abs_join(dirname, "processed")
[docs] def get_dataset_processed_dir(dataset_filename: str, *subdirs) -> str: root = get_processed_dir(dataset_filename, *subdirs) collection = os.path.basename(get_collection_dir(dataset_filename)) dataset = os.path.basename(get_dataset_dir(dataset_filename)) return os.path.join(root, collection, dataset)
[docs] def get_processed_subdir(dataset_filename, *subdirs) -> str: return os.path.join(get_processed_dir(dataset_filename), *subdirs)
[docs] def get_workflows_dir(dataset_filename: str) -> str: return get_processed_subdir(dataset_filename, "workflows")
[docs] def get_nobackup_dir(dataset_filename: str) -> str: version, dirname = get_directory_version(dataset_filename) if version == 3: return _abs_join(dirname, "..", "NOBACKUP") if version == 2: return _abs_join(dirname, "..", "_nobackup") return _abs_join(dirname, "_nobackup")
[docs] def get_directory_version(dataset_filename: str) -> Tuple[int, str]: """Returns directory structure version number and the raw data directory""" dirname = get_raw_dir(dataset_filename) if os.path.basename(dirname) == "RAW_DATA": return 3, dirname if os.path.basename(dirname) == "raw": return 2, dirname # proposal == raw return 1, dirname
[docs] def get_session_dir(dataset_filename: str) -> str: return _abs_join(get_proposal_dir(dataset_filename), "..")
def _abs_join(*args): return os.path.abspath(os.path.join(*args))
[docs] def get_filename(scan) -> str: filename = scan.scan_info.get("filename") if filename: return filename return current_session.scan_saving.filename
[docs] def scan_processed_directory(scan) -> str: return get_dataset_processed_dir(get_filename(scan))
[docs] def workflow_destination(scan) -> str: filename = get_filename(scan) scan_nb = scan.scan_info.get("scan_nb") root = scan_processed_directory(scan) stem = os.path.splitext(os.path.basename(filename))[0] basename = f"{stem}_{scan_nb:04d}.json" return os.path.join(root, basename)
[docs] def master_output_filename(scan) -> str: """Filename which can be used to inspect the results after the processing.""" filename = get_filename(scan) root = scan_processed_directory(scan) basename = os.path.basename(filename) return os.path.join(root, basename)