"""
.. code-block:: python
DEMO_SESSION [1]: from blissoda.demo.stscan_processor import stscan_processor
DEMO_SESSION [2]: stscan_processor
DEMO_SESSION [3]: stscan_processor.submit_workflows()
"""
import os
import json
from typing import Dict, List
from contextlib import contextmanager
try:
from bliss import current_session
except ImportError:
current_session = None
from ewoksjob.client import submit
from ..persistent.parameters import WithPersistentParameters
from ..utils.directories import get_processed_dir
from ..utils.directories import get_workflows_dir
from ..persistent.parameters import ParameterInfo
class _NoRepr:
    """Sentinel with an empty representation.

    Instances render as nothing when printed, which lets the info tables
    use dict keys such as ``"Results:"`` as visual section headers with no
    value shown next to them.
    """

    def __repr__(self) -> str:
        return ""


# Module-wide singleton used as the "header, no value" marker.
_NOREPR = _NoRepr()
class StScanProcessor(
    WithPersistentParameters,
    parameters=[
        ParameterInfo("_convert_workflow"),
        ParameterInfo("_rebinsum_workflow"),
        ParameterInfo("_extract_workflow"),
        ParameterInfo("session_in_outprefix"),
    ],
):
    """Submit data processing workflows on stscan results

    StScanProcessor().submit_workflows()

    The processor persists three workflow definitions (convert, rebin/sum,
    extract) as JSON files on disk.  All parameter access goes through
    re-entrant "workflow contexts" which load the JSON file on enter (or
    create it with defaults when missing) and save it back on exit.
    """

    def __init__(self, **defaults) -> None:
        defaults.setdefault(
            "_convert_workflow",
            "/data/id22/inhouse/ewoks/resources/workflows/convert.json",
        )
        defaults.setdefault(
            "_rebinsum_workflow",
            "/data/id22/inhouse/ewoks/resources/workflows/rebinsum.json",
        )
        defaults.setdefault(
            "_extract_workflow",
            "/data/id22/inhouse/ewoks/resources/workflows/extract.json",
        )
        defaults.setdefault("session_in_outprefix", False)
        super().__init__(**defaults)
        # Maps workflow filename -> in-memory graph while inside a workflow
        # context.  This is what makes the contexts re-entrant: nested entries
        # reuse the cached graph instead of re-reading the file.
        self._context_cache = dict()

    @property
    def convert_workflow(self):
        """Filename of the 'convert' workflow JSON."""
        return self._convert_workflow

    @convert_workflow.setter
    def convert_workflow(self, value):
        if os.path.exists(self._convert_workflow):
            # Load the existing workflow, then change the filename inside the
            # context: on exit the context saves to the filename valid at that
            # moment, effectively migrating the workflow to `value`.
            with self._convert_context():
                self._convert_workflow = value
        else:
            # No file to migrate: point to the new location and let the
            # context create (and save) a default workflow there.
            self._convert_workflow = value
            with self._convert_context():
                pass

    @property
    def rebinsum_workflow(self):
        """Filename of the 'rebinsum' workflow JSON."""
        return self._rebinsum_workflow

    @rebinsum_workflow.setter
    def rebinsum_workflow(self, value):
        # Same migrate-or-create logic as `convert_workflow`.
        if os.path.exists(self._rebinsum_workflow):
            with self._rebinsum_context():
                self._rebinsum_workflow = value
        else:
            self._rebinsum_workflow = value
            with self._rebinsum_context():
                pass

    @property
    def extract_workflow(self):
        """Filename of the 'extract' workflow JSON."""
        return self._extract_workflow

    @extract_workflow.setter
    def extract_workflow(self, value):
        # Same migrate-or-create logic as `convert_workflow`.
        if os.path.exists(self._extract_workflow):
            with self._extract_context():
                self._extract_workflow = value
        else:
            self._extract_workflow = value
            with self._extract_context():
                pass

    def _info_categories(self) -> Dict[str, dict]:
        """Collect all info tables, loading each workflow file only once
        thanks to the re-entrant contexts."""
        with self._convert_context():
            with self._rebinsum_context():
                with self._extract_context():
                    return {
                        "workflows": self._workflows_info(),
                        "execution": self._execution_info(),
                        "convert": self._convert_info(),
                        "rebin": self._rebin_info(),
                        "sum": self._sum_info(),
                        "extract": self._extract_info(),
                    }

    def _execution_info(self) -> dict:
        info = dict()
        info["session_in_outprefix"] = self.session_in_outprefix
        return info

    def _workflows_info(self) -> dict:
        info = dict()
        info["convert"] = self.convert_workflow
        info["rebin/sum"] = self.rebinsum_workflow
        info["extract"] = self.extract_workflow
        return info

    def _convert_info(self) -> dict:
        # `_NOREPR` values act as section headers (they print as nothing).
        info = dict()
        info["do_convert"] = self.do_convert
        info["Results:"] = _NOREPR
        info.update(self._outdirs_info(self.convertdirs))
        info[" include_proposal_outdir"] = self.convert_include_proposal_outdir
        return info

    def _rebin_info(self) -> dict:
        info = dict()
        info["do_rebin"] = self.do_rebin
        info["Parameters:"] = _NOREPR
        info[" range"] = self.range
        info[" delta2theta"] = self.delta2theta
        info[" startp"] = self.startp
        info[" parsfile"] = self.parsfile
        info["Results:"] = _NOREPR
        info.update(self._outdirs_info(self.rebindirs))
        info[" include_proposal_outdir"] = self.rebin_include_proposal_outdir
        return info

    def _sum_info(self) -> dict:
        info = dict()
        info["do_sum_single"] = self.do_sum_single
        info["do_sum_all"] = self.do_sum_all
        info["Parameters:"] = _NOREPR
        info[" binsize"] = self.binsize
        info[" resfile"] = self.resfile
        info["Results:"] = _NOREPR
        info[" include_proposal_outdir"] = self.sum_include_proposal_outdir
        return info

    def _extract_info(self) -> dict:
        info = dict()
        info["do_extract"] = self.do_extract
        info["Parameters:"] = _NOREPR
        info[" tth_min"] = self.tth_min
        info[" tth_max"] = self.tth_max
        info[" full_tth"] = self.full_tth
        info[" startp"] = self.startp
        info[" inp_file"] = self.inp_file
        info[" inp_step"] = self.inp_step
        info["Results:"] = _NOREPR
        info[" include_proposal_outdir"] = self.extract_include_proposal_outdir
        return info

    @property
    def convertdirs(self):
        """Output directories of the 'convert' node in the convert workflow."""
        with self._convert_context() as workflow:
            return self._get_node_parameter(workflow, "convert", "outdirs")

    @convertdirs.setter
    def convertdirs(self, value):
        with self._convert_context() as workflow:
            self._set_node_parameter(workflow, "convert", "outdirs", value)

    @property
    def rebindirs(self):
        """Output directories of the 'rebin' node in the rebinsum workflow."""
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "rebin", "outdirs")

    @rebindirs.setter
    def rebindirs(self, value):
        with self._rebinsum_context() as workflow:
            # Keep the rebinsum workflow's 'convert' node in sync with 'rebin'.
            self._set_node_parameter(workflow, "rebin", "outdirs", value)
            self._set_node_parameter(workflow, "convert", "outdirs", value)

    @property
    def convert_include_proposal_outdir(self):
        # Hard-coded for now: proposal outdir is always included.
        return True

    @property
    def rebin_include_proposal_outdir(self):
        # Hard-coded for now: proposal outdir is always included.
        return True

    @property
    def sum_include_proposal_outdir(self):
        # Hard-coded for now: proposal outdir is always included.
        return True

    @property
    def extract_include_proposal_outdir(self):
        # Hard-coded for now: proposal outdir is always included.
        return True

    def _outdirs_info(self, outdirs):
        """Format an outdirs mapping for the info tables (indented keys)."""
        if not outdirs:
            return dict()
        return {f" {k}": v for k, v in outdirs.items()}

    def submit_workflows(
        self, filename=None, outprefix=None, scannr=None, extract=False
    ):
        """Submit the configured workflows for one dataset.

        :param filename: dataset file; defaults to the current BLISS session's
            scan-saving filename.
        :param outprefix: prefix for output files; defaults to the proposal
            name (optionally suffixed with the session name, see
            ``session_in_outprefix``).
        :param scannr: restrict processing to scans ``{scannr}.1`` and
            ``{scannr}.2``; when ``None`` all entries are processed.
        :param extract: also submit the extract workflow.
        """
        if scannr is None:
            entries = list()
        else:
            entries = [f"{scannr}.1", f"{scannr}.2"]
        if outprefix is None:
            if self.session_in_outprefix:
                outprefix = f"{current_session.scan_saving.proposal_name}_{current_session.scan_saving.proposal_session_name}"
            else:
                outprefix = current_session.scan_saving.proposal_name
        if filename is None:
            filename = current_session.scan_saving.filename
        if self.do_convert:
            convert_destination = self._convert_destination(
                "convert", filename, scan=scannr
            )
            self._submit_convert_workflow(
                filename, entries, outprefix, list(), convert_destination
            )
            if extract:
                convert_destination = self._convert_destination(
                    "extract", filename, scan=scannr
                )
                self._submit_extract_workflow(
                    filename, entries, outprefix, list(), convert_destination
                )
            elif self.do_rebin:
                # One job per (binsize, delta2theta) combination.
                for inputs, kw in self._iter_rebinsum_parameters():
                    convert_destination = self._convert_destination(
                        "rebinsum", filename, scan=scannr, **kw
                    )
                    self._submit_rebinsum_workflow(
                        filename, entries, outprefix, inputs, convert_destination
                    )

    def _iter_rebinsum_parameters(self):
        """Yield ``(inputs, suffixes)`` for every binsize/delta2theta pair.

        ``suffixes`` feeds `_convert_destination` so each job gets a distinct
        destination filename (``_w<delta2theta>_b<binsize>``).
        """
        for binsize in self.binsize:
            for delta2theta in self.delta2theta:
                inputs = [
                    {"id": "rebin", "name": "delta2theta", "value": delta2theta},
                    {"id": "sum", "name": "binsize", "value": binsize},
                ]
                suffixes = {"w": delta2theta, "b": binsize}
                yield inputs, suffixes

    def _convert_destination(self, workflowname, filename, **kw):
        """Build the JSON destination path for a submitted job.

        Result: ``<workflows_dir>/<workflowname>/<basename>[_<key><value>...].json``
        where dots in values are stripped (e.g. ``w0.003`` -> ``w0003``).
        ``None`` values are skipped.
        """
        root_dir = self._get_workflows_dir(filename)
        dirname = os.path.join(root_dir, workflowname)
        basename = os.path.splitext(os.path.basename(filename))[0]
        for name, value in kw.items():
            if value is None:
                continue
            value = str(value).replace(".", "")
            basename += f"_{name}{value}"
        return os.path.join(dirname, basename + ".json")

    def _get_workflows_dir(self, dataset_filename: str) -> str:
        return get_workflows_dir(dataset_filename)

    @property
    def delta2theta(self):
        """List of delta2theta values (always a list).

        Read from the 'for_bliss' section; falls back to the scalar default
        input when the list was never set.
        """
        with self._rebinsum_context() as workflow:
            delta2theta_list = self._get_node_parameter(
                workflow, "rebin", "delta2theta", dest="for_bliss"
            )
            if delta2theta_list is None:
                delta2theta = self._get_node_parameter(workflow, "rebin", "delta2theta")
                if delta2theta is None:
                    delta2theta_list = list()
                else:
                    delta2theta_list = [delta2theta]
            return delta2theta_list

    @delta2theta.setter
    def delta2theta(self, value):
        # Accept a scalar or any iterable; scalars become one-element lists.
        try:
            delta2theta_list = list(value)
        except Exception:
            delta2theta_list = [value]
        with self._rebinsum_context() as workflow:
            # First value becomes the workflow default; the full list is kept
            # in 'for_bliss' for job fan-out.  Raises IndexError on an empty list.
            self._set_node_parameter(
                workflow, "rebin", "delta2theta", delta2theta_list[0]
            )
            self._set_node_parameter(
                workflow, "rebin", "delta2theta", delta2theta_list, dest="for_bliss"
            )

    @property
    def binsize(self):
        """List of binsize values (always a list); see `delta2theta`."""
        with self._rebinsum_context() as workflow:
            binsize_list = self._get_node_parameter(
                workflow, "sum", "binsize", dest="for_bliss"
            )
            if binsize_list is None:
                binsize = self._get_node_parameter(workflow, "sum", "binsize")
                if binsize is None:
                    binsize_list = list()
                else:
                    binsize_list = [binsize]
            return binsize_list

    @binsize.setter
    def binsize(self, value):
        # Accept a scalar or any iterable; scalars become one-element lists.
        try:
            binsize_list = list(value)
        except Exception:
            binsize_list = [value]
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "sum", "binsize", binsize_list[0])
            self._set_node_parameter(
                workflow, "sum", "binsize", binsize_list, dest="for_bliss"
            )

    @property
    def parsfile(self):
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "rebin", "parsfile")

    @parsfile.setter
    def parsfile(self, value):
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "rebin", "parsfile", value)

    @property
    def range(self):
        # NOTE: intentionally shadows the builtin name at class scope; kept
        # for backward compatibility with existing sessions.
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "rebin", "range")

    @range.setter
    def range(self, value):
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "rebin", "range", value)

    @property
    def startp(self):
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "rebin", "startp")

    @startp.setter
    def startp(self, value):
        # startp is shared by the rebin and extract workflows: keep both in sync.
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "rebin", "startp", value)
        with self._extract_context() as workflow:
            self._set_node_parameter(workflow, "extract", "startp", value)

    @property
    def device(self):
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "rebin", "device")

    @device.setter
    def device(self, value):
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "rebin", "device", value)

    @property
    def resfile(self):
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "sum", "resfile")

    @resfile.setter
    def resfile(self, value):
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "sum", "resfile", value)

    @property
    def tth_min(self):
        with self._extract_context() as workflow:
            return self._get_node_parameter(workflow, "extract", "tth_min")

    @tth_min.setter
    def tth_min(self, value):
        with self._extract_context() as workflow:
            self._set_node_parameter(workflow, "extract", "tth_min", value)

    @property
    def tth_max(self):
        with self._extract_context() as workflow:
            return self._get_node_parameter(workflow, "extract", "tth_max")

    @tth_max.setter
    def tth_max(self, value):
        with self._extract_context() as workflow:
            self._set_node_parameter(workflow, "extract", "tth_max", value)

    @property
    def full_tth(self):
        with self._extract_context() as workflow:
            return self._get_node_parameter(workflow, "extract", "full_tth")

    @full_tth.setter
    def full_tth(self, value):
        with self._extract_context() as workflow:
            self._set_node_parameter(workflow, "extract", "full_tth", value)

    @property
    def inp_file(self):
        with self._extract_context() as workflow:
            return self._get_node_parameter(workflow, "extract", "inp_file")

    @inp_file.setter
    def inp_file(self, value):
        with self._extract_context() as workflow:
            self._set_node_parameter(workflow, "extract", "inp_file", value)

    @property
    def inp_step(self):
        with self._extract_context() as workflow:
            return self._get_node_parameter(workflow, "extract", "inp_step")

    @inp_step.setter
    def inp_step(self, value):
        with self._extract_context() as workflow:
            self._set_node_parameter(workflow, "extract", "inp_step", value)

    @property
    def do_rebin(self):
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "rebin", "do", dest="for_bliss")

    @do_rebin.setter
    def do_rebin(self, value):
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "rebin", "do", value, dest="for_bliss")

    @property
    def do_convert(self):
        # Conversion is always enabled.
        return True

    @property
    def do_sum_single(self):
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "sum", "sum_single")

    @do_sum_single.setter
    def do_sum_single(self, value):
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "sum", "sum_single", value)
            if value:
                # Summing requires rebinned data (re-entrant context handles
                # the nested do_rebin access).
                self.do_rebin = True

    @property
    def do_sum_all(self):
        with self._rebinsum_context() as workflow:
            return self._get_node_parameter(workflow, "sum", "sum_all")

    @do_sum_all.setter
    def do_sum_all(self, value):
        with self._rebinsum_context() as workflow:
            self._set_node_parameter(workflow, "sum", "sum_all", value)
            if value:
                # Summing requires rebinned data.
                self.do_rebin = True

    @property
    def do_extract(self):
        # Extraction is not a persistent flag; it is decided per call.
        return "< from command stscan(..., extract=False) >"

    def _submit_convert_workflow(
        self,
        filename: str,
        entries: List[str],
        outprefix: str,
        inputs: List[dict],
        convert_destination: str,
    ):
        """Submit a convert job; mutates `inputs` in place."""
        # Entering the context ensures the workflow exists and is saved to
        # disk before submission; `workflow` stays bound after the `with`.
        with self._convert_context() as workflow:
            pass
        primary_outdir = get_processed_dir(filename)
        self._add_wait_convert_inputs(inputs, filename, entries)
        self._add_convert_inputs(inputs, outprefix, primary_outdir)
        self._submit_job(workflow, inputs, convert_destination, queue="solo1")

    def _submit_rebinsum_workflow(
        self,
        filename: str,
        entries: List[str],
        outprefix: str,
        inputs: List[dict],
        convert_destination: str,
    ):
        """Submit a rebin+sum job; mutates `inputs` in place."""
        with self._rebinsum_context() as workflow:
            pass
        primary_outdir = get_processed_dir(filename)
        self._add_wait_rebin_inputs(inputs, filename, entries)
        self._add_rebin_inputs(inputs, outprefix, primary_outdir)
        self._add_convertrebin_inputs(inputs, primary_outdir)
        self._add_sum_inputs(inputs, primary_outdir)
        self._submit_job(workflow, inputs, convert_destination, queue="solo2")

    def _submit_extract_workflow(
        self,
        filename: str,
        entries: List[str],
        outprefix: str,
        inputs: List[dict],
        convert_destination: str,
    ):
        """Submit an extract job; mutates `inputs` in place."""
        with self._extract_context() as workflow:
            pass
        primary_outdir = get_processed_dir(filename)
        self._add_wait_extract_inputs(inputs, filename, entries)
        self._add_extract_inputs(inputs, outprefix, primary_outdir)
        self._submit_job(workflow, inputs, convert_destination, queue="solo2")

    def _submit_job(self, workflow, inputs, convert_destination, **kw):
        """Send the workflow graph to the ewoks job queue."""
        submit(
            args=(workflow,),
            kwargs={"inputs": inputs, "convert_destination": convert_destination},
            **kw,
        )

    def _save_default_convert_workflow(self):
        """Build, save and return the default convert workflow."""
        workflow = self._convert_graph()
        self._add_wait_convert_default_inputs(workflow)
        self._add_convert_default_inputs(workflow)
        self._save_workflow(self.convert_workflow, workflow)
        return workflow

    def _save_default_rebinsum_workflow(self):
        """Build, save and return the default rebinsum workflow."""
        workflow = self._rebinsum_graph()
        self._add_wait_rebin_default_inputs(workflow)
        self._add_rebin_default_inputs(workflow)
        self._add_convertrebin_default_inputs(workflow)
        self._add_sum_default_inputs(workflow)
        self._save_workflow(self.rebinsum_workflow, workflow)
        return workflow

    def _save_default_extract_workflow(self):
        """Build, save and return the default extract workflow."""
        workflow = self._extract_graph()
        self._add_wait_extract_default_inputs(workflow)
        self._add_extract_default_inputs(workflow)
        self._save_workflow(self.extract_workflow, workflow)
        return workflow

    def _save_workflow(self, filename, workflow):
        """Serialize a workflow graph to JSON, creating parent directories."""
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, "w") as fh:
            json.dump(workflow, fh, indent=2)

    @contextmanager
    def _convert_context(self):
        """Creates the workflow on disk when it does not exist"""
        with self._workflow_context("convert") as workflow:
            yield workflow

    @contextmanager
    def _rebinsum_context(self):
        """Creates the workflow on disk when it does not exist"""
        with self._workflow_context("rebinsum") as workflow:
            yield workflow

    @contextmanager
    def _extract_context(self):
        """Creates the workflow on disk when it does not exist"""
        with self._workflow_context("extract") as workflow:
            yield workflow

    @contextmanager
    def _workflow_context(self, workflow_id: str):
        """Re-entrant context which loads/creates the workflow upon entering
        and saves the workflow upon exiting."""
        filename = self._get_workflow_filename(workflow_id)
        workflow = self._context_cache.get(filename)
        if workflow is not None:
            # Nested entry: reuse the cached graph; the outermost context
            # is responsible for saving.
            yield workflow
            return
        if os.path.exists(filename):
            with open(filename, "r") as fh:
                workflow = json.load(fh)
        else:
            workflow = self._create_default_workflow(workflow_id)
        self._context_cache[filename] = workflow
        try:
            yield workflow
        finally:
            self._context_cache.pop(filename)
            # The workflow filename may have changed inside the context (see
            # the workflow filename setters): save to the current location.
            filename = self._get_workflow_filename(workflow_id)
            self._save_workflow(filename, workflow)

    def _get_workflow_filename(self, workflow_id):
        """Map a workflow id to its persistent JSON filename.

        :raises ValueError: for an unknown workflow id.
        """
        if workflow_id == "convert":
            return self.convert_workflow
        elif workflow_id == "rebinsum":
            return self.rebinsum_workflow
        elif workflow_id == "extract":
            return self.extract_workflow
        # Raise instead of assert: asserts are stripped under `python -O`.
        raise ValueError(f"Unknown workflow '{workflow_id}'")

    def _create_default_workflow(self, workflow_id):
        """Create, persist and return the default workflow for an id.

        :raises ValueError: for an unknown workflow id.
        """
        if workflow_id == "convert":
            return self._save_default_convert_workflow()
        elif workflow_id == "rebinsum":
            return self._save_default_rebinsum_workflow()
        elif workflow_id == "extract":
            return self._save_default_extract_workflow()
        # Raise instead of assert: asserts are stripped under `python -O`.
        raise ValueError(f"Unknown workflow '{workflow_id}'")

    def _get_node_attrs(self, workflow, node_id):
        """Return the attribute dict of a node, or None when not found."""
        for node_attrs in workflow["nodes"]:
            if node_attrs["id"] == node_id:
                return node_attrs

    def _get_node_parameter(self, workflow, node_id, name, dest="default_inputs"):
        """Return the value of a named argument in `dest` of a node, or None
        when the argument is missing.  Raises AttributeError for an unknown
        node (``_get_node_attrs`` returns None)."""
        node_attrs = self._get_node_attrs(workflow, node_id)
        for argument in node_attrs.get(dest, list()):
            if argument["name"] == name:
                return argument["value"]

    def _set_node_parameter(
        self, workflow, node_id, name, value, dest="default_inputs"
    ):
        """Set (or append) a named argument in `dest` of a node in place."""
        node_attrs = self._get_node_attrs(workflow, node_id)
        for argument in node_attrs.setdefault(dest, list()):
            if argument["name"] == name:
                argument["value"] = value
                return
        node_attrs[dest].append({"name": name, "value": value})

    def _convert_graph(self):
        """Default graph: wait for scans, then convert HDF5 to SPEC."""
        nodes = [
            {
                "id": "wait",
                "task_type": "class",
                "task_identifier": "ewoksid22.wait.WaitScansFinished",
            },
            {
                "id": "convert",
                "task_type": "class",
                "task_identifier": "ewoksid22.convert.ID22H5ToSpec",
            },
        ]
        links = [
            {
                "source": "wait",
                "target": "convert",
                "data_mapping": [
                    {"source_output": "filename", "target_input": "filename"},
                    # {"source_output": "entries", "target_input": "entries"},
                ],
            },
        ]
        return {"graph": {"id": "convert"}, "nodes": nodes, "links": links}

    def _rebinsum_graph(self):
        """Default graph: wait, rebin, convert and sum."""
        nodes = [
            {
                "id": "wait",
                "task_type": "class",
                "task_identifier": "ewoksid22.wait.WaitScansFinished",
            },
            {
                "id": "rebin",
                "task_type": "class",
                "task_identifier": "ewoksid22.rebin.ID22Rebin",
            },
            {
                "id": "convert",
                "task_type": "class",
                "task_identifier": "ewoksid22.convert.ID22H5ToSpec",
            },
            {
                "id": "sum",
                "task_type": "class",
                "task_identifier": "ewoksid22.sum.ID22Sum",
            },
        ]
        links = [
            {
                "source": "wait",
                "target": "rebin",
                "data_mapping": [
                    {"source_output": "filename", "target_input": "filename"},
                    # {"source_output": "entries", "target_input": "entries"},
                ],
            },
            {
                "source": "wait",
                "target": "convert",
                "data_mapping": [
                    {"source_output": "filename", "target_input": "filename"},
                    # {"source_output": "entries", "target_input": "entries"},
                ],
            },
            {
                "source": "wait",
                "target": "sum",
                "data_mapping": [
                    {"source_output": "filename", "target_input": "raw_filename"},
                    # {"source_output": "entries", "target_input": "entries"},
                ],
            },
            {
                "source": "rebin",
                "target": "convert",
                "data_mapping": [
                    {"source_output": "outfile", "target_input": "rebin_filename"}
                ],
            },
            {
                "source": "convert",
                "target": "sum",
                "data_mapping": [
                    {"source_output": "outfile", "target_input": "filename"},
                ],
            },
        ]
        return {"graph": {"id": "rebinsum"}, "nodes": nodes, "links": links}

    def _extract_graph(self):
        """Default graph: wait for scans, then TOPAS extraction."""
        nodes = [
            {
                "id": "wait",
                "task_type": "class",
                "task_identifier": "ewoksid22.wait.WaitScansFinished",
            },
            {
                "id": "extract",
                "task_type": "class",
                "task_identifier": "ewoksid22.extract.ID22TopasExtract",
            },
        ]
        links = [
            {
                "source": "wait",
                "target": "extract",
                "data_mapping": [
                    {"source_output": "filename", "target_input": "filename"},
                    {"source_output": "entries", "target_input": "entries"},
                ],
            },
        ]
        return {"graph": {"id": "extract"}, "nodes": nodes, "links": links}

    def _add_wait_convert_inputs(self, inputs: list, filename: str, entries: list):
        """Per-job inputs for the convert workflow's wait node."""
        inputs += [
            {"id": "wait", "name": "filename", "value": filename},
            {"id": "wait", "name": "entries", "value": entries},
        ]

    def _add_wait_convert_default_inputs(self, workflow: dict):
        """Persistent defaults for the convert workflow's wait node."""
        node_attrs = self._get_node_attrs(workflow, "wait")
        node_attrs["default_inputs"] = [
            {"name": "retry_timeout", "value": 30},
        ]

    def _add_wait_rebin_inputs(self, inputs: list, filename: str, entries: list):
        """Per-job inputs for the rebinsum workflow's wait node."""
        inputs += [
            {"id": "wait", "name": "filename", "value": filename},
            {"id": "wait", "name": "entries", "value": entries},
        ]

    def _add_wait_rebin_default_inputs(self, workflow: dict):
        """Persistent defaults for the rebinsum workflow's wait node."""
        node_attrs = self._get_node_attrs(workflow, "wait")
        node_attrs["default_inputs"] = [
            {"name": "retry_timeout", "value": 30},
        ]

    def _add_wait_extract_inputs(self, inputs: list, filename: str, entries: list):
        """Per-job inputs for the extract workflow's wait node."""
        inputs += [
            {"id": "wait", "name": "filename", "value": filename},
            {"id": "wait", "name": "entries", "value": entries},
        ]

    def _add_wait_extract_default_inputs(self, workflow: dict):
        """Persistent defaults for the extract workflow's wait node."""
        node_attrs = self._get_node_attrs(workflow, "wait")
        node_attrs["default_inputs"] = [
            {"name": "retry_timeout", "value": 30},
        ]

    def _add_rebin_inputs(self, inputs: list, outprefix: str, primary_outdir: str):
        """Per-job inputs for the rebin node (proposal outdir only when enabled)."""
        if self.rebin_include_proposal_outdir:
            inputs += [
                {"id": "rebin", "name": "outprefix", "value": outprefix},
                {"id": "rebin", "name": "primary_outdir", "value": primary_outdir},
            ]

    def _add_rebin_default_inputs(self, workflow: dict):
        """Persistent defaults for the rebin node of the rebinsum workflow."""
        node_attrs = self._get_node_attrs(workflow, "rebin")
        node_attrs["for_bliss"] = [{"name": "do", "value": False}]
        node_attrs["default_inputs"] = [
            {
                "name": "parsfile",
                "value": "/data/id22/inhouse/CD_GC_PDF/advanced_50keV/patterns/for_wout/out7.pars",
            },
            {"name": "range", "value": [float("nan"), float("nan")]},
            {"name": "delta2theta", "value": 0.003},
            {"name": "startp", "value": 31},
            {"name": "device", "value": 0},
            {
                "name": "outdirs",
                "value": {
                    "primary": "opid22@diffract22new:/users/opid22/data1/",
                    # "secondary": "opid22@lid22bliss:/users/opid22/data1/",
                    "backup": "opid22@lid22bliss:/data/id22/backup/data22/",
                },
            },
            {"name": "retry_timeout", "value": 30},
        ]

    def _add_convertrebin_inputs(self, inputs: list, primary_outdir: str):
        """Per-job inputs for the convert node of the rebinsum workflow."""
        if self.convert_include_proposal_outdir:
            inputs += [
                {"id": "convert", "name": "primary_outdir", "value": primary_outdir}
            ]

    def _add_convertrebin_default_inputs(self, workflow: dict):
        """Persistent defaults for the convert node of the rebinsum workflow."""
        node_attrs = self._get_node_attrs(workflow, "convert")
        node_attrs["default_inputs"] = [
            {
                "name": "outdirs",
                "value": {
                    "primary": "opid22@diffract22new:/users/opid22/data1/",
                    # "secondary": "opid22@lid22bliss:/users/opid22/data1/",
                    "backup": "opid22@lid22bliss:/data/id22/backup/data22/",
                },
            },
            {"name": "retry_timeout", "value": 30},
            {"name": "ascii_extension", "value": ".adv"},
        ]

    def _add_sum_inputs(self, inputs: list, primary_outdir: str):
        """Per-job inputs for the sum node of the rebinsum workflow."""
        if self.sum_include_proposal_outdir:
            inputs += [{"id": "sum", "name": "primary_outdir", "value": primary_outdir}]

    def _add_sum_default_inputs(self, workflow: dict):
        """Persistent defaults for the sum node of the rebinsum workflow."""
        node_attrs = self._get_node_attrs(workflow, "sum")
        node_attrs["default_inputs"] = [
            {
                "name": "resfile",
                "value": "/data/id22/inhouse/CD_GC_PDF/advanced_50keV/patterns/for_wout/temp.res",
            },
            {"name": "binsize", "value": 0.002},
            {"name": "sum_single", "value": False},
            {"name": "sum_all", "value": False},
            {"name": "retry_timeout", "value": 30},
            {"name": "ascii_extension", "value": ".adv"},
        ]

    def _add_convert_inputs(self, inputs: list, outprefix: str, primary_outdir: str):
        """Per-job inputs for the convert node of the convert workflow."""
        if self.convert_include_proposal_outdir:
            inputs += [
                {"id": "convert", "name": "outprefix", "value": outprefix},
                {
                    "id": "convert",
                    "name": "primary_outdir",
                    "value": primary_outdir,
                },
            ]

    def _add_convert_default_inputs(self, workflow: dict):
        """Persistent defaults for the convert node of the convert workflow."""
        node_attrs = self._get_node_attrs(workflow, "convert")
        node_attrs["default_inputs"] = [
            {
                "name": "outdirs",
                "value": {
                    "primary": "opid22@diffract22new:/users/opid22/data1/",
                    # "secondary": "opid22@lid22bliss:/users/opid22/data1/",
                    "backup": "opid22@lid22bliss:/data/id22/backup/data22/",
                },
            },
            {"name": "retry_timeout", "value": 30},
        ]

    def _add_extract_inputs(self, inputs: list, outprefix: str, primary_outdir: str):
        """Per-job inputs for the extract node of the extract workflow."""
        if self.extract_include_proposal_outdir:
            inputs += [
                {"id": "extract", "name": "outprefix", "value": outprefix},
                {
                    "id": "extract",
                    "name": "primary_outdir",
                    "value": primary_outdir,
                },
            ]

    def _add_extract_default_inputs(self, workflow: dict):
        """Persistent defaults for the extract node of the extract workflow."""
        node_attrs = self._get_node_attrs(workflow, "extract")
        node_attrs["default_inputs"] = [
            {"name": "tth_min", "value": None},
            {"name": "tth_max", "value": None},
            {"name": "full_tth", "value": False},
            {"name": "inp_file", "value": "/users/opid22/out7_files/LaB6_all7_os.inp"},
            {"name": "inp_step", "value": 2},
            {"name": "startp", "value": 31},
        ]