"""
.. code-block:: python
DEMO_SESSION [1]: from blissoda.demo.stscan_processor import stscan_processor
DEMO_SESSION [2]: stscan_processor
DEMO_SESSION [3]: stscan_processor.submit_workflows()
"""
import json
import os
from contextlib import contextmanager
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from ewoksjob.client import submit
from ..automation import BlissAutomationObject
from ..bliss_globals import current_session
from ..persistent.parameters import ParameterInfo
from ..utils.directories import get_processed_dir
from ..utils.directories import get_workflows_dir
class _NoRepr:
def __repr__(self):
return ""
_NOREPR = _NoRepr()
[docs]
class StScanProcessor(
BlissAutomationObject,
parameters=[
ParameterInfo("_convert_workflow"),
ParameterInfo("_rebinsum_workflow"),
ParameterInfo("_extract_workflow"),
ParameterInfo("session_in_outprefix"),
],
):
"""Submit data processing workflows on stscan results
StScanProcessor().submit_workflows()
"""
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
defaults: Optional[Dict[str, Any]] = None,
**deprecated_defaults: Dict[str, Any],
) -> None:
defaults = self._merge_defaults(deprecated_defaults, defaults)
defaults.setdefault(
"_convert_workflow",
"/data/id22/inhouse/ewoks/resources/workflows/convert.json",
)
defaults.setdefault(
"_rebinsum_workflow",
"/data/id22/inhouse/ewoks/resources/workflows/rebinsum.json",
)
defaults.setdefault(
"_extract_workflow",
"/data/id22/inhouse/ewoks/resources/workflows/extract.json",
)
defaults.setdefault("session_in_outprefix", False)
super().__init__(config=config, defaults=defaults)
self._context_cache = dict()
@property
def convert_workflow(self):
return self._convert_workflow
@convert_workflow.setter
def convert_workflow(self, value):
if os.path.exists(self._convert_workflow):
with self._convert_context():
self._convert_workflow = value
else:
self._convert_workflow = value
with self._convert_context():
pass
@property
def rebinsum_workflow(self):
return self._rebinsum_workflow
@rebinsum_workflow.setter
def rebinsum_workflow(self, value):
if os.path.exists(self._rebinsum_workflow):
with self._rebinsum_context():
self._rebinsum_workflow = value
else:
self._rebinsum_workflow = value
with self._rebinsum_context():
pass
@property
def extract_workflow(self):
return self._extract_workflow
@extract_workflow.setter
def extract_workflow(self, value):
if os.path.exists(self._extract_workflow):
with self._extract_context():
self._extract_workflow = value
else:
self._extract_workflow = value
with self._extract_context():
pass
def _info_categories(self) -> Dict[str, dict]:
with self._convert_context():
with self._rebinsum_context():
with self._extract_context():
return {
"workflows": self._workflows_info(),
"execution": self._execution_info(),
"convert": self._convert_info(),
"rebin": self._rebin_info(),
"sum": self._sum_info(),
"extract": self._extract_info(),
}
def _execution_info(self) -> dict:
info = dict()
info["session_in_outprefix"] = self.session_in_outprefix
return info
def _workflows_info(self) -> dict:
info = dict()
info["convert"] = self.convert_workflow
info["rebin/sum"] = self.rebinsum_workflow
info["extract"] = self.extract_workflow
return info
def _convert_info(self) -> dict:
info = dict()
info["do_convert"] = self.do_convert
info["Results:"] = _NOREPR
info.update(self._outdirs_info(self.convertdirs))
info[" include_proposal_outdir"] = self.convert_include_proposal_outdir
return info
def _rebin_info(self) -> dict:
info = dict()
info["do_rebin"] = self.do_rebin
info["Parameters:"] = _NOREPR
info[" range"] = self.range
info[" delta2theta"] = self.delta2theta
info[" startp"] = self.startp
info[" parsfile"] = self.parsfile
info[" order_of_elements_in_roicollection"] = (
self.order_of_elements_in_roicollection
)
info[" num_col_for_depth_res"] = self.num_col_for_depth_res
info["Results:"] = _NOREPR
info.update(self._outdirs_info(self.rebindirs))
info[" include_proposal_outdir"] = self.rebin_include_proposal_outdir
return info
def _sum_info(self) -> dict:
info = dict()
info["do_sum_single"] = self.do_sum_single
info["do_sum_all"] = self.do_sum_all
info["Parameters:"] = _NOREPR
info[" binsize"] = self.binsize
info[" resfile"] = self.resfile
info[" advanced"] = self.advanced_sum_arguments
info["Results:"] = _NOREPR
info[" include_proposal_outdir"] = self.sum_include_proposal_outdir
return info
def _extract_info(self) -> dict:
info = dict()
info["do_extract"] = self.do_extract
info["Parameters:"] = _NOREPR
info[" tth_min"] = self.tth_min
info[" tth_max"] = self.tth_max
info[" full_tth"] = self.full_tth
info[" startp"] = self.startp
info[" inp_file"] = self.inp_file
info[" inp_step"] = self.inp_step
info["Results:"] = _NOREPR
info[" include_proposal_outdir"] = self.extract_include_proposal_outdir
return info
@property
def convertdirs(self):
with self._convert_context() as workflow:
return self._get_node_parameter(workflow, "convert", "outdirs")
@convertdirs.setter
def convertdirs(self, value):
with self._convert_context() as workflow:
self._set_node_parameter(workflow, "convert", "outdirs", value)
@property
def rebindirs(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "outdirs")
@rebindirs.setter
def rebindirs(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "outdirs", value)
self._set_node_parameter(workflow, "convert", "outdirs", value)
@property
def convert_include_proposal_outdir(self):
return True
@property
def rebin_include_proposal_outdir(self):
return True
@property
def sum_include_proposal_outdir(self):
return True
@property
def extract_include_proposal_outdir(self):
return True
def _outdirs_info(self, outdirs):
if not outdirs:
return dict()
return {f" {k}": v for k, v in outdirs.items()}
[docs]
def submit_workflows(
self, filename=None, outprefix=None, scannr=None, extract=False, overwrite=False
):
if scannr is None:
entries = list()
else:
entries = [f"{scannr}.1", f"{scannr}.2"]
if outprefix is None:
if self.session_in_outprefix:
outprefix = f"{current_session.scan_saving.proposal_name}_{current_session.scan_saving.proposal_session_name}"
else:
outprefix = current_session.scan_saving.proposal_name
if filename is None:
filename = current_session.scan_saving.filename
if self.do_convert:
convert_destination = self._convert_destination(
"convert", filename, scan=scannr
)
self._submit_convert_workflow(
filename,
entries,
outprefix,
list(),
convert_destination,
overwrite=overwrite,
)
if extract:
convert_destination = self._convert_destination(
"extract", filename, scan=scannr
)
self._submit_extract_workflow(
filename,
entries,
outprefix,
list(),
convert_destination,
overwrite=overwrite,
)
elif self.do_rebin:
for inputs, kw in self._iter_rebinsum_parameters():
convert_destination = self._convert_destination(
"rebinsum", filename, scan=scannr, **kw
)
self._submit_rebinsum_workflow(
filename,
entries,
outprefix,
inputs,
convert_destination,
overwrite=overwrite,
)
def _iter_rebinsum_parameters(self):
for binsize in self.binsize:
for delta2theta in self.delta2theta:
inputs = [
{"id": "rebin", "name": "delta2theta", "value": delta2theta},
{"id": "sum", "name": "binsize", "value": binsize},
]
convert_destination = {"w": delta2theta, "b": binsize}
yield inputs, convert_destination
def _convert_destination(self, workflowname, filename, **kw):
root_dir = self._get_workflows_dir(filename)
dirname = os.path.join(root_dir, workflowname)
basename = os.path.splitext(os.path.basename(filename))[0]
for name, value in kw.items():
if value is None:
continue
value = str(value).replace(".", "")
basename += f"_{name}{value}"
return os.path.join(dirname, basename + ".json")
def _get_workflows_dir(self, dataset_filename: str) -> str:
return get_workflows_dir(dataset_filename)
@property
def delta2theta(self):
with self._rebinsum_context() as workflow:
delta2theta_list = self._get_node_parameter(
workflow, "rebin", "delta2theta", dest="for_bliss"
)
if delta2theta_list is None:
delta2theta = self._get_node_parameter(workflow, "rebin", "delta2theta")
if delta2theta is None:
delta2theta_list = list()
else:
delta2theta_list = [delta2theta]
return delta2theta_list
@delta2theta.setter
def delta2theta(self, value):
try:
delta2theta_list = list(value)
except Exception:
delta2theta_list = [value]
with self._rebinsum_context() as workflow:
self._set_node_parameter(
workflow, "rebin", "delta2theta", delta2theta_list[0]
)
self._set_node_parameter(
workflow, "rebin", "delta2theta", delta2theta_list, dest="for_bliss"
)
@property
def binsize(self):
with self._rebinsum_context() as workflow:
binsize_list = self._get_node_parameter(
workflow, "sum", "binsize", dest="for_bliss"
)
if binsize_list is None:
binsize = self._get_node_parameter(workflow, "sum", "binsize")
if binsize is None:
binsize_list = list()
else:
binsize_list = [binsize]
return binsize_list
@binsize.setter
def binsize(self, value):
try:
binsize_list = list(value)
except Exception:
binsize_list = [value]
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "sum", "binsize", binsize_list[0])
self._set_node_parameter(
workflow, "sum", "binsize", binsize_list, dest="for_bliss"
)
@property
def parsfile(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "parsfile")
@parsfile.setter
def parsfile(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "parsfile", value)
@property
def range(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "range")
@range.setter
def range(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "range", value)
@property
def startp(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "startp")
@startp.setter
def startp(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "startp", value)
with self._extract_context() as workflow:
self._set_node_parameter(workflow, "extract", "startp", value)
@property
def device(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "device")
@device.setter
def device(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "device", value)
@property
def resfile(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "sum", "resfile")
@resfile.setter
def resfile(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "sum", "resfile", value)
@property
def advanced_sum_arguments(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "sum", "advanced")
@advanced_sum_arguments.setter
def advanced_sum_arguments(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "sum", "advanced", value)
@property
def tth_min(self):
with self._extract_context() as workflow:
return self._get_node_parameter(workflow, "extract", "tth_min")
@tth_min.setter
def tth_min(self, value):
with self._extract_context() as workflow:
self._set_node_parameter(workflow, "extract", "tth_min", value)
@property
def tth_max(self):
with self._extract_context() as workflow:
return self._get_node_parameter(workflow, "extract", "tth_max")
@tth_max.setter
def tth_max(self, value):
with self._extract_context() as workflow:
self._set_node_parameter(workflow, "extract", "tth_max", value)
@property
def full_tth(self):
with self._extract_context() as workflow:
return self._get_node_parameter(workflow, "extract", "full_tth")
@full_tth.setter
def full_tth(self, value):
with self._extract_context() as workflow:
self._set_node_parameter(workflow, "extract", "full_tth", value)
@property
def inp_file(self):
with self._extract_context() as workflow:
return self._get_node_parameter(workflow, "extract", "inp_file")
@inp_file.setter
def inp_file(self, value):
with self._extract_context() as workflow:
self._set_node_parameter(workflow, "extract", "inp_file", value)
@property
def inp_step(self):
with self._extract_context() as workflow:
return self._get_node_parameter(workflow, "extract", "inp_step")
@inp_step.setter
def inp_step(self, value):
with self._extract_context() as workflow:
self._set_node_parameter(workflow, "extract", "inp_step", value)
@property
def do_rebin(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "do", dest="for_bliss")
@do_rebin.setter
def do_rebin(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "do", value, dest="for_bliss")
@property
def do_convert(self):
return True
@property
def do_sum_single(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "sum", "sum_single")
@do_sum_single.setter
def do_sum_single(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "sum", "sum_single", value)
if value:
self.do_rebin = True
@property
def do_sum_all(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "sum", "sum_all")
@do_sum_all.setter
def do_sum_all(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "sum", "sum_all", value)
if value:
self.do_rebin = True
@property
def do_extract(self):
return "< from command stscan(..., extract=False) >"
@property
def order_of_elements_in_roicollection(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "columnorder")
@order_of_elements_in_roicollection.setter
def order_of_elements_in_roicollection(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "columnorder", value)
@property
def num_col_for_depth_res(self):
with self._rebinsum_context() as workflow:
return self._get_node_parameter(workflow, "rebin", "num_col")
@num_col_for_depth_res.setter
def num_col_for_depth_res(self, value):
with self._rebinsum_context() as workflow:
self._set_node_parameter(workflow, "rebin", "num_col", value)
def _submit_convert_workflow(
self,
filename: str,
entries: List[str],
outprefix: str,
inputs: List[dict],
convert_destination: str,
overwrite: bool = False,
):
with self._convert_context() as workflow:
pass
primary_outdir = get_processed_dir(filename)
self._add_wait_convert_inputs(inputs, filename, entries)
self._add_convert_inputs(inputs, outprefix, primary_outdir, overwrite=overwrite)
self._submit_job(workflow, inputs, convert_destination, queue="solo1")
def _submit_rebinsum_workflow(
self,
filename: str,
entries: List[str],
outprefix: str,
inputs: List[dict],
convert_destination: str,
overwrite: bool = False,
):
with self._rebinsum_context() as workflow:
pass
primary_outdir = get_processed_dir(filename)
self._add_wait_rebin_inputs(inputs, filename, entries)
self._add_rebin_inputs(inputs, outprefix, primary_outdir, overwrite=overwrite)
self._add_convertrebin_inputs(inputs, primary_outdir, overwrite=overwrite)
self._add_sum_inputs(inputs, primary_outdir, overwrite=overwrite)
self._submit_job(workflow, inputs, convert_destination, queue="solo2")
def _submit_extract_workflow(
self,
filename: str,
entries: List[str],
outprefix: str,
inputs: List[dict],
convert_destination: str,
overwrite: bool = False,
):
with self._extract_context() as workflow:
pass
primary_outdir = get_processed_dir(filename)
self._add_wait_extract_inputs(inputs, filename, entries)
self._add_extract_inputs(inputs, outprefix, primary_outdir, overwrite=overwrite)
self._submit_job(workflow, inputs, convert_destination, queue="solo2")
def _submit_job(self, workflow, inputs, convert_destination, **kw):
submit(
args=(workflow,),
kwargs={"inputs": inputs, "convert_destination": convert_destination},
**kw,
)
def _save_default_convert_workflow(self):
workflow = self._convert_graph()
self._add_wait_convert_default_inputs(workflow)
self._add_convert_default_inputs(workflow)
self._save_workflow(self.convert_workflow, workflow)
return workflow
def _save_default_rebinsum_workflow(self):
workflow = self._rebinsum_graph()
self._add_wait_rebin_default_inputs(workflow)
self._add_rebin_default_inputs(workflow)
self._add_convertrebin_default_inputs(workflow)
self._add_sum_default_inputs(workflow)
self._save_workflow(self.rebinsum_workflow, workflow)
return workflow
def _save_default_extract_workflow(self):
workflow = self._extract_graph()
self._add_wait_extract_default_inputs(workflow)
self._add_extract_default_inputs(workflow)
self._save_workflow(self.extract_workflow, workflow)
return workflow
def _save_workflow(self, filename, workflow):
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "w") as fh:
json.dump(workflow, fh, indent=2)
@contextmanager
def _convert_context(self):
"""Creates the workflow on disk when it does not exist"""
with self._workflow_context("convert") as workflow:
yield workflow
@contextmanager
def _rebinsum_context(self):
"""Creates the workflow on disk when it does not exist"""
with self._workflow_context("rebinsum") as workflow:
yield workflow
@contextmanager
def _extract_context(self):
"""Creates the workflow on disk when it does not exist"""
with self._workflow_context("extract") as workflow:
yield workflow
@contextmanager
def _workflow_context(self, workflow_id: str):
"""Re-entrant context which loads/creates the workflow upon entering
and saves the workflow upon exiting."""
filename = self._get_workflow_filename(workflow_id)
workflow = self._context_cache.get(filename)
if workflow is not None:
yield workflow
return
if os.path.exists(filename):
with open(filename, "r") as fh:
workflow = json.load(fh)
else:
workflow = self._create_default_workflow(workflow_id)
self._context_cache[filename] = workflow
try:
yield workflow
finally:
self._context_cache.pop(filename)
filename = self._get_workflow_filename(workflow_id)
self._save_workflow(filename, workflow)
def _get_workflow_filename(self, workflow_id):
if workflow_id == "convert":
return self.convert_workflow
elif workflow_id == "rebinsum":
return self.rebinsum_workflow
elif workflow_id == "extract":
return self.extract_workflow
assert False, f"Unkown workflow '{workflow_id}'"
def _create_default_workflow(self, workflow_id):
if workflow_id == "convert":
return self._save_default_convert_workflow()
elif workflow_id == "rebinsum":
return self._save_default_rebinsum_workflow()
elif workflow_id == "extract":
return self._save_default_extract_workflow()
assert False, f"Unkown workflow '{workflow_id}'"
def _get_node_attrs(self, workflow, node_id):
for node_attrs in workflow["nodes"]:
if node_attrs["id"] == node_id:
return node_attrs
def _get_node_parameter(self, workflow, node_id, name, dest="default_inputs"):
node_attrs = self._get_node_attrs(workflow, node_id)
for argument in node_attrs.get(dest, list()):
if argument["name"] == name:
return argument["value"]
def _set_node_parameter(
self, workflow, node_id, name, value, dest="default_inputs"
):
node_attrs = self._get_node_attrs(workflow, node_id)
for argument in node_attrs.setdefault(dest, list()):
if argument["name"] == name:
argument["value"] = value
return
node_attrs[dest].append({"name": name, "value": value})
def _convert_graph(self):
nodes = [
{
"id": "wait",
"task_type": "class",
"task_identifier": "ewoksid22.wait.WaitScansFinished",
},
{
"id": "convert",
"task_type": "class",
"task_identifier": "ewoksid22.convert.ID22H5ToSpec",
},
]
links = [
{
"source": "wait",
"target": "convert",
"data_mapping": [
{"source_output": "filename", "target_input": "filename"},
# {"source_output": "entries", "target_input": "entries"},
],
},
]
return {"graph": {"id": "convert"}, "nodes": nodes, "links": links}
def _rebinsum_graph(self):
nodes = [
{
"id": "wait",
"task_type": "class",
"task_identifier": "ewoksid22.wait.WaitScansFinished",
},
{
"id": "rebin",
"task_type": "class",
"task_identifier": "ewoksid22.rebin.ID22Rebin",
},
{
"id": "convert",
"task_type": "class",
"task_identifier": "ewoksid22.convert.ID22H5ToSpec",
},
{
"id": "sum",
"task_type": "class",
"task_identifier": "ewoksid22.sum.ID22Sum",
},
]
links = [
{
"source": "wait",
"target": "rebin",
"data_mapping": [
{"source_output": "filename", "target_input": "filename"},
# {"source_output": "entries", "target_input": "entries"},
],
},
{
"source": "wait",
"target": "convert",
"data_mapping": [
{"source_output": "filename", "target_input": "filename"},
# {"source_output": "entries", "target_input": "entries"},
],
},
{
"source": "wait",
"target": "sum",
"data_mapping": [
{"source_output": "filename", "target_input": "raw_filename"},
# {"source_output": "entries", "target_input": "entries"},
],
},
{
"source": "rebin",
"target": "convert",
"data_mapping": [
{"source_output": "outfile", "target_input": "rebin_filename"}
],
},
{
"source": "convert",
"target": "sum",
"data_mapping": [
{"source_output": "outfile", "target_input": "filename"},
],
},
]
return {"graph": {"id": "rebinsum"}, "nodes": nodes, "links": links}
def _extract_graph(self):
nodes = [
{
"id": "wait",
"task_type": "class",
"task_identifier": "ewoksid22.wait.WaitScansFinished",
},
{
"id": "extract",
"task_type": "class",
"task_identifier": "ewoksid22.extract.ID22TopasExtract",
},
]
links = [
{
"source": "wait",
"target": "extract",
"data_mapping": [
{"source_output": "filename", "target_input": "filename"},
{"source_output": "entries", "target_input": "entries"},
],
},
]
return {"graph": {"id": "extract"}, "nodes": nodes, "links": links}
def _add_wait_convert_inputs(self, inputs: list, filename: str, entries: list):
inputs += [
{"id": "wait", "name": "filename", "value": filename},
{"id": "wait", "name": "entries", "value": entries},
]
def _add_wait_convert_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "wait")
node_attrs["default_inputs"] = [
{"name": "retry_timeout", "value": 30},
]
def _add_wait_rebin_inputs(self, inputs: list, filename: str, entries: list):
inputs += [
{"id": "wait", "name": "filename", "value": filename},
{"id": "wait", "name": "entries", "value": entries},
]
def _add_wait_rebin_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "wait")
node_attrs["default_inputs"] = [
{"name": "retry_timeout", "value": 30},
]
def _add_wait_extract_inputs(self, inputs: list, filename: str, entries: list):
inputs += [
{"id": "wait", "name": "filename", "value": filename},
{"id": "wait", "name": "entries", "value": entries},
]
def _add_wait_extract_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "wait")
node_attrs["default_inputs"] = [
{"name": "retry_timeout", "value": 30},
]
def _add_rebin_inputs(
self, inputs: list, outprefix: str, primary_outdir: str, overwrite: bool = False
):
if self.rebin_include_proposal_outdir:
inputs += [
{"id": "rebin", "name": "outprefix", "value": outprefix},
{"id": "rebin", "name": "primary_outdir", "value": primary_outdir},
]
inputs += [{"id": "rebin", "name": "overwrite", "value": overwrite}]
def _add_rebin_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "rebin")
node_attrs["for_bliss"] = [{"name": "do", "value": False}]
node_attrs["default_inputs"] = [
{
"name": "parsfile",
"value": "/data/id22/inhouse/CD_GC_PDF/advanced_50keV/patterns/for_wout/out7.pars",
},
{"name": "range", "value": [float("nan"), float("nan")]},
{"name": "delta2theta", "value": 0.003},
{"name": "startp", "value": 31},
{"name": "device", "value": 0},
{
"name": "outdirs",
"value": {
"primary": "opid22@diffract22new:/users/opid22/data1/",
# "secondary": "opid22@lid22bliss:/users/opid22/data1/",
"backup": "opid22@lid22bliss:/data/id22/backup/data22/",
},
},
{"name": "retry_timeout", "value": 30},
]
def _add_convertrebin_inputs(
self, inputs: list, primary_outdir: str, overwrite: bool = False
):
if self.convert_include_proposal_outdir:
inputs += [
{"id": "convert", "name": "primary_outdir", "value": primary_outdir}
]
inputs += [{"id": "convert", "name": "overwrite", "value": overwrite}]
def _add_convertrebin_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "convert")
node_attrs["default_inputs"] = [
{
"name": "outdirs",
"value": {
"primary": "opid22@diffract22new:/users/opid22/data1/",
# "secondary": "opid22@lid22bliss:/users/opid22/data1/",
"backup": "opid22@lid22bliss:/data/id22/backup/data22/",
},
},
{"name": "retry_timeout", "value": 30},
{"name": "ascii_extension", "value": ".adv"},
]
def _add_sum_inputs(
self, inputs: list, primary_outdir: str, overwrite: bool = False
):
if self.sum_include_proposal_outdir:
inputs += [{"id": "sum", "name": "primary_outdir", "value": primary_outdir}]
inputs += [{"id": "sum", "name": "overwrite", "value": overwrite}]
def _add_sum_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "sum")
node_attrs["default_inputs"] = [
{
"name": "resfile",
"value": "/data/id22/inhouse/CD_GC_PDF/advanced_50keV/patterns/for_wout/temp.res",
},
{"name": "binsize", "value": 0.002},
{"name": "sum_single", "value": False},
{"name": "sum_all", "value": False},
{"name": "retry_timeout", "value": 30},
{"name": "ascii_extension", "value": ".adv"},
{"name": "advanced", "value": None},
]
def _add_convert_inputs(
self, inputs: list, outprefix: str, primary_outdir: str, overwrite: bool = False
):
if self.convert_include_proposal_outdir:
inputs += [
{"id": "convert", "name": "outprefix", "value": outprefix},
{"id": "convert", "name": "primary_outdir", "value": primary_outdir},
]
inputs += [{"id": "convert", "name": "overwrite", "value": overwrite}]
def _add_convert_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "convert")
node_attrs["default_inputs"] = [
{
"name": "outdirs",
"value": {
"primary": "opid22@diffract22new:/users/opid22/data1/",
# "secondary": "opid22@lid22bliss:/users/opid22/data1/",
"backup": "opid22@lid22bliss:/data/id22/backup/data22/",
},
},
{"name": "retry_timeout", "value": 30},
]
def _add_extract_inputs(
self, inputs: list, outprefix: str, primary_outdir: str, overwrite: bool = False
):
if self.extract_include_proposal_outdir:
inputs += [
{"id": "extract", "name": "outprefix", "value": outprefix},
{"id": "extract", "name": "primary_outdir", "value": primary_outdir},
]
inputs += [{"id": "extract", "name": "overwrite", "value": overwrite}]
def _add_extract_default_inputs(self, workflow: dict):
node_attrs = self._get_node_attrs(workflow, "extract")
node_attrs["default_inputs"] = [
{"name": "tth_min", "value": None},
{"name": "tth_max", "value": None},
{"name": "full_tth", "value": False},
{"name": "inp_file", "value": "/users/opid22/out7_files/LaB6_all7_os.inp"},
{"name": "inp_step", "value": 2},
{"name": "startp", "value": 31},
]