Getting Started#

This tutorial demonstrates how the execution of Ewoks workflows can be integrated in Bliss acquisitions.

Note

The examples in this tutorial are strictly speaking not online data analysis because the workflows will be executed after the scan is finished, not while the scan is running.

In this tutorial, you will implement a loopscan that triggers an Ewoks workflow, with the results either returned to Bliss or saved to HDF5.

Trigger workflows#

After every loopscan we want to perform calculations on the raw data and return the result.

For this, we will create a custom scan command loopscan_with_calc.

To be accessible from Bliss, the command must be defined in the Bliss session script or in a user script. In this case, we will put it in a user script called oda_scans.py.

# oda_scans.py

from ewoksjob.client import submit
from bliss import setup_globals


def loopscan_with_calc(*args, **kwargs):
    # Run the scan
    scan = setup_globals.loopscan(*args, **kwargs)

    # Specify the workflow to run
    workflow = {"graph": {"id": "nothing_yet"}}

    # Trigger the workflow
    future = submit(args=(workflow,))

    # Wait for the result
    results = future.get()
    print("Results:", results)

Then, instead of running the normal scan command

# BLISS

DEMO_SESSION [1]: loopscan(10, 0.1, diode1, diode2)
         Out [1]: Scan(number=4, name=loopscan, path=.../RAW_DATA/sample/sample_0001/sample_0001.h5)

we run the new scan command after loading the script that defines it

# BLISS

DEMO_SESSION [2]: user_script_homedir("/path/to/scripts/directory")
DEMO_SESSION [3]: user_script_load("oda_scans")
DEMO_SESSION [4]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
Results: {}

The printed Results: {} means that the workflow was launched correctly. Now, we can start doing actual processing with this workflow.

Workflow Parameters#

Data processing needs parameters, at the very least a reference to the data that needs to be processed. Very often, this is the HDF5 URL of the scan data.

We modify our loopscan_with_calc function in oda_scans to retrieve this HDF5 URL from the scan:

# oda_scans.py

# Run the scan
scan = setup_globals.loopscan(*args, **kwargs)

# URL of the scan data
filename = scan.scan_info["filename"]
scan_nb = scan.scan_info["scan_nb"]
scan_url = f"{filename}::/{scan_nb}.1"

Warning

The user script needs to be reloaded via user_script_load("oda_scans") after each modification.

For the sake of the example, we will use a simple workflow that adds two strings, and pass two parameters to it:

  • The scan URL we just retrieved

  • An additional parameter called prefix (in this case, a simple string)

# oda_scans.py

# Specify the workflow to run
workflow = {
    "graph": {"id": "nothing_yet"},
    "nodes": [
        {"id": "node1", "task_type": "method", "task_identifier": "operator.add"}
    ],
}

# Workflow parameters
prefix = "Process scan "
parameters = [
    {"id": "node1", "name": 0, "value": prefix},
    {"id": "node1", "name": 1, "value": scan_url},
]

# Trigger the workflow
future = submit(args=(workflow,), kwargs={"inputs": parameters})

Since node1 wraps operator.add as a method task, the integer names 0 and 1 in the parameters refer to its positional arguments. The result of the workflow is therefore the concatenation of the prefix and the scan URL. We can run several scans to verify that it changes for each scan

# BLISS

DEMO_SESSION [5]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
Results: {'return_value': 'Process scan .../RAW_DATA/sample/sample_0001/sample_0001.h5::/7.1'}
DEMO_SESSION [6]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
Results: {'return_value': 'Process scan .../RAW_DATA/sample/sample_0001/sample_0001.h5::/8.1'}

Now that we have the scan URL, it is time to perform real operations on the scan data.

Data processing#

In this example, the processing will consist of dividing the two diode signals (diode2 divided by diode1). For this, we need to write a specific Ewoks task that implements this operation.

We will now create a task NormalizeData in a file tasks.py that must be accessible to the Ewoks worker (recall that it is the worker that runs the workflow).

# tasks.py in the working directory of the Ewoks worker

from ewokscore import Task
from blissdata.h5api import dynamic_hdf5


class NormalizeData(
    Task,
    input_names=["scan_url", "i0name", "i1name"],
    output_names=["result"],
):
    def run(self):
        filename_in, _, scan = self.inputs.scan_url.rpartition("::")

        with dynamic_hdf5.File(filename_in) as nxroot_in:
            nxentry_in = nxroot_in[scan]
            nxentry_in["end_time"]  # wait until scan is fully written
            I1 = nxentry_in[f"instrument/{self.inputs.i1name}/data"]
            I0 = nxentry_in[f"instrument/{self.inputs.i0name}/data"]
            I1norm = I1[()] / I0[()]
            self.outputs.result = I1norm

Note

Since the HDF5 file content might still be changing when executing the workflow, we cannot simply read the dataset content (e.g. I1[()]) because it would only return the data that has been written at the moment of reading.

To ensure we read all dataset values, we read a dataset (here end_time) that only appears at the end of the scan. Another solution is to iterate over the datasets instead of slicing them, as shown in the BLISS documentation.
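
As a rough sketch of the iteration-based alternative, the diode values could be collected point by point instead of waiting for end_time. This assumes that dynamic_hdf5 datasets can be iterated while the scan is running, blocking until new points arrive, as described in the BLISS documentation; the snippet below is not used in the rest of this tutorial.

# tasks.py (sketch of the iteration-based alternative, not used further on)

with dynamic_hdf5.File(filename_in) as nxroot_in:
    nxentry_in = nxroot_in[scan]
    # Iterating (instead of slicing) is assumed to yield points as they are
    # written and to stop once the scan has ended.
    I0 = list(nxentry_in[f"instrument/{self.inputs.i0name}/data"])
    I1 = list(nxentry_in[f"instrument/{self.inputs.i1name}/data"])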

Again, we update in oda_scans the definition of the workflow to use our new task (tasks.NormalizeData) and the workflow parameters to specify its inputs:

# oda_scans.py

# Specify the workflow to run
workflow = {
    "graph": {"id": "normalize"},
    "nodes": [
        {
            "id": "node1",
            "task_type": "class",
            "task_identifier": "tasks.NormalizeData",
        }
    ],
}

# Workflow parameters
parameters = [
    {"id": "node1", "name": "scan_url", "value": scan_url},
    {"id": "node1", "name": "i0name", "value": "diode1"},
    {"id": "node1", "name": "i1name", "value": "diode2"},
]

The workflow can then be triggered as before by using our loopscan_with_calc command.

# BLISS

DEMO_SESSION [7]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
Results: {'result': array([1.07374567, 0.65471348, 0.54708319, 1.53021916, 0.77015426,
    0.8388511 , 0.98417602, 1.03111438, 1.61111265, 0.70460637])}

We can check that the result is indeed diode2 / diode1.
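
To double-check this by hand, a minimal sketch could read the raw diode data back with h5py and redo the division (the path and scan number below are illustrative, and the scan must have finished writing):

# Sketch only: manual verification of the workflow result

import h5py

with h5py.File("/path/to/RAW_DATA/sample/sample_0001/sample_0001.h5", "r") as nxroot:
    diode1 = nxroot["/9.1/instrument/diode1/data"][()]
    diode2 = nxroot["/9.1/instrument/diode2/data"][()]

print(diode2 / diode1)  # should match the 'result' array printed above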

Save results#

Often we want to save the processed data generated in this way. The usual approach is to write an HDF5 file from within the workflow itself.

For this, we will expand our previous task NormalizeData to add HDF5 saving of the results:

# tasks.py
import os
from blissoda.utils.directories import get_processed_dir

class NormalizeDataWithSaving(
    Task,
    input_names=["scan_url", "i0name", "i1name"],
    output_names=["result_url"],
):
    def run(self):
        filename_in, _, scan = self.inputs.scan_url.rpartition("::")

        with dynamic_hdf5.File(filename_in) as nxroot_in:
            nxentry_in = nxroot_in[scan]
            nxentry_in["end_time"]  # wait until scan is fully written
            I1 = nxentry_in[f"instrument/{self.inputs.i1name}/data"]
            I0 = nxentry_in[f"instrument/{self.inputs.i0name}/data"]
            I1norm = I1[()] / I0[()]

            # Write data in the processed directory PROCESSED_DATA
            processed_dir = get_processed_dir(filename_in)
            os.makedirs(processed_dir, exist_ok=True)
            basename = os.path.splitext(os.path.basename(filename_in))[0]
            filename_out = os.path.join(processed_dir, f"{basename}.h5")

            self.outputs.result_url = save_result(
                filename_in, filename_out, nxentry_in, scan, self.inputs.i1name, I1norm
            )

The save_result function saves results in a NeXus-compliant HDF5 file with links to the raw scan data. The implementation below can be reproduced as-is.

Implementation of save_result
# tasks.py

import os
import json
import numpy
import h5py

from silx.io import h5py_utils
from blissoda.utils.directories import get_processed_dir

def save_result(
    filename_in: str,
    filename_out: str,
    nxentry_in: h5py.Group,
    scan: str,
    signal_name: str,
    signal_values: numpy.ndarray,
) -> str:
    with h5py_utils.open_item(filename_out, "/", mode="a") as nxroot_out:
        result_url = f"{filename_out}::{scan}"
        if scan in nxroot_out:
            print(f"Already processed : {result_url}")
            return result_url

        # Prepare NeXus structure for saving scan processing results
        nxroot_out.attrs["NX_class"] = "NXroot"
        nxentry_out = nxroot_out.create_group(scan)
        nxroot_out.attrs["default"] = scan
        nxentry_out.attrs["NX_class"] = "NXentry"

        # Save processing results
        normalized_dataset = save_normalized_results(nxentry_out, signal_values, dict())

        measurement = nxentry_out.create_group("measurement")
        measurement.attrs["NX_class"] = "NXcollection"
        measurement[f"{signal_name}_normalized"] = h5py.SoftLink(
            normalized_dataset.name
        )

        # Links to raw data
        save_raw_data_links(filename_in, nxentry_in, nxentry_out)

        return result_url


def save_normalized_results(
    nxentry_out: h5py.Group, normalized: numpy.ndarray, parameters: dict
) -> h5py.Dataset:
    nxprocess = nxentry_out.create_group("normalize")
    nxentry_out.attrs["default"] = "normalize"
    nxprocess.attrs["NX_class"] = "NXprocess"
    nxprocess["program"] = "ID26"

    nxdata = nxprocess.create_group("results")
    nxprocess.attrs["default"] = "results"
    nxdata.attrs["NX_class"] = "NXdata"
    nxdata.attrs["signal"] = "normalized"
    nxdata["normalized"] = normalized

    nxnote = nxprocess.create_group("config")
    nxnote.attrs["NX_class"] = "NXnote"
    nxnote["data"] = json.dumps(parameters)
    nxnote["type"] = "application/json"

    return nxdata["normalized"]


def save_raw_data_links(
    filename_in: str, nxentry_in: h5py.Group, nxentry_out: h5py.Group
) -> None:
    nxentry_out["instrument"] = h5py.ExternalLink(
        filename_in, nxentry_in["instrument"].name
    )
    nxentry_out["sample"] = h5py.ExternalLink(filename_in, nxentry_in["sample"].name)
    measurement_in = nxentry_in["measurement"]
    measurement_out = nxentry_out["measurement"]
    for name in measurement_in:
        measurement_out[name] = h5py.ExternalLink(
            filename_in, measurement_in[name].name
        )

The workflow in oda_scans.py then needs to be modified once again to use the NormalizeDataWithSaving task:

# oda_scans.py

# Specify the workflow to run
workflow = {
    "graph": {"id": "normalize_with_saving"},
    "nodes": [
        {
            "id": "node1",
            "task_type": "class",
            "task_identifier": "tasks.NormalizeDataWithSaving",
        }
    ],
}

By running the workflow through our custom command, we can see that the result of this workflow is now the URL where the results are saved

# BLISS

DEMO_SESSION [8]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
Results: {'result_url': '.../PROCESSED_DATA/sample_0001.h5::/1.1'}

In addition to saving the results, we also create links to the raw data HDF5 file. As a result, you get a file that looks exactly like the raw HDF5 file with additional calculated channels, in this case diode2_normalized, which is diode2 divided by diode1.

[Figure: screen capture of silx view showing the HDF5 file where the workflow results are saved, compared to the raw data HDF5 file]
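
To inspect the saved results programmatically rather than with a viewer, a minimal sketch (with an illustrative path and scan number, following the structure written by save_result above) looks like this:

# Sketch only: read back the normalized signal from the processed file

import h5py

with h5py.File("/path/to/PROCESSED_DATA/sample_0001.h5", "r") as nxroot:
    # Dataset created by save_normalized_results()
    normalized = nxroot["/1.1/normalize/results/normalized"][()]
    # Soft link created in the measurement group
    normalized_link = nxroot["/1.1/measurement/diode2_normalized"][()]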

Plot results#

Plotting tools like h5web and silx can plot scan data from NeXus-compliant HDF5 files (see the previous image).

A more advanced way of plotting workflow results is to display them automatically in Flint.

Note

To be expanded soon 🏗️

Save workflows#

Every triggered workflow can be saved as JSON. For this, we only need to provide the convert_destination argument to specify the filename to which the workflow will be saved:

# oda_scans.py

import os
from blissoda.utils.directories import get_processed_dir

basename = os.path.splitext(os.path.basename(filename))[0]
workflow_filename = os.path.join(
    get_processed_dir(filename), "workflows", f"{basename}_scan{scan_nb}.json"
)

future = submit(
    args=(workflow,),
    kwargs={"inputs": parameters, "convert_destination": workflow_filename},
)

The resulting JSON file can easily be re-executed with ewoks (see the sketch after this list). This makes saving workflows useful for the following reasons:

  • Development of Ewoks tasks: you want to modify the Ewoks tasks and re-run a workflow until it produces the result you expect

  • Re-execute failed workflows: perhaps workflows failed because of some temporary issue

  • Re-run workflows with modified parameters: perhaps they were chosen wrongly during the experiment or you want to try out different values to obtain a better result

  • Data provenance: the workflow file allows anyone to reproduce the results from the raw data (e.g. could be part of a scientific publication)
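
As a minimal sketch of such a re-execution (the path and the input override below are illustrative, and ewoks must be installed in the environment where you run it):

# Sketch only: re-execute a saved workflow, optionally overriding inputs

from ewoks import execute_graph

results = execute_graph(
    "/path/to/PROCESSED_DATA/workflows/sample_0001_scan8.json",
    inputs=[{"id": "node1", "name": "i1name", "value": "diode2"}],
)
print(results)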