Source code for blissoda.demo.tests.itest_submit

"""Send a workflow to the ewoks Celery application and recieve the
intermediate results (ewoks events) or final result (job return value)
"""

import os
import tempfile
from typing import List

from ewoksjob.events.readers import instantiate_reader

from ...ewoks_utils import submit
from .. import EWOKS_EVENTS_DIR
from .. import testing


[docs] @testing.integration_test def test_submit_with_events_in_redis(): events_url = "redis://localhost:10002/5" handlers = [ { "class": "ewoksjob.events.handlers.RedisEwoksEventHandler", "arguments": [{"name": "url", "value": events_url}], } ] _test_submit_with_events(events_url, handlers)
[docs] @testing.integration_test def test_submit_with_events_in_sqlite(): events_url = f"file://{EWOKS_EVENTS_DIR}/ewoks_event.db" handlers = [ { "class": "ewoksjob.events.handlers.Sqlite3EwoksEventHandler", "arguments": [{"name": "uri", "value": events_url}], } ] _test_submit_with_events(events_url, handlers)
def _test_submit_with_events(events_url: str, handlers: List[dict]): # Test workflow to execute workflow = { "graph": {"id": "mygraph"}, "nodes": [ {"id": "task1", "task_type": "method", "task_identifier": "numpy.add"}, {"id": "task2", "task_type": "method", "task_identifier": "numpy.add"}, ], "links": [ { "source": "task1", "target": "task2", "data_mapping": [{"source_output": "return_value", "target_input": 0}], } ], } # Job arguments out_dir = tempfile.mkdtemp(dir=os.environ["DEMO_TMP_ROOT"], prefix="test_job_") varinfo = {"root_uri": out_dir, "scheme": "nexus"} inputs = [ {"id": "task1", "name": 0, "value": 1}, {"id": "task1", "name": 1, "value": 2}, {"id": "task2", "name": 1, "value": 3}, ] execinfo = {"handlers": handlers} args = (workflow,) kwargs = { "engine": None, "execinfo": execinfo, "inputs": inputs, "varinfo": varinfo, "outputs": [{"all": False}], } # Trigger workflow future = submit(args=args, kwargs=kwargs) job_id = future.uuid # events could be received in the mean time (see below) workflow_results = future.result(timeout=20, interval=0.1) assert workflow_results == {"return_value": 6} reader = instantiate_reader(events_url) # Get intermediate results from ewoks events results_during_execution = list(reader.get_events(job_id=job_id)) assert len(results_during_execution) == 8 # start/stop for job, workflow and node # Get start event of node "task1" result_event = list(reader.get_events(job_id=job_id, node_id="task1", type="start")) assert len(result_event) == 1 result_event = result_event[0] # Verify output URI's if varinfo.get("root_uri"): results = result_event["output_uris"] assert len(results) == 1 output_uri = results[0]["value"] assert out_dir in output_uri testing.assert_equal_hdf5_content(output_uri, {"value": 3}) # Wait for event to contain small data in-place instead of URI's. # Or the Variable should move from ewokscore to ewoksutils. # # # Get start event of node "task1" # result_event = list( # reader.get_events_with_variables( # job_id=job_id, node_id="task1", type="start" # ) # ) # # assert len(result_event) == 1 # result_event = result_event[0] # # # Get access to all output variables of "task1" # if varinfo.get("root_uri"): # results = result_event["outputs"] # assert results["return_value"].value == 3