Getting Started
===============

This tutorial demonstrates how the execution of Ewoks workflows can be integrated into Bliss acquisitions.

.. note::

    Strictly speaking, the examples in this tutorial are **not** online data analysis, because the workflows are executed **after** the scan has finished, not while the scan is running.

In this tutorial, you will implement a ``loopscan`` that triggers an Ewoks workflow whose results are either returned to Bliss or saved in HDF5.

Trigger workflows
-----------------

After every ``loopscan`` we want to perform calculations on the raw data and return the result. For this, we will create a custom scan command ``loopscan_with_calc``.

To be accessible from Bliss, the command must be defined in the Bliss session script or in a user script. In this case, we will put it in a user script called ``oda_scans.py``.

.. code-block:: python

    # oda_scans.py
    from ewoksjob.client import submit
    from bliss import setup_globals


    def loopscan_with_calc(*args, **kwargs):
        # Run the scan
        scan = setup_globals.loopscan(*args, **kwargs)

        # Specify the workflow to run
        workflow = {"graph": {"id": "nothing_yet"}}

        # Trigger the workflow (it is executed by an Ewoks worker)
        future = submit(args=(workflow,))

        # Wait for the result
        results = future.get()
        print("Results:", results)

Then, instead of running the normal scan command

.. code-block:: python

    # BLISS
    DEMO_SESSION [1]: loopscan(10, 0.1, diode1, diode2)
             Out [1]: Scan(number=4, name=loopscan, path=.../RAW_DATA/sample/sample_0001/sample_0001.h5)

we run the new scan command after loading the script that defines it:

.. code-block:: python

    # BLISS
    DEMO_SESSION [2]: user_script_homedir("/path/to/scripts/directory")
    DEMO_SESSION [3]: user_script_load("oda_scans")
    DEMO_SESSION [4]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
    Results: {}

The output ``Results: {}`` shows that the workflow was launched correctly. Since the workflow does not contain any task yet, it returns no results. Now, we can start doing actual processing with this workflow.
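The ``submit(...)`` / ``future.get()`` pair follows the usual submit-then-wait pattern: the workflow runs in the Ewoks worker, and ``get()`` blocks the Bliss session until it has finished. As a rough, stand-alone illustration of that pattern only (this is **not** the ``ewoksjob`` API), the sketch below uses the standard library's ``concurrent.futures``. The function ``run_workflow`` is a hypothetical stand-in for the worker, and ``result()`` plays the role of ``get()``.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor


    def run_workflow(workflow: dict) -> dict:
        """Hypothetical stand-in for an Ewoks worker.

        It only counts the nodes of the graph; a real worker would execute them.
        """
        return {"nb_nodes": len(workflow.get("nodes", []))}


    def trigger_and_wait(workflow: dict, timeout: float = 10.0) -> dict:
        with ThreadPoolExecutor(max_workers=1) as executor:
            # Non-blocking: a Future is returned immediately
            future = executor.submit(run_workflow, workflow)
            # Blocking: wait (at most `timeout` seconds) for the result
            return future.result(timeout=timeout)


    print("Results:", trigger_and_wait({"graph": {"id": "nothing_yet"}}))

Because waiting on the result keeps the caller busy until the job is done, long workflows will hold the Bliss prompt for their whole duration when ``future.get()`` is called right after submission.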
Workflow Parameters
-------------------

Data processing needs parameters: at the very least, a reference to the data that needs to be processed. Very often, this is the HDF5 URL of the scan data.

We modify our ``loopscan_with_calc`` function in ``oda_scans.py`` to retrieve this HDF5 URL from the scan:

.. code-block:: python

    # oda_scans.py

    # Run the scan
    scan = setup_globals.loopscan(*args, **kwargs)

    # URL of the scan data
    filename = scan.scan_info["filename"]
    scan_nb = scan.scan_info["scan_nb"]
    scan_url = f"{filename}::/{scan_nb}.1"

.. warning::

    The user script needs to be reloaded via ``user_script_load("oda_scans")`` after each modification.

For the sake of the example, we will use a simple workflow that concatenates two strings with ``operator.add``, and pass two parameters to it:

- The scan URL we just retrieved
- An additional parameter called ``prefix`` (in this case, a simple string)

.. code-block:: python

    # oda_scans.py

    # Specify the workflow to run
    workflow = {
        "graph": {"id": "nothing_yet"},
        "nodes": [
            {"id": "node1", "task_type": "method", "task_identifier": "operator.add"}
        ],
    }

    # Workflow parameters: the input names 0 and 1 are the positional
    # arguments of operator.add
    prefix = "Process scan "
    parameters = [
        {"id": "node1", "name": 0, "value": prefix},
        {"id": "node1", "name": 1, "value": scan_url},
    ]

    # Trigger the workflow
    future = submit(args=(workflow,), kwargs={"inputs": parameters})

The result of the workflow is now the concatenation of the prefix and the scan URL. We can run several scans to verify that it changes for each scan:

.. code-block:: python

    # BLISS
    DEMO_SESSION [5]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
    Results: {'return_value': 'Process scan .../RAW_DATA/sample/sample_0001/sample_0001.h5::/7.1'}

    DEMO_SESSION [6]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
    Results: {'return_value': 'Process scan .../RAW_DATA/sample/sample_0001/sample_0001.h5::/8.1'}

Now that we have the scan URL, it is time to perform real operations on the scan data.
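The scan URL built above has the form ``<filename>::/<scan number>.1``. The sketch below builds such a URL, splits it back with ``str.rpartition("::")`` (the same call used by the processing task in the next section), and reproduces locally what the ``operator.add`` node computes from its inputs ``0`` and ``1``. The helper names ``make_scan_url`` and ``split_scan_url``, the ``subscan`` argument and the file path are hypothetical and only serve the illustration.

.. code-block:: python

    import operator
    from typing import Tuple


    def make_scan_url(filename: str, scan_nb: int, subscan: int = 1) -> str:
        # Same format as in oda_scans.py: "<filename>::/<scan_nb>.<subscan>"
        return f"{filename}::/{scan_nb}.{subscan}"


    def split_scan_url(scan_url: str) -> Tuple[str, str]:
        # rpartition splits on the LAST "::" and returns (filename, "::", scan group)
        filename, _, scan = scan_url.rpartition("::")
        return filename, scan


    # Hypothetical raw data file, scan number 7
    scan_url = make_scan_url("/path/to/sample_0001.h5", 7)
    print(split_scan_url(scan_url))  # ('/path/to/sample_0001.h5', '/7.1')

    # Local equivalent of the "operator.add" node: inputs 0 and 1 are
    # passed as the first and second positional arguments
    prefix = "Process scan "
    print(operator.add(prefix, scan_url))  # Process scan /path/to/sample_0001.h5::/7.1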
Data processing
---------------

In this example, the processing consists of dividing the signal of ``diode2`` by the signal of ``diode1``. For this, we need to write a dedicated Ewoks task that implements this operation.

We will now create a task ``NormalizeData`` in a file ``tasks.py`` that **must be accessible to the Ewoks worker** (recall that it is the worker that runs the workflow).

.. code-block:: python

    # tasks.py in the working directory of the Ewoks worker
    from ewokscore import Task
    from blissdata.h5api import dynamic_hdf5


    class NormalizeData(
        Task,
        input_names=["scan_url", "i0name", "i1name"],
        output_names=["i1norm"],
    ):
        def run(self):
            # Split "<filename>::<scan group>" into its two parts
            filename_in, _, scan = self.inputs.scan_url.rpartition("::")

            with dynamic_hdf5.File(filename_in) as nxroot_in:
                nxentry_in = nxroot_in[scan]
                nxentry_in["end_time"]  # wait until the scan is fully written

                I1 = nxentry_in[f"instrument/{self.inputs.i1name}/data"]
                I0 = nxentry_in[f"instrument/{self.inputs.i0name}/data"]
                I1norm = I1[()] / I0[()]

            self.outputs.i1norm = I1norm

.. note::

    Since the HDF5 file content might still be changing when the workflow is executed, we cannot simply read the dataset content (e.g. ``I1[()]``): it would only return the data written at the moment of reading. To ensure we read all dataset values, we first access a dataset that only appears at the end of the scan (here ``end_time``), which waits until the scan has been fully written. Another solution is to iterate over the datasets instead of slicing them, as shown in the `BLISS documentation `_.

Again, we update the definition of the workflow in ``oda_scans.py`` to use our new task (``tasks.NormalizeData``), and the workflow parameters to specify its inputs:

.. code-block:: python

    # oda_scans.py

    # Specify the workflow to run
    workflow = {
        "graph": {"id": "normalize"},
        "nodes": [
            {
                "id": "node1",
                "task_type": "class",
                "task_identifier": "tasks.NormalizeData",
            }
        ],
    }

    # Workflow parameters
    parameters = [
        {"id": "node1", "name": "scan_url", "value": scan_url},
        {"id": "node1", "name": "i0name", "value": "diode1"},
        {"id": "node1", "name": "i1name", "value": "diode2"},
    ]

The workflow can then be triggered as before by using our ``loopscan_with_calc`` command:

.. code-block:: python

    # BLISS
    DEMO_SESSION [7]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
    Results: {'i1norm': array([1.07374567, 0.65471348, 0.54708319, 1.53021916, 0.77015426,
           0.8388511 , 0.98417602, 1.03111438, 1.61111265, 0.70460637])}

We can check that the results indeed contain ``i1norm = diode2 / diode1``.

Save results
------------

Often we want to save the processed data generated in this way. The usual approach is to save an HDF5 file from within the workflow itself. For this, we extend our previous task ``NormalizeData`` into a new task ``NormalizeDataWithSaving`` that also saves the results in HDF5:

.. code-block:: python

    # tasks.py
    import os

    from blissoda.utils.directories import get_processed_dir


    class NormalizeDataWithSaving(
        Task,
        input_names=["scan_url", "i0name", "i1name"],
        output_names=["result_url"],
    ):
        def run(self):
            filename_in, _, scan = self.inputs.scan_url.rpartition("::")

            with dynamic_hdf5.File(filename_in) as nxroot_in:
                nxentry_in = nxroot_in[scan]
                nxentry_in["end_time"]  # wait until the scan is fully written

                I1 = nxentry_in[f"instrument/{self.inputs.i1name}/data"]
                I0 = nxentry_in[f"instrument/{self.inputs.i0name}/data"]
                I1norm = I1[()] / I0[()]

                # Write data in the processed data directory (PROCESSED_DATA)
                processed_dir = get_processed_dir(filename_in)
                os.makedirs(processed_dir, exist_ok=True)
                basename = os.path.splitext(os.path.basename(filename_in))[0]
                filename_out = os.path.join(processed_dir, f"{basename}.h5")

                # Still inside the "with" block: save_result links to the raw entry
                self.outputs.result_url = save_result(
                    filename_in, filename_out, nxentry_in, scan, self.inputs.i1name, I1norm
                )

The ``save_result`` function saves the results in a `NeXus-compliant `_ HDF5 file with links to the raw scan data. The implementation below can be reused as-is.

.. dropdown:: Implementation of ``save_result``

    .. code-block:: python

        # tasks.py
        import os
        import json

        import numpy
        import h5py
        from silx.io import h5py_utils
        from blissoda.utils.directories import get_processed_dir


        def save_result(
            filename_in: str,
            filename_out: str,
            nxentry_in: h5py.Group,
            scan: str,
            signal_name: str,
            signal_values: numpy.ndarray,
        ) -> str:
            with h5py_utils.open_item(filename_out, "/", mode="a") as nxroot_out:
                result_url = f"{filename_out}::{scan}"
                if scan in nxroot_out:
                    print(f"Already processed: {result_url}")
                    return result_url

                # Prepare NeXus structure for saving scan processing results
                nxroot_out.attrs["NX_class"] = "NXroot"
                nxentry_out = nxroot_out.create_group(scan)
                nxroot_out.attrs["default"] = scan
                nxentry_out.attrs["NX_class"] = "NXentry"

                # Save processing results
                normalized_dataset = save_normalized_results(nxentry_out, signal_values, dict())
                measurement = nxentry_out.create_group("measurement")
                measurement.attrs["NX_class"] = "NXcollection"
                measurement[f"{signal_name}_normalized"] = h5py.SoftLink(
                    normalized_dataset.name
                )

                # Links to raw data
                save_raw_data_links(filename_in, nxentry_in, nxentry_out)

                return result_url


        def save_normalized_results(
            nxentry_out: h5py.Group, normalized: numpy.ndarray, parameters: dict
        ) -> h5py.Dataset:
            nxprocess = nxentry_out.create_group("normalize")
            nxentry_out.attrs["default"] = "normalize"
            nxprocess.attrs["NX_class"] = "NXprocess"
            nxprocess["program"] = "ID26"

            nxdata = nxprocess.create_group("results")
            nxprocess.attrs["default"] = "results"
            nxdata.attrs["NX_class"] = "NXdata"
            nxdata.attrs["signal"] = "normalized"
            nxdata["normalized"] = normalized

            nxnote = nxprocess.create_group("config")
            nxnote.attrs["NX_class"] = "NXnote"
            nxnote["data"] = json.dumps(parameters)
            nxnote["type"] = "application/json"

            return nxdata["normalized"]


        def save_raw_data_links(
            filename_in: str, nxentry_in: h5py.Group, nxentry_out: h5py.Group
        ) -> None:
            nxentry_out["instrument"] = h5py.ExternalLink(
                filename_in, nxentry_in["instrument"].name
            )
            nxentry_out["sample"] = h5py.ExternalLink(filename_in, nxentry_in["sample"].name)

            measurement_in = nxentry_in["measurement"]
            measurement_out = nxentry_out["measurement"]
            for name in measurement_in:
                measurement_out[name] = h5py.ExternalLink(
                    filename_in, measurement_in[name].name
                )

The workflow in ``oda_scans.py`` then needs to be modified once again to use the ``NormalizeDataWithSaving`` task:

.. code-block:: python

    # oda_scans.py

    # Specify the workflow to run
    workflow = {
        "graph": {"id": "normalize_with_saving"},
        "nodes": [
            {
                "id": "node1",
                "task_type": "class",
                "task_identifier": "tasks.NormalizeDataWithSaving",
            }
        ],
    }

By running the workflow through our custom command, we can see that the result of this workflow is now the URL where the results are saved:

.. code-block:: python

    # BLISS
    DEMO_SESSION [3]: user.loopscan_with_calc(10, 0.1, diode1, diode2)
    Results: {'result_url': '.../PROCESSED_DATA/sample_0001.h5::/1.1'}

In addition to saving the results, we also create links to the raw data HDF5 file. As a result, you get a file that looks exactly like the raw HDF5 file, with additional calculated channels: in this case ``diode2_normalized``, which is ``diode2`` divided by ``diode1``.

.. image:: ../img/getting_started.png
    :alt: A screen capture of Silx view showing the HDF5 file where workflow results are saved and comparing it to the raw data HDF5 file

Plot results
------------

Plotting tools like `h5web `_ and `silx `_ can plot scan data stored in NeXus-compliant HDF5 files (see the previous image). A more advanced way of plotting workflow results is to display them automatically in Flint.

.. note::

    To be expanded soon 🏗️

Save workflows
--------------

Every triggered workflow can be saved as JSON. For this, we only need to provide the argument ``convert_destination``, which specifies the filename in which the workflow will be saved:

.. code-block:: python

    # oda_scans.py
    import os

    from blissoda.utils.directories import get_processed_dir

    basename = os.path.splitext(os.path.basename(filename))[0]
    workflow_filename = os.path.join(
        get_processed_dir(filename), "workflows", f"{basename}_scan{scan_nb}.json"
    )

    future = submit(
        args=(workflow,),
        kwargs={"inputs": parameters, "convert_destination": workflow_filename},
    )

The resulting JSON file can easily be re-executed with ``ewoks``. Saving workflows is useful for the following reasons:

* **Development of Ewoks tasks**: you want to modify the Ewoks tasks and re-run a workflow until it produces the result you expect
* **Re-execute failed workflows**: perhaps workflows failed because of some temporary issue
* **Re-run workflows with modified parameters**: perhaps they were chosen wrongly during the experiment, or you want to try out different values to obtain a better result
* **Data provenance**: the workflow file allows anyone to reproduce the results from the raw data (e.g. it could be part of a scientific publication)
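The naming scheme used for ``workflow_filename`` can be checked outside of Bliss. The sketch below reproduces it with a hypothetical helper ``workflow_json_path`` that takes the processed data directory as an argument instead of calling ``get_processed_dir``, and checks that the workflow description used in this tutorial only contains JSON types. The file actually written through ``convert_destination`` is produced by Ewoks; its exact content is not reproduced here.

.. code-block:: python

    import json
    import os


    def workflow_json_path(processed_dir: str, raw_filename: str, scan_nb: int) -> str:
        # Hypothetical helper: same naming scheme as in oda_scans.py, but the
        # processed directory is passed in instead of calling get_processed_dir
        basename = os.path.splitext(os.path.basename(raw_filename))[0]
        return os.path.join(processed_dir, "workflows", f"{basename}_scan{scan_nb}.json")


    # The workflow description from the "Save results" section
    workflow = {
        "graph": {"id": "normalize_with_saving"},
        "nodes": [
            {
                "id": "node1",
                "task_type": "class",
                "task_identifier": "tasks.NormalizeDataWithSaving",
            }
        ],
    }

    # Only dicts, lists and strings: the description survives a JSON round trip
    assert json.loads(json.dumps(workflow)) == workflow

    # Hypothetical directories, for illustration only
    print(workflow_json_path("/path/to/PROCESSED_DATA", "/path/to/RAW_DATA/sample_0001.h5", 7))
    # /path/to/PROCESSED_DATA/workflows/sample_0001_scan7.json (on POSIX systems)

One file per scan (``<basename>_scan<scan_nb>.json``) keeps the saved workflows next to the processed data and makes it straightforward to find the workflow that produced a given result.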