import os
import json
import pytest
from unittest import mock
from ..id22.stscan_processor import StScanProcessor
_FILENAME = os.path.abspath(
"/data/visitor/id000000/id00/20230227/raw/Blank/Blank_0001/Blank_0001.h5"
)
_PROCESSED_DIR = os.path.abspath("/data/visitor/id000000/id00/20230227/processed")
[docs]
@pytest.fixture
def current_session_mock(mock_persistent, mocker):
m = mocker.patch("blissoda.persistent.parameters.current_session")
m = mocker.patch("blissoda.id22.stscan_processor.current_session")
m.scan_saving.proposal_name = "id000000"
m.scan_saving.proposal_type = "inhouse"
m.scan_saving.filename = _FILENAME
[docs]
@pytest.fixture
def submit_mock(mocker, current_session_mock):
return mocker.patch("blissoda.id22.stscan_processor.submit")
[docs]
@pytest.fixture
def stscan_processor(tmpdir, submit_mock):
return StScanProcessor(
_convert_workflow=str(tmpdir / "convert.json"),
_rebinsum_workflow=str(tmpdir / "rebinsum.json"),
_extract_workflow=str(tmpdir / "extract.json"),
)
[docs]
def test_create_workflows(stscan_processor, tmpdir):
stscan_processor.__info__()
assert (tmpdir / "convert.json").exists()
assert (tmpdir / "rebinsum.json").exists()
assert (tmpdir / "extract.json").exists()
[docs]
def test_persist_changes(stscan_processor, tmpdir):
stscan_processor.rebindirs = dict()
stscan_processor.binsize = [0.002, 0.003]
stscan_processor.delta2theta = 0.004
stscan_processor.range = 0, 10
stscan_processor.resfile = str(tmpdir / "temp.res")
stscan_processor.do_rebin = True
def assert_values():
assert stscan_processor.rebindirs == dict()
assert stscan_processor.binsize == [0.002, 0.003]
assert stscan_processor.delta2theta == [0.004]
assert stscan_processor.range == [0, 10]
assert stscan_processor.resfile == str(tmpdir / "temp.res")
assert stscan_processor.do_rebin
assert_values()
stscan_processor.convert_workflow = str(tmpdir / "convert2.json")
stscan_processor.rebinsum_workflow = str(tmpdir / "rebinsum2.json")
stscan_processor.extract_workflow = str(tmpdir / "extract2.json")
assert_values()
stscan_processor = StScanProcessor(
convert_workflow=str(tmpdir / "convert2.json"),
rebinsum_workflow=str(tmpdir / "rebinsum2.json"),
extract_workflow=str(tmpdir / "extract2.json"),
)
assert_values()
[docs]
def test_info(stscan_processor):
assert stscan_processor.__info__()
[docs]
def test_submit_convert(stscan_processor, submit_mock, tmpdir):
stscan_processor.submit_workflows(scannr=1)
calls = [
mock.call(
args=(_read_workflow(tmpdir / "convert.json"),),
kwargs={
"inputs": [
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": ["1.1", "1.2"]},
{"id": "convert", "name": "outprefix", "value": "id000000"},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
],
"convert_destination": os.path.join(
_PROCESSED_DIR, "workflows", "convert", "Blank_0001_scan1.json"
),
},
queue="solo1",
)
]
submit_mock.assert_has_calls(calls)
[docs]
def test_submit_rebin(stscan_processor, submit_mock, tmpdir):
stscan_processor.do_rebin = True
stscan_processor.submit_workflows()
calls = [
mock.call(
args=(_read_workflow(tmpdir / "convert.json"),),
kwargs={
"inputs": [
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "convert", "name": "outprefix", "value": "id000000"},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
],
"convert_destination": os.path.join(
_PROCESSED_DIR, "workflows", "convert", "Blank_0001.json"
),
},
queue="solo1",
),
mock.call(
args=(_read_workflow(tmpdir / "rebinsum.json"),),
kwargs={
"inputs": [
{"id": "rebin", "name": "delta2theta", "value": 0.003},
{"id": "sum", "name": "binsize", "value": 0.002},
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "rebin", "name": "outprefix", "value": "id000000"},
{"id": "rebin", "name": "primary_outdir", "value": _PROCESSED_DIR},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
{"id": "sum", "name": "primary_outdir", "value": _PROCESSED_DIR},
],
"convert_destination": os.path.join(
_PROCESSED_DIR,
"workflows",
"rebinsum",
"Blank_0001_w0003_b0002.json",
),
},
queue="solo2",
),
]
submit_mock.assert_has_calls(calls)
[docs]
def test_submit_sum(stscan_processor, submit_mock, tmpdir):
stscan_processor.do_sum_single = True
stscan_processor.do_sum_all = True
stscan_processor.submit_workflows()
calls = [
mock.call(
args=(_read_workflow(tmpdir / "convert.json"),),
kwargs={
"inputs": [
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "convert", "name": "outprefix", "value": "id000000"},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
],
"convert_destination": os.path.join(
_PROCESSED_DIR, "workflows", "convert", "Blank_0001.json"
),
},
queue="solo1",
),
mock.call(
args=(_read_workflow(tmpdir / "rebinsum.json"),),
kwargs={
"inputs": [
{"id": "rebin", "name": "delta2theta", "value": 0.003},
{"id": "sum", "name": "binsize", "value": 0.002},
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "rebin", "name": "outprefix", "value": "id000000"},
{"id": "rebin", "name": "primary_outdir", "value": _PROCESSED_DIR},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
{"id": "sum", "name": "primary_outdir", "value": _PROCESSED_DIR},
],
"convert_destination": os.path.join(
_PROCESSED_DIR,
"workflows",
"rebinsum",
"Blank_0001_w0003_b0002.json",
),
},
queue="solo2",
),
]
submit_mock.assert_has_calls(calls)
[docs]
def test_submit_sum_multiparams(stscan_processor, submit_mock, tmpdir):
stscan_processor.do_sum_single = True
stscan_processor.do_sum_all = True
stscan_processor.binsize = [0.002, 0.003]
stscan_processor.delta2theta = [0.004, 0.005]
stscan_processor.submit_workflows()
calls = [
mock.call(
args=(_read_workflow(tmpdir / "convert.json"),),
kwargs={
"inputs": [
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "convert", "name": "outprefix", "value": "id000000"},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
],
"convert_destination": os.path.join(
_PROCESSED_DIR, "workflows", "convert", "Blank_0001.json"
),
},
queue="solo1",
),
mock.call(
args=(_read_workflow(tmpdir / "rebinsum.json"),),
kwargs={
"inputs": [
{"id": "rebin", "name": "delta2theta", "value": 0.004},
{"id": "sum", "name": "binsize", "value": 0.002},
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "rebin", "name": "outprefix", "value": "id000000"},
{"id": "rebin", "name": "primary_outdir", "value": _PROCESSED_DIR},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
{"id": "sum", "name": "primary_outdir", "value": _PROCESSED_DIR},
],
"convert_destination": os.path.join(
_PROCESSED_DIR,
"workflows",
"rebinsum",
"Blank_0001_w0004_b0002.json",
),
},
queue="solo2",
),
mock.call(
args=(_read_workflow(tmpdir / "rebinsum.json"),),
kwargs={
"inputs": [
{"id": "rebin", "name": "delta2theta", "value": 0.005},
{"id": "sum", "name": "binsize", "value": 0.002},
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "rebin", "name": "outprefix", "value": "id000000"},
{"id": "rebin", "name": "primary_outdir", "value": _PROCESSED_DIR},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
{"id": "sum", "name": "primary_outdir", "value": _PROCESSED_DIR},
],
"convert_destination": os.path.join(
_PROCESSED_DIR,
"workflows",
"rebinsum",
"Blank_0001_w0005_b0002.json",
),
},
queue="solo2",
),
mock.call(
args=(_read_workflow(tmpdir / "rebinsum.json"),),
kwargs={
"inputs": [
{"id": "rebin", "name": "delta2theta", "value": 0.004},
{"id": "sum", "name": "binsize", "value": 0.003},
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "rebin", "name": "outprefix", "value": "id000000"},
{"id": "rebin", "name": "primary_outdir", "value": _PROCESSED_DIR},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
{"id": "sum", "name": "primary_outdir", "value": _PROCESSED_DIR},
],
"convert_destination": os.path.join(
_PROCESSED_DIR,
"workflows",
"rebinsum",
"Blank_0001_w0004_b0003.json",
),
},
queue="solo2",
),
mock.call(
args=(_read_workflow(tmpdir / "rebinsum.json"),),
kwargs={
"inputs": [
{"id": "rebin", "name": "delta2theta", "value": 0.005},
{"id": "sum", "name": "binsize", "value": 0.003},
{"id": "wait", "name": "filename", "value": _FILENAME},
{"id": "wait", "name": "entries", "value": []},
{"id": "rebin", "name": "outprefix", "value": "id000000"},
{"id": "rebin", "name": "primary_outdir", "value": _PROCESSED_DIR},
{
"id": "convert",
"name": "primary_outdir",
"value": _PROCESSED_DIR,
},
{"id": "sum", "name": "primary_outdir", "value": _PROCESSED_DIR},
],
"convert_destination": os.path.join(
_PROCESSED_DIR,
"workflows",
"rebinsum",
"Blank_0001_w0005_b0003.json",
),
},
queue="solo2",
),
]
submit_mock.assert_has_calls(calls)
def _read_workflow(filename):
with open(filename, "r") as f:
return json.load(f)