Source code for blissoda.fluo.parameters.fluoxas
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
from .utils import directories
from .utils import models
from .workflows import fluoxas_multi_detector
from .workflows import fluoxas_multi_detector_align
from .workflows import fluoxas_nofit_align
from .workflows import fluoxas_single_detector
from .workflows import fluoxas_single_detector_align
[docs]
def fluoxas_workflow_inputs(
filenames: Sequence[str],
output_root_uri: str,
scan_ranges: Sequence[Tuple[int, int]],
config_filenames: Sequence[str],
instrument_name: Optional[str] = None,
detectors: Optional[Sequence[Union[int, str]]] = None,
detector_numbers: Optional[Sequence[int]] = None,
detector_names: Optional[Sequence[str]] = None,
fast_fitting: bool = True,
quantification: bool = True,
diagnostics: bool = True,
livetime_ref_value: Union[str, int, float, None] = None,
counter_ref_value: Union[str, int, float, None] = None,
counter_name: Optional[str] = None,
energy_name: Optional[str] = None,
energy_multiplier: Optional[float] = 2,
exclude_scans: Optional[Sequence[Sequence[int]]] = None,
resolution: Optional[Dict[str, Tuple[Union[int, float], str]]] = None,
do_align: bool = True,
align_counter: Optional[str] = None,
align_crop: Optional[bool] = True,
fast_align_counter_selection: Optional[bool] = False,
stack_axis: Optional[str] = None,
real_axes: Optional[List[str]] = None,
virtual_axes: Optional[Dict[str, str]] = None,
ignore_axes: Optional[List[str]] = None,
axis_units: Optional[Dict[str, str]] = None,
) -> Tuple[str, List[dict]]:
if len(filenames) != len(scan_ranges):
raise ValueError(
f"{len(filenames)} scan ranges (first, last) are needed for {len(filenames)} files"
)
if not exclude_scans:
exclude_scans = [()] * len(filenames)
parameters = models.common_parameters_model(
detectors=detectors,
detector_numbers=detector_numbers,
detector_names=detector_names,
config_filenames=config_filenames,
energy_name=energy_name,
energy_multiplier=energy_multiplier,
counter_name=counter_name,
instrument_name=instrument_name,
fast_fitting=fast_fitting,
quantification=quantification,
diagnostics=diagnostics,
livetime_ref_value=livetime_ref_value,
counter_ref_value=counter_ref_value,
stack=True,
real_axes=real_axes,
virtual_axes=virtual_axes,
ignore_axes=ignore_axes,
axis_units=axis_units,
)
parameters = models.FluoXasParameters(
filenames=filenames,
scan_ranges=scan_ranges,
exclude_scans=exclude_scans,
output_root_uri=output_root_uri,
resolution=resolution,
align_counter=align_counter,
fast_align_counter_selection=fast_align_counter_selection,
align_crop=align_crop,
stack_axis=stack_axis,
**parameters.model_dump(),
)
if parameters.no_fit:
workflow = fluoxas_nofit_align.WORKFLOW
inputs = fluoxas_nofit_align.workflow_inputs(parameters)
elif parameters.fit_single_detector:
if do_align:
workflow = fluoxas_single_detector_align.WORKFLOW
inputs = fluoxas_single_detector_align.workflow_inputs(parameters)
else:
workflow = fluoxas_single_detector.WORKFLOW
inputs = fluoxas_single_detector.workflow_inputs(parameters)
else:
if parameters.sum_spectra:
raise ValueError(
f"Sum before fit is not supported yet for FluoXAS. Provide {len(parameters.detector_names)} config files."
)
if do_align:
workflow = fluoxas_multi_detector_align.WORKFLOW
inputs = fluoxas_multi_detector_align.workflow_inputs(parameters)
else:
workflow = fluoxas_multi_detector.WORKFLOW
inputs = fluoxas_multi_detector.workflow_inputs(parameters)
return workflow, inputs
[docs]
def fluoxas_paths(
session: str,
sample: str,
datasets: Sequence[str],
scan_ranges: Sequence[Tuple[int, int]],
config_filenames: Sequence[str],
dirname: Optional[str] = None,
demo: bool = False,
) -> models.FluoXasPaths:
filenames = [
directories.raw_directory(session, sample, dataset, demo=demo)
for dataset in datasets
]
first_dataset = datasets[0]
first_scan_number = scan_ranges[0][0]
output_filename = directories.processed_path(
session,
sample,
first_dataset,
dirname,
f"{sample}_{first_dataset}.h5",
demo=demo,
)
convert_destination = directories.processed_path(
session,
sample,
first_dataset,
dirname,
f"{sample}_{first_dataset}_scan{first_scan_number:04d}.json",
demo=demo,
)
output_root_uri = f"{output_filename}::/{first_scan_number}.1"
config_filenames = [
directories.pymca_config_path(session, s, demo=demo) for s in config_filenames
]
workflow_path = directories.accessible_workflow_path(
session, sample, first_dataset, dirname, demo=demo
)
return models.FluoXasPaths(
filenames=filenames,
output_root_uri=output_root_uri,
convert_destination=convert_destination,
workflow_path=workflow_path,
config_filenames=config_filenames,
)