BEEflow API

BEEflow

BEEflow Core

Builder (Container)

Build Interfaces

Mid-level interface for managing a build system from WFM.

The WFM may request a Runtime Environment (RTE) that must be built. This RTE build should be considered a separate stage in the workflow. The build_interface will access components of the build_driver and components of the gdb_interface as required.

beeflow.common.build_interfaces.build_main(task)

Process build instructions - main code.

Build Driver

Abstract base class for the handling build systems.

class beeflow.common.build.build_driver.BuildDriver(task, kwargs)

Driver interface between WFM and a generic build system.

A driver object must implement an __init__ method that requests a Runtime Environment (RTE), and a method to return the requested RTE.

abstract process_container_name()

Get and process BEE CWL extension for containerName dockerRequirement.

This is a BEE extension to CWL to refer to containers with human-readable name.

The CWL spec currently uses dockerImageId to refer to the name of a container but this is explicitly not how Docker defines it. We need a way to name containers in a human readable format.

abstract process_copy_container(force)

Get and process the BEE CWL extension copyContainer dockerRequirement.

This CWL extension will copy an existing container to the build archive.

If you have a container tarball, and all you need to do is stage it, that is, all you need to do is copy it to a location that BEE knows, use this to put the container into the build archive.

abstract process_docker_file(task_dockerfile, force)

Get and process the CWL compliant dockerFile dockerRequirement.

CWL spec 09-23-2020: Supply the contents of a Dockerfile which will be built using docker build.

abstract process_docker_image_id(param_imageid)

Get and process the CWL compliant dockerImageId dockerRequirement.

CWL spec 09-23-2020: The image id that will be used for docker run. May be a human-readable image name or the image identifier hash. May be skipped if dockerPull is specified, in which case the dockerPull image id must be used.

abstract process_docker_import(param_import)

Get and process the CWL compliant dockerImport dockerRequirement.

CWL spec 09-23-2020: Provide HTTP URL to download and gunzip a Docker images using docker import.

abstract process_docker_load()

Get and process the CWL compliant dockerLoad dockerRequirement.

CWL spec 09-23-2020: Specify a HTTP URL from which to download a Docker image using docker load.

abstract process_docker_output_directory(param_output_directory)

Get and process the CWL compliant dockerOutputDirectory dockerRequirement.

CWL spec 09-23-2020: Set the designated output directory to a specific location inside the Docker container.

abstract process_docker_pull(addr, force)

Get and process the CWL compliant dockerPull dockerRequirement.

CWL spec 09-23-2020: Specify a Docker image to retrieve using docker pull. Can contain the immutable digest to ensure an exact container is used.

resolve_priority()

Given multiple DockerRequirements, set order of execution.

The CWL spec as of 04-15-2021 does not specify order of execution, but the cwltool gives some guidance by example. We mimic cwltool in how we resolve priority, favoring fast, cached container specs over slower specs. For example, if both a docker pull and a docker file are supported, the build interface will try to pull first, and only on pull failure will the builder build the docker file.

Build Container Drivers

Container build driver.

All container-based build systems belong here.

class beeflow.common.build.container_drivers.CharliecloudBuildDriver(task)

Driver interface between WFM and a container build system.

A driver object must implement an __init__ method that requests a RTE, and a method to return the requested RTE.

get_docker_req(docker_req_param)

Get dockerRequirement, prioritizing requirements over hints.

Parameters:

docker_req_param (str) – the dockerRequirement parameter (e.g. ‘dockerFile’)

When requirements are specified hints will be ignored. By default, tasks need not specify hints or requirements

process_container_name()

Get and process BEE CWL extension for containerName dockerRequirement.

This is a BEE extension to CWL to refer to containers with human-readable name.

The CWL spec currently uses dockerImageId to refer to the name of a container but this is explicitly not how Docker defines it. We need a way to name containers in a human readable format.

process_copy_container(force=False)

Get and process the BEE CWL extension copyContainer dockerRequirement.

This CWL extension will copy an existing container to the build archive.

If you have a container tarball, and all you need to do is stage it, that is, all you need to do is copy it to a location that BEE knows, use this to put the container into the build archive.

process_docker_file(task_dockerfile=None, force=False)

Get and process the CWL compliant dockerFile dockerRequirement.

CWL spec 09-23-2020: Supply the contents of a Dockerfile which will be built using docker build. We have discussed implementing CWL change to expect a file handle instead of file contents, and use the file handle expectation here.

process_docker_image_id(param_imageid=None)

Get and process the CWL compliant dockerImageId dockerRequirement.

A divergence from the CWL spec. Docker image Id is defined by docker as a checksum on a container, not a human-readable name. The Docker image ID must be produced after the container is built, and can not be used to tag the container for that reason. The param_imageid may be used to override DockerRequirement specs.

process_docker_import(param_import=None)

Get and process the CWL compliant dockerImport dockerRequirement.

CWL spec 09-23-2020: Provide HTTP URL to download and gunzip a Docker images using docker import. The param_import may be used to override DockerReuirement specs.

process_docker_load()

Get and process the CWL compliant dockerLoad dockerRequirment.

CWL spec 09-23-2020: Specify a HTTP URL from which to download a Docker image using docker load.

process_docker_output_directory(param_output_directory=None)

Get and process the CWL compliant dockerOutputDirectory dockerRequirement.

CWL spec 09-23-2020: Set the designated output directory to a specific location inside the Docker container. The param_output_directory may be used to override DockerRequirement specs.

process_docker_pull(addr=None, force=False)

Get the CWL compliant dockerPull dockerRequirement.

CWL spec 09-23-2020: Specify a Docker image to retrieve using docker pull. Can contain the immutable digest to ensure an exact container is used.

class beeflow.common.build.container_drivers.ContainerBuildDriver(task, kwargs)

Driver interface between WFM and a container build system.

A driver object must implement an __init__ method that requests a RTE, and a method to return the requested RTE.

class beeflow.common.build.container_drivers.SingularityBuildDriver(task, userconf_file=None)

Driver interface between WFM and a container build system.

A driver object must implement an __init__ method that requests a RTE, and a method to return the requested RTE.

process_docker_file(task_dockerfile=None, force=False)

Get and process the CWL compliant dockerFile dockerRequirement.

CWL spec 09-23-2020: Supply the contents of a Dockerfile which will be built using docker build.

process_docker_image_id(param_imageid=None)

Get and process the CWL compliant dockerImageId dockerRequirement.

CWL spec 09-23-2020: The image id that will be used for docker run. May be a human-readable image name or the image identifier hash. May be skipped if dockerPull is specified, in which case the dockerPull image id must be used. The param_imageid may be used to override DockerRequirement specs.

process_docker_import(param_import=None)

Get and process the CWL compliant dockerImport dockerRequirement.

CWL spec 09-23-2020: Provide HTTP URL to download and gunzip a Docker images using docker import. The param_import may be used to override DockerRequirement specs.

process_docker_load()

Get and process the CWL compliant dockerLoad dockerRequirement.

CWL spec 09-23-2020: Specify a HTTP URL from which to download a Docker image using docker load.

process_docker_output_directory(param_output_directory=None)

Get and process the CWL compliant dockerOutputDirectory.

CWL spec 09-23-2020: Set the designated output directory to a specific location inside the Docker container. The param_output_directory may be used to override DockerRequirement specs.

process_docker_pull(addr=None, force=False)

Get and process the CWL compliant dockerPull dockerRequirement.

CWL spec 09-23-2020: Specify a Docker image to retrieve using docker pull. Can contain the immutable digest to ensure an exact container is used.

Database

Task Manager Database

Task Manager database code.

class beeflow.common.db.tm_db.JobQueue(db_file)

Task Manager job queue.

clear()

Clear the job queue.

count()

Count the number of items in the job queue.

pop()

Pop the bottom element off the queue.

push(task, job_id, job_state)

Push the job info onto the queue.

remove_by_id(id_)

Remove a job from the queue by ID.

update_job_state(id_, job_state)

Update the job_state.

class beeflow.common.db.tm_db.SubmitQueue(db_file)

Task Manager submit queue.

clear()

Clear the submit queue.

count()

Count the number of items in the submit queue.

pop()

Pop the bottom element off the queue.

push(task)

Push the task onto the submit queue.

class beeflow.common.db.tm_db.TMDB(db_file)

Task Manager Database.

property job_queue

Return a JobQueue object.

property submit_queue

Return a SubmitQueue object.

property update_queue

Return an UpdateQueue object.

class beeflow.common.db.tm_db.UpdateQueue(db_file)

Task Manager update queue.

clear()

Clear the update queue.

push(wf_id, task_id, job_state, task_info=None, metadata=None, output=None)

Push an update onto the update queue.

updates()

Get a list of all updates from the update queue.

beeflow.common.db.tm_db.open_db(db_file)

Open and return a new database.

Scheduler Database

Scheduler database code.

class beeflow.common.db.sched_db.Resources(db_file)

Resources wrapper class.

clear()

Remove all resources.

extend(resources)

Add a list of new resources.

class beeflow.common.db.sched_db.SchedDB(db_file)

Scheduler database.

property resources

Get resources from the database.

beeflow.common.db.sched_db.open_db(db_file)

Open and return a new database.

Configuration

Configuration Driver

BEE configuration driver module.

class beeflow.common.config_driver.BeeConfig(**kwargs)

Class to manage and store all BEE configuration.

All configuration values can be retrieved by using the get(section, key) class method. If those particular values are not in the config file, then defaults will be set, or a validation error raised, based on the validation code within the module.

When new configuration keys are needed, they should be added to the validation code, along with information about the allowed input types, and general text for the user. If a key isn’t added, but other code tries to access it, then an error will be raised letting the programmer know that the key needs to be added to the validation. This will hopefully help manage complexity and act as documentation as more keys are added.

Configuration file locations by supported platform:

Linux:

sysconfig_file = ‘/etc/beeflow/bee.conf’

userconfig_file = ‘~/.config/beeflow/bee.conf’

MacOS:

sysconfig_file = ‘/Library/Application Support/beeflow/bee.conf’

userconfig_file = ‘~/Library/Application Support/beeflow/bee.conf’

Windows:

sysconfig_file = NOT SUPPORTED. Should be windows registry.

userconfig_file = ‘%APPDATA%\beeflow\bee.conf’

classmethod get(sec_name, opt_name)

Get a configuration value.

If this throws, then the configuration value is missing from the definition. Initialize the config if not already initialized. Default values are built into the ConfigValidator class, so there is no need to specify a default or a fallback here.

classmethod init(userconfig=None, **_kwargs)

Initialize BeeConfig class.

We check the platform and read in system and user configuration files. Note that this only needs to be called if one needs to initialize the config from a different file or with different keyword arguments. If so, then this must be called before any calls to bc.get() are made, since that call will initialize the config with default settings.

static resolve_path(relative_path)

Resolve relative paths to absolute paths.

Parameters:

relative_path (string, path to file) – Input path. May include “../”

classmethod userconfig_path()

Get the path of the user config.

class beeflow.common.config_driver.ConfigGenerator(fname, validator)

Config generator class.

choose_values()

Choose configuration values based on user input.

save()

Save the config to a file.

beeflow.common.config_driver.bee_workdir_init(path, _cur_opts)

BEE workdir init function.

Parameters:
  • path – chosen path for the bee workdir

  • _cur_opts – current chosen options form the config generator

Returns:

initialized bee workdir path

beeflow.common.config_driver.check_choice(msg, opts)

Ask the user to pick from opts.

beeflow.common.config_driver.check_yes(msg)

Check for a y/n answer.

beeflow.common.config_driver.filepath_completion_input(*pargs, **kwargs)

Input files/paths with tab completion.

beeflow.common.config_driver.info()

Display some info about bee.conf’s various options.

beeflow.common.config_driver.join_path(*pargs)

Join multiple dirnames together to form a path.

beeflow.common.config_driver.new(path: str = <typer.models.ArgumentInfo object>)

Create a new config file.

beeflow.common.config_driver.print_wrap(text, next_line_indent='')

Print while wrapping lines to make the output easier to read.

beeflow.common.config_driver.show(path: str = <typer.models.ArgumentInfo object>)

Show the contents of bee.conf.

beeflow.common.config_driver.validate(path: str = <typer.models.ArgumentInfo object>)

Validate an existing configuration file.

beeflow.common.config_driver.validate_chrun_opts(opts)

Ensure that chrun_opts don’t contain options that’ll conflict with BEE.

Configuration Validator

Config validation code.

exception beeflow.common.config_validator.ConfigError

Configuration error class.

class beeflow.common.config_validator.ConfigOption(info, validator=<class 'str'>, choices=None, default=None, input_fn=None)

Config option/validation class.

validate(value)

Validate the value and return it.

class beeflow.common.config_validator.ConfigSection(info, depends_on=None)

Config section.

class beeflow.common.config_validator.ConfigValidator(description)

Config validation/schema class for managing configuration.

is_section_valid(cur_conf, sec_name)

Determine if given the current configuration, sec_name is valid.

sec_name is assumed to be the name of a valid section.

option(sec_name, opt_name, *args, **kwargs)

Define a configuration option.

options(sec_name)

Return all options for a given section.

section(sec_name, info, depends_on=None)

Define a configuration section.

property sections

Return all sections in order as list of tuples (sec_name, section).

validate(conf)

Validate a config and return a new config with defaults and values converted.

Connection Communication

Connection class for connecting to other components over a socket.

class beeflow.common.connection.Connection(socket, prefix=None, error_handler=None)

Connection for sending/receiving requests from a component.

delete(path, *pargs, **kwargs)

Do an HTTP DELETE request.

get(path, *pargs, **kwargs)

Do an HTTP GET request.

handle_error(resp)

Handle an error, if there is one.

patch(path, *pargs, **kwargs)

Do an HTTP PATCH request.

post(path, *pargs, **kwargs)

Do an HTTP POST request.

put(path, *pargs, **kwargs)

Do an HTTP PUT request.

Special connection for communicating with cli.py.

exception beeflow.common.cli_connection.BeeflowConnectionError

Connection error class.

class beeflow.common.cli_connection.ClientConnection(conn)

Connection to a client from the server end.

get()

Get and return the message.

put(msg)

Put a response message (closes the socket when done).

class beeflow.common.cli_connection.Server(s)

Socket server for use in cli.py.

accept()

Accept a new connection or return None.

beeflow.common.cli_connection.send(path, msg)

Send a single message to the server and get a response.

beeflow.common.cli_connection.server(path)

Create a new server connection.

Container Runtime

Container Runtime Driver

Abstract base class for crt_driver, the Container Runtime and drivers.

Builds text for job to run task in a Container

class beeflow.common.crt.crt_driver.Command(args, type_='default')

Command in a batch script.

class beeflow.common.crt.crt_driver.CommandType

Command types.

class beeflow.common.crt.crt_driver.ContainerRuntimeDriver

ContainerRuntimeDriver interface for generic container runtime.

abstract build_text(userconfig, task)

Create text for builder pre-run using the container runtime.

Parameters:

task – instance of Task

Return type:

string

abstract run_text(task)

Create commands for job using the container runtime.

Returns a tuple (pre-commands, main-command, post-commands). :param task: instance of Task :rtype: tuple of (list of list of str, list of str, list of list of str)

class beeflow.common.crt.crt_driver.ContainerRuntimeResult(env_code, pre_commands, main_command, post_commands)

Result to be used for returning to the worker code.

Charliecloud Driver

Charliecloud driver as the container runtime system for tasks.

Creates text for tasks using Charliecloud.

class beeflow.common.crt.charliecloud_driver.CharliecloudDriver

The ContainerRuntimeDriver for Charliecloud as container runtime system.

Creates the text for the task for using Charliecloud.

build_text(userconfig, task)

Build text for Charliecloud batch script.

static get_ccname(image_path)

Strip directories & .tar, .tar.gz, tar.xz, or .tgz from image path.

run_text(task)

Create text for Charliecloud batch script.

Singularity Driver

Singularity driver as the container runtime system for tasks.

Creates text for tasks using Singularity.

class beeflow.common.crt.singularity_driver.SingularityDriver

The ContainerRuntimeDriver for Singularity as container runtime system.

Creates the text for the task for using Singularity to test abstract class.

build_text(userconfig, task)

Build text for Singularity batch script.

run_text(task)

Build text for Singularity batch script.

Container Path

Path conversion code.

exception beeflow.common.container_path.PathError(*args)

Path error class.

beeflow.common.container_path.convert_path(path, bind_mounts)

Convert a path outside the container to a path inside the container.

Container Runtime Interface

Mid-level interface for container runtime system.

Delegates the writing of the text for job script to an instance of a subclass of the abstract base class ‘ContainerRuntimeDriver’. Default: ‘CharliecloudDriver’ class.

class beeflow.common.crt_interface.ContainerRuntimeInterface(crt_driver=<class 'beeflow.common.crt.charliecloud_driver.CharliecloudDriver'>)

Interface for the container runtime.

Requires an implemented subclass of ContainerRuntimeDriver to function.

build_text(userconfig, task)

Create text required to build a task environment.

Parameters:
  • task – instance of Task

  • userconfig – path to userconfig file

Return type:

string

run_text(task)

Create text required to run the task using the container_runtime.

Parameters:

task – instance of Task

Return type:

string

Graph Database

Evaluate CWL expressions

Code for CWL expressions.

beeflow.common.expr.eval_input(input_pairs, value_from)

Evaluate a simple input expression.

beeflow.common.expr.eval_output(input_pairs, glob)

Evaluate a simple output expression.

Graph Database Driver

Abstract base class for the handling of workflow DAGs.

class beeflow.common.gdb.gdb_driver.GraphDatabaseDriver

Driver interface for a generic graph database.

The driver must implement a __init__ method that creates/connects to the graph database and returns some kind of ‘connection’ interface object.

abstract close()

Close the connection to the graph database.

abstract execute_workflow()

Begin execution of the stored workflow.

Set the initial tasks’ states to ‘READY’.

abstract export_graphml()

Export a BEE workflow as a graphml.

abstract finalize_task(task)

Set task state to ‘COMPLETED’ and set inputs from source.

Parameters:

task (Task) – the task to finalize

abstract get_dependent_tasks(task)

Return the dependent tasks of a workflow task in the graph database.

Parameters:

task (Task) – the task whose dependents to retrieve

Return type:

list of Task

abstract get_ready_tasks()

Return tasks with state ‘READY’ from the graph database.

Return type:

list of Task

abstract get_task_by_id(task_id)

Return a reconstructed Task object from the graph database by its ID.

Parameters:

task_id (str) – a task’s ID

Return type:

Task

abstract get_task_input(task, input_id)

Get a task input object.

Parameters:
  • task (Task) – the task whose input to retrieve

  • input_id (str) – the ID of the input

Return type:

StepInput

abstract get_task_metadata(task)

Return the metadata of a task in the graph database.

Parameters:

task (Task) – the task whose metadata to retrieve

Return type:

dict

abstract get_task_output(task, output_id)

Get a task output object.

Parameters:
  • task (Task) – the task whose output to retrieve

  • output_id (str) – the ID of the output

Return type:

StepOutput

abstract get_task_state(task)

Return the state of a task in the graph database.

Parameters:

task (Task) – the task whose status to retrieve

Return type:

str

abstract get_workflow_description()

Return a reconstructed Workflow object from the graph database.

Return type:

Workflow

abstract get_workflow_inputs_and_outputs()

Return all workflow inputs and outputs from the graph database.

Returns a tuple of (inputs, outputs).

Return type:

(list of InputParameter, list of OutputParameter)

abstract get_workflow_requirements_and_hints()

Return all workflow requirements and hints from the graph database.

Must return a tuple with the format (requirements, hints)

Return type:

(list of Requirement, list of Hint)

abstract get_workflow_state()

Return the current state of the workflow.

Return type:

str

abstract get_workflow_tasks()

Return a list of all workflow tasks from the graph database.

Return type:

list of Task

abstract initialize_ready_tasks()

Set runnable tasks to state ‘READY’.

Runnable tasks are tasks with all input dependencies fulfilled.

abstract initialize_workflow(workflow)

Begin construction of a workflow in the graph database.

Should create the Workflow, Requirement, and Hint nodes in the graph database.

Parameters:

workflow (Workflow) – the workflow description

abstract load_task(task, task_state)

Load a task into a stored workflow.

Dependencies should be automatically deduced and generated by the graph database upon loading each task by matching workflow inputs with new task inputs, or task outputs with new task inputs.

Parameters:

task (Task) – a workflow task

abstract pause_workflow()

Pause execution of a running workflow.

Set workflow from state ‘RUNNING’ to ‘PAUSED’.

abstract reset_workflow(new_id)

Reset the execution state of a stored workflow.

Set all task states to ‘WAITING’. Change the workflow ID of the Workflow and Task nodes to new_id. Delete all task metadata except for task state.

Parameters:

new_id (str) – the new workflow ID

abstract restart_task(old_task, new_task)

Restart a failed task.

Create a Task node for new_task with state ‘RESTARTED’ and an edge to indicate that it is the child of the Task node of old_task.

Parameters:
  • old_task (Task) – the failed task

  • new_task (Task) – the new (restarted) task

abstract resume_workflow()

Resume execution of a paused workflow.

Set workflow state from ‘PAUSED’ to ‘RUNNING’.

abstract set_task_input(task, input_id, value)

Set the value of a task input.

Parameters:
  • task (Task) – the task whose input to set

  • input_id (str) – the ID of the input

  • value – str or int or float

abstract set_task_input_type(task, input_id, type_)

Set the type of a task input.

Parameters:
  • task (Task) – the task whose input type to set

  • input_id (str) – the ID of the input

  • type – the input type to set

  • type – str

abstract set_task_metadata(task, metadata)

Set the metadata of a task in the graph database.

Parameters:
  • task (Task) – the task whose metadata to set

  • metadata (dict) – the job description metadata

abstract set_task_output(task, output_id, value)

Set the value of a task output.

Parameters:
  • task (Task) – the task whose output to set

  • output_id (str) – the ID of the output

  • value (str or int or float) – the output value to set

abstract set_task_output_glob(task, output_id, glob)

Set the glob of a task output.

Parameters:
  • task (Task) – the task whose output glob to set

  • output_id (str) – the ID of the output

  • glob (str) – the output glob to set

abstract set_task_state(task, state)

Set the state of a task in the graph database.

Parameters:
  • task (Task) – the task whose state to set

  • state (str) – the new state

abstract set_workflow_state(state)

Set the state of the workflow.

Parameters:

state (str) – the new state of the workflow

abstract workflow_completed()

Determine if a workflow has completed.

A workflow has completed if each of its final tasks has state ‘COMPLETED’.

Return type:

bool

Graph Database Interface

Neo4j Cypher Module

Neo4j/Cypher transaction functions used by the Neo4jDriver class.

beeflow.common.gdb.neo4j_cypher.add_dependencies(tx, task, old_task=None, restarted_task=False)

Create dependencies between tasks.

Parameters:
  • task (Task) – the workflow task

  • old_task (Task) – the failed task, ignored if not used with restarted_task=True

  • restarted_task (bool) – restarted from failed task, only create dependencies for outputs

beeflow.common.gdb.neo4j_cypher.cleanup(tx)

Clean up all workflow data in the database.

beeflow.common.gdb.neo4j_cypher.copy_task_outputs(tx, task)

Use task outputs to set dependent task inputs or workflow outputs.

Sets dependent task inputs to default value if necessary.

Parameters:

task (Task) – the task whose outputs to set

beeflow.common.gdb.neo4j_cypher.create_bee_node(tx)

Create a BEE node in the Neo4j database.

This node connects to all workflows and allows them to exist in the same graph

beeflow.common.gdb.neo4j_cypher.create_task(tx, task)

Create a Task node in the Neo4j database.

Parameters:

task (Task) – the new task to create

beeflow.common.gdb.neo4j_cypher.create_task_hint_nodes(tx, task)

Create Hint nodes for a task.

Parameters:

task (Task) – the task whose hints to add to the graph

beeflow.common.gdb.neo4j_cypher.create_task_input_nodes(tx, task)

Create Input nodes for a task.

Parameters:

task (Task) – the task whose inputs to add to the graph

beeflow.common.gdb.neo4j_cypher.create_task_metadata_node(tx, task, task_state)

Create a task metadata node in the Neo4j database.

The node holds metadata about a task’s execution state.

Parameters:

task (Task) – the task for which to create a metadata node

beeflow.common.gdb.neo4j_cypher.create_task_output_nodes(tx, task)

Create Output nodes for a task.

Parameters:

task (Task) – the task whose outputs to add to the graph

beeflow.common.gdb.neo4j_cypher.create_task_requirement_nodes(tx, task)

Create Requirement nodes for a task.

Parameters:

task (Task) – the task whose requirements to add to the graph

beeflow.common.gdb.neo4j_cypher.create_workflow_hint_nodes(tx, workflow)

Create Hint nodes for the workflow.

Parameters:

workflow (Workflow) – the workflow whose hints to add to the graph

beeflow.common.gdb.neo4j_cypher.create_workflow_input_nodes(tx, workflow)

Create Input nodes for the workflow.

Parameters:

workflow (Workflow) – the workflow whose inputs to add to the graph

beeflow.common.gdb.neo4j_cypher.create_workflow_node(tx, workflow)

Create a Workflow node in the Neo4j database.

The workflow node is the entry point to the workflow.

Parameters:

workflow (Workflow) – the workflow

beeflow.common.gdb.neo4j_cypher.create_workflow_output_nodes(tx, workflow)

Create Output nodes for the workflow.

Parameters:

workflow (Workflow) – the workflow whose outputs to add to the graph

beeflow.common.gdb.neo4j_cypher.create_workflow_requirement_nodes(tx, workflow)

Create Requirement nodes for the workflow.

Parameters:

workflow (Workflow) – the workflow whose requirements to add to the graph

beeflow.common.gdb.neo4j_cypher.export_graphml(tx, wf_id)

Export BEE workflow as graphml.

beeflow.common.gdb.neo4j_cypher.final_tasks_completed(tx, wf_id)

Return true if each of a workflow’s final Task nodes has state ‘COMPLETED’.

Parameters:

wf_id (str) – the workflow’s id

Return type:

bool

beeflow.common.gdb.neo4j_cypher.get_dependent_tasks(tx, task)

Get the tasks that depend on a specified task.

Parameters:

task (Task) – the task whose dependencies to obtain

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_ready_tasks(tx, wf_id)

Get all tasks that are ready to execute.

Parameters:

workflow_id (str) – the workflow id

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_task_by_id(tx, task_id)

Get a workflow task from the Neo4j database by its ID.

Parameters:

task_id (str) – the task’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_task_hints(tx, task_id)

Get task hints from the Neo4j database by the task’s ID.

Parameters:

task_id (str) – the task’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_task_input(tx, task, input_id)

Get a task input object.

Parameters:
  • task (Task) – the task whose input to retrieve

  • input_id (str) – the ID of the input

Return type:

StepInput

beeflow.common.gdb.neo4j_cypher.get_task_inputs(tx, task_id)

Get task inputs from the Neo4j database by the task’s ID.

Parameters:

task_id (str) – the task’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_task_metadata(tx, task)

Get a task’s metadata.

Parameters:

task (Task) – the task whose metadata to get

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_task_output(tx, task, output_id)

Get a task output object.

Parameters:
  • task (Task) – the task whose output to retrieve

  • output_id (str) – the ID of the output

Return type:

StepOutput

beeflow.common.gdb.neo4j_cypher.get_task_outputs(tx, task_id)

Get task outputs from the Neo4j database by the task’s ID.

Parameters:

task_id (str) – the task’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_task_requirements(tx, task_id)

Get task requirements from the Neo4j database by the task’s ID.

Parameters:

task_id (str) – the task’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_task_state(tx, task)

Get the state of a task.

Parameters:

task (Task) – the task whose state to get

Return type:

str

beeflow.common.gdb.neo4j_cypher.get_workflow_by_id(tx, wf_id)

Get the workflow from the Neo4j database.

Parameters:

wf_id (str) – the workflow’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_workflow_hints(tx, wf_id)

Get workflow hints from the Neo4j database.

Parameters:

wf_id (str) – the workflow’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_workflow_inputs(tx, wf_id)

Get workflow inputs from the Neo4j database.

Parameters:

wf_id (str) – the workflow’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_workflow_outputs(tx, wf_id)

Get workflow outputs from the Neo4j database.

Parameters:

wf_id (str) – the workflow’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_workflow_requirements(tx, wf_id)

Get workflow requirements from the Neo4j database.

Parameters:

wf_id (str) – the workflow’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.get_workflow_state(tx, wf_id)

Get workflow state from the Neo4j database.

Parameters:

wf_id (str) – the workflow’s ID

Return type:

str

beeflow.common.gdb.neo4j_cypher.get_workflow_tasks(tx, wf_id)

Get workflow tasks from the Neo4j database.

Parameters:

wf_id (str) – the workflow’s ID

Return type:

neo4j.Result

beeflow.common.gdb.neo4j_cypher.is_empty(tx)

Return true if the database is empty, else false.

Return type:

bool

beeflow.common.gdb.neo4j_cypher.reset_tasks_metadata(tx, wf_id)

Reset the metadata for each of a workflow’s tasks.

Parameters:

wf_id (str) – the workflow’s ID

beeflow.common.gdb.neo4j_cypher.reset_workflow_id(tx, old_id, new_id)

Reset the workflow ID of the workflow using uuid4.

Parameters:
  • old_id (str) – the old workflow ID

  • new_id (str) – the new workflow ID

beeflow.common.gdb.neo4j_cypher.set_init_task_inputs(tx, wf_id)

Set the initial workflow tasks’ inputs from workfow inputs or defaults if necessary.

Parameters:

wf_id (str) – the workflow id

beeflow.common.gdb.neo4j_cypher.set_paused_tasks_to_running(tx)

Set ‘PAUSED’ task states to ‘RUNNING’.

beeflow.common.gdb.neo4j_cypher.set_runnable_tasks_to_ready(tx, wf_id)

Set task states to ‘READY’ if all required inputs have values.

beeflow.common.gdb.neo4j_cypher.set_running_tasks_to_paused(tx)

Set ‘RUNNING’ task states to ‘PAUSED’.

beeflow.common.gdb.neo4j_cypher.set_task_input(tx, task, input_id, value)

Set the value of a task input.

Parameters:
  • task (Task) – the task whose input to set

  • input_id (str) – the ID of the input

  • value – str or int or float

beeflow.common.gdb.neo4j_cypher.set_task_input_type(tx, task, input_id, type_)

Set the type of a task input.

Parameters:
  • task (Task) – the task whose input type to set

  • input_id (str) – the ID of the input

  • type – the input type to set

  • type – str

beeflow.common.gdb.neo4j_cypher.set_task_metadata(tx, task, metadata)

Set a task’s metadata.

Parameters:
  • task (Task) – the task whose metadata to set

  • metadata (dict) – the task metadata

beeflow.common.gdb.neo4j_cypher.set_task_output(tx, task, output_id, value)

Set a task’s output value.

Parameters:
  • task (Task) – the task whose output to set

  • output_id (str) – the ID of the output to set

  • value (str) – the value of the output

beeflow.common.gdb.neo4j_cypher.set_task_output_glob(tx, task, output_id, glob)

Set a task’s output value.

Parameters:
  • task (Task) – the task whose output to set

  • output_id (str) – the ID of the output to set

  • glob (str) – the glob of the output

beeflow.common.gdb.neo4j_cypher.set_task_state(tx, task, state)

Set a task’s state.

Parameters:
  • task (Task) – the task whose state to set

  • state (str) – the new task state

beeflow.common.gdb.neo4j_cypher.set_workflow_state(tx, state, wf_id)

Get workflow state from the Neo4j database.

Parameters:
  • state (str) – the state the workflow will be set to

  • wf_id (str) – the workflow’s ID

Neo4j Driver

Neo4j interface module.

Connection requires a valid URI, Username, and Password. The current defaults are defined below, but should later be either standardized or read from a config file.

class beeflow.common.gdb.neo4j_driver.Neo4jDriver

The driver for a Neo4j Database.

Implements GraphDatabaseDriver. Wraps the neo4j package proprietary driver. This class is a SINGLETON and will always return the same instance of Neo4jDriver

close()

Close the connection to the Neo4j database.

connect(user='neo4j', password='password', **kwargs)

Connect driver to the neo4j database.

Parameters:
  • uri (str) – the URI of the Neo4j database

  • user (str) – the username for the database user account

  • password (str) – the password for the database user account

create_bee_node()

Create the “BEE” node for all workflows to connect to.

execute_workflow(workflow_id)

Begin execution of a workflow stored in the Neo4j database.

Parameters:

workflow_id (str) – the workflow id

export_graphml(workflow_id)

Export a BEE workflow as a graphml.

finalize_task(task)

Set task state to ‘COMPLETED’ and set inputs from source.

Parameters:

task (Task) – the task to finalize

get_dependent_tasks(task)

Return the dependent tasks of a specified workflow task.

Parameters:

task (Task) – the task whose dependents to retrieve

Return type:

list of Task

get_ready_tasks(workflow_id)

Return tasks with state ‘READY’ from the graph database from a particular workflow.

Parameters:

workflow_id (str) – the workflow id

Return type:

list of Task

get_task_by_id(task_id)

Return a reconstructed task from the Neo4j database.

Parameters:

task_id (str) – a task’s ID

Return type:

Task

get_task_input(task, input_id)

Get a task input object.

Parameters:
  • task (Task) – the task whose input to retrieve

  • input_id (str) – the ID of the input

Return type:

StepInput

get_task_metadata(task)

Return the metadata of a task in the Neo4j workflow.

Parameters:

task (Task) – the task whose metadata to retrieve

Return type:

dict

get_task_output(task, output_id)

Get a task output object.

Parameters:
  • task (Task) – the task whose output to retrieve

  • output_id (str) – the ID of the output

Return type:

StepOutput

get_task_state(task)

Return the state of a task in the Neo4j workflow.

Parameters:

task (Task) – the task whose state to retrieve

Return type:

str

get_workflow_description(workflow_id)

Return a reconstructed Workflow object from the Neo4j database.

Parameters:

workflow_id (str) – the workflow id

Return type:

Workflow

get_workflow_inputs_and_outputs(workflow_id)

Return all workflow inputs and outputs for a workflow from the Neo4j database.

Returns a tuple of (inputs, outputs).

Parameters:

workflow_id (str) – the workflow id

Return type:

(list of InputParameter, list of OutputParameter)

get_workflow_requirements_and_hints(workflow_id)

Return all workflow requirements and hints from the Neo4j database.

Returns a tuple of (requirements, hints).

Parameters:

workflow_id (str) – the workflow id

Return type:

(list of Requirement, list of Hint)

get_workflow_state(workflow_id)

Return the current workflow state from the Neo4j database.

Parameters:

workflow_id (str) – the workflow id

Return type:

str

get_workflow_tasks(workflow_id)

Return all workflow task records for a workflow from the Neo4j database.

Parameters:

workflow_id (str) – the workflow id

Return type:

list of Task

initialize_ready_tasks(workflow_id)

Set runnable tasks to state ‘READY’.

Runnable tasks are tasks with all input dependencies fulfilled.

Parameters:

workflow_id (str) – the workflow id

initialize_workflow(workflow)

Begin construction of a workflow stored in Neo4j.

Creates the Workflow, Requirement, and Hint nodes in the Neo4j database.

Parameters:

workflow (Workflow) – the workflow description

load_task(task, task_state)

Load a task into a workflow stored in the Neo4j database.

Dependencies are automatically deduced and generated by Neo4j upon loading each task by matching task inputs and outputs.

Task hint nodes and metadata nodes are created for querying convenience.

Parameters:

task (Task) – a workflow task

pause_workflow(workflow_id)

Pause execution of a running workflow in Neo4j.

Sets tasks with state ‘RUNNING’ to ‘PAUSED’.

Parameters:

workflow_id (str) – the workflow id

reset_workflow(old_id, new_id)

Reset the execution state of an entire workflow.

Sets all task states to ‘WAITING’. Changes the workflow ID of the Workflow and Task nodes with new_id.

Parameters:

new_id (str) – the new workflow ID

restart_task(old_task, new_task)

Restart a failed task.

Create a Task node for new_task with ‘RESTARTED_FROM’ relationship to the Task node of old_task.

Parameters:
  • old_task (Task) – the failed task

  • new_task (Task) – the new (restarted) task

resume_workflow(workflow_id)

Resume execution of a paused workflow in Neo4j.

Sets workflow state to ‘RUNNING’

Parameters:

workflow_id (str) – the workflow id

set_task_input(task, input_id, value)

Set the value of a task input.

Parameters:
  • task (Task) – the task whose input to set

  • input_id (str) – the ID of the input

  • value – str or int or float

set_task_input_type(task, input_id, type_)

Set the type of a task input.

Parameters:
  • task (Task) – the task whose input type to set

  • input_id (str) – the ID of the input

  • type – the input type to set

  • type – str

set_task_metadata(task, metadata)

Set the metadata of a task in the Neo4j workflow.

Parameters:
  • task (Task) – the task whose metadata to set

  • metadata (dict) – the job description metadata

set_task_output(task, output_id, value)

Set the value of a task output.

Parameters:
  • task (Task) – the task whose output to set

  • output_id (str) – the ID of the output

  • value (str or int or float) – the output value to set

set_task_output_glob(task, output_id, glob)

Set the glob of a task output.

Parameters:
  • task (Task) – the task whose output to set

  • output_id (str) – the ID of the output

  • glob (str) – the output glob to set

set_task_state(task, state)

Set the state of a task in the Neo4j workflow.

Parameters:
  • task (Task) – the task whose state to change

  • state (str) – the new state

set_workflow_state(workflow_id, state)

Set the state of the workflow in the Neo4j database.

Parameters:
  • workflow_id (str) – the workflow id

  • state (str) – the new state of the workflow

workflow_completed(workflow_id)

Determine if a workflow in the Neo4j database has completed.

A workflow has completed if each of its final task nodes have state ‘COMPLETED’. :param workflow_id: the workflow id :type workflow_id: str :rtype: bool

exception beeflow.common.gdb.neo4j_driver.Neo4jNotRunning

Exception thrown when connection attempted while Neo4j is not running.

Logging

Logging interface for BEE.

beeflow.common.log.setup(name)

Set up and return logger.

Parameters:

name – Name to be used for logger (best would be __name__)

Parser (CWL)

CWL workflow description parser.

Parses the contents of a CWL file and generates the graph representation of parsed workflows and tools in the graph database. This parser supports a subset of the CWL v1.0 standard. It is inspired by the CWL parser written by the CWL parser written by the SABER team at John Hopkins University (see SABER project at https://github.com/aplbrain/saber).

exception beeflow.common.parser.parser.CwlParseError(*args)

Parser error class.

class beeflow.common.parser.parser.CwlParser

Class for parsing CWL files.

parse_job(job)

Parse a CWL input job file.

Input parameters are stored in the params attribute.

Parameters:

job (str) – the path of the input job file (YAML or JSON)

parse_requirements(requirements, as_hints=False)

Parse CWL hints/requirements.

Parameters:
  • requirements (list of ordereddict or any cwl_utils Requirement class) – the CWL requirements

  • as_hints (bool) – parse as hints instead of requirements

Return type:

list of Hint or list of Requirement or None

parse_step(step, workflow_id)

Parse a CWL step object.

Calling this to parse a CommandLineTool file without a corresponding Workflow file will fail.

Parameters:
  • step (WorkflowStep) – the CWL step object

  • workflow_id (str) – the workflow ID

Return type:

Task

static parse_step_inputs(cwl_in, step_inputs)

Parse step inputs from CWL step input objects.

Parameters:
  • cwl_in (list of WorkflowStepInput) – the step inputs from the Workflow file

  • step_inputs (list of CommandInputParameter) – the step inputs from the CommandLineTool file

Return type:

list of StepInput

static parse_step_outputs(cwl_out, step_outputs, stdout, stderr)

Parse step outputs from CWL step output objects.

Parameters:
  • cwl_out (list of str) – the step outputs from the Workflow file

  • step_outputs (list of CommandOutputParameter) – the step outputs from the CommandLineTool file

  • stdout (str or None) – name of file to which stdout should be redirected

  • stderr (str or None) – name of file to which stderr should be redirected

Return type:

list of StepOutput

parse_workflow(workflow_id, cwl_path, job=None)

Parse a CWL Workflow file and load it into the graph database.

Returns an instance of the WorkflowInterface.

Parameters:
  • workflow_id (str) – the workflow ID

  • cwl_path (str) – the CWL file path

  • job (str) – the input job file (YAML or JSON)

Return type:

WorkflowInterface

beeflow.common.parser.parser.main()

Run the parser on a CWL Workflow and job file directly.

beeflow.common.parser.parser.parse_args(args=None)

Parse arguments.

Worker

Abstract base class for worker, the workload manager.

class beeflow.common.worker.worker.Worker(bee_workdir, **kwargs)

Worker interface for a generic workload manager.

abstract build_text(task)

Build text for task script.

Parameters:

task (Task) – task that we’re building a script for

Return type:

string

abstract cancel_task(job_id)

Cancel task with job_id; returns job_state.

Parameters:

job_id (integer) – to be cancelled

Return type:

string

prepare(task)

Prepare for the task; create the task save directory, etc.

abstract query_task(job_id)

Query job state for the task.

Parameters:

job_id (int) – job id to query for status.

Return type:

string

abstract submit_task(task)

Worker submits task; returns job_id, job_state.

Parameters:

task – instance of Task

Return type:

tuple (int, string)

task_save_path(task)

Return the task save path used for storing submission scripts output logs.

exception beeflow.common.worker.worker.WorkerError(*args)

Worker error class.

LSF Worker

LSF worker for workload management.

Builds command for submitting batch job.

class beeflow.common.worker.lsf_worker.LSFWorker(bee_workdir, **kwargs)

The Worker for systems where LSF is the Workload Manager.

cancel_task(job_id)

Worker cancels job, returns job_state.

query_job(job_id)

Query lsf for job status.

query_task(job_id)

Worker queries job; returns job_state.

submit_job(script)

Worker submits job-returns (job_id, job_state).

submit_task(task)

Worker builds & submits script.

write_script(task)

Build task script; returns filename of script.

Simple Worker

Simple Worker class for launching tasks on a system with no workload manager.

class beeflow.common.worker.simple_worker.SimpleWorker(container_runtime, **kwargs)

Worker interface for system with no workload manager.

cancel_task(job_id)

Cancel task with job_id; returns job_state.

Parameters:

job_id (integer) – to be cancelled

Return type:

string

query_task(job_id)

Query job state for the task.

Parameters:

job_id (int) – job id to query for status.

Return type:

string

submit_task(task)

Worker submits task; returns job_id, job_state.

Parameters:

task – instance of Task

Return type:

tuple (int, string)

Slurm Worker

Slurm worker for work load management.

Builds command for submitting batch job.

class beeflow.common.worker.slurm_worker.BaseSlurmWorker(default_account='', default_time_limit='', default_partition='', **kwargs)

Base slurm worker code.

build_text(task)

Build text for task script.

submit_job(script)

Worker submits job-returns (job_id, job_state).

submit_task(task)

Worker builds & submits script.

write_script(task)

Build task script; returns filename of script.

class beeflow.common.worker.slurm_worker.SlurmCLIWorker(default_account='', default_time_limit='', default_partition='', **kwargs)

Slurm worker interface that uses the CLI.

cancel_task(job_id)

Cancel task with job_id; returns job_state.

query_task(job_id)

Query job state for the task.

class beeflow.common.worker.slurm_worker.SlurmWorker(use_commands, **kwargs)

Main slurm worker class.

build_text(task)

Build text for task script; use template if it exists.

cancel_task(job_id)

Cancel task with job_id; returns job_state.

query_task(job_id)

Query job state for the task.

submit_task(task)

Worker submits task; returns job_id, job_state.

class beeflow.common.worker.slurm_worker.SlurmrestdWorker(bee_workdir, openapi_version, **kwargs)

Worker class for when slurmrestd is available.

cancel_task(job_id)

Worker cancels job, returns job_state.

query_task(job_id)

Worker queries job; returns job_state.

beeflow.common.worker.slurm_worker.check_slurm_error(data, msg)

Check for an error in a Slurm response.

Task Manager

Worker Interface

Mid-level interface for worker, a work load manager.

Delegates the actual work to an instance of a subclass of the abstract base class ‘Worker’. Default: ‘SlurmWorker’ class.

class beeflow.common.worker_interface.WorkerInterface(worker=<class 'beeflow.common.worker.slurm_worker.SlurmWorker'>, **kwargs)

Interface for monitoring and managing workloads and jobs.

Requires an implemented subclass of Worker to function.

cancel_task(job_id)

Cancel job for task with job_id.

Parameters:

job_id (integer) – job_id to be cancelled

Return type:

string

query_task(job_id)

Query state of job with job_id; returns job_state.

Parameters:

job_id (int) – job id to query for status.

Return type:

tuple (int, string)

submit_task(task)

Worker builds script and submits task as job returns job_id, job_state.

Parameters:

task – instance of Task

Return type:

tuple (int, string)

Workflow Data Structures

Defines data structures for holding task and workflow data.

class beeflow.common.wf_data.Hint(class_, params)
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, class_, params)

Create new instance of Hint(class_, params)

__repr__()

Return a nicely formatted representation string

class_

Alias for field number 0

params

Alias for field number 1

class beeflow.common.wf_data.InputParameter(id, type, value)
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, id, type, value)

Create new instance of InputParameter(id, type, value)

__repr__()

Return a nicely formatted representation string

id

Alias for field number 0

type

Alias for field number 1

value

Alias for field number 2

class beeflow.common.wf_data.OutputParameter(id, type, value, source)
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, id, type, value, source)

Create new instance of OutputParameter(id, type, value, source)

__repr__()

Return a nicely formatted representation string

id

Alias for field number 0

source

Alias for field number 3

type

Alias for field number 1

value

Alias for field number 2

class beeflow.common.wf_data.Requirement(class_, params)
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, class_, params)

Create new instance of Requirement(class_, params)

__repr__()

Return a nicely formatted representation string

class_

Alias for field number 0

params

Alias for field number 1

class beeflow.common.wf_data.StepInput(id, type, value, default, source, prefix, position, value_from)
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, id, type, value, default, source, prefix, position, value_from)

Create new instance of StepInput(id, type, value, default, source, prefix, position, value_from)

__repr__()

Return a nicely formatted representation string

default

Alias for field number 3

id

Alias for field number 0

position

Alias for field number 6

prefix

Alias for field number 5

source

Alias for field number 4

type

Alias for field number 1

value

Alias for field number 2

value_from

Alias for field number 7

class beeflow.common.wf_data.StepOutput(id, type, value, glob)
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, id, type, value, glob)

Create new instance of StepOutput(id, type, value, glob)

__repr__()

Return a nicely formatted representation string

glob

Alias for field number 3

id

Alias for field number 0

type

Alias for field number 1

value

Alias for field number 2

class beeflow.common.wf_data.Task(name, base_command, hints, requirements, inputs, outputs, stdout, stderr, workflow_id, task_id=None, workdir=None)

Data structure for holding data about a single task.

__eq__(other)

Test the equality of two tasks.

Task ID and dependencies do not factor into equality testing. :param other: the task with which to test equality :type other: Task :rtype: bool

__hash__()

Return the hash value for a task.

Return type:

int

__init__(name, base_command, hints, requirements, inputs, outputs, stdout, stderr, workflow_id, task_id=None, workdir=None)

Store a task description.

Task ID should only be given as a parameter when reconstructing the Task object from the graph database.

Parameters:
  • name (str) – the task name

  • base_command (str or list of str) – the base command to run for the task

  • hints (list of Hint) – the task hints (optional requirements)

  • requirements (list of Requirement) – the task requirements

  • inputs (list of StepInput) – the task inputs

  • outputs (list of StepOutput) – the task outputs

  • stdout (str) – the name of the file to which to redirect stdout

  • stderr (str) – the name of the file to which to redirect stderr

  • workflow_id (str) – the workflow ID

  • task_id (str, optional) – the task ID

  • workdir (path, optional) – the working directory from which to get and store data

__ne__(other)

Test the inequality of two tasks.

Parameters:

other (Task) – the task with which to test inequality

Return type:

bool

__repr__()

Construct a task’s string representation.

Return type:

str

__weakref__

list of weak references to the object (if defined)

property command

Construct a task’s command as a list.

Return type:

list of str

copy(new_id=False)

Make a copy of this task.

Parameters:

new_id (bool) – generate a new task ID

Return type:

Task

generate_task_id()

Generate a unique task ID.

Return type:

str

get_full_requirement(req_type)

Get the full requirement (or hint) for this task, if it has one.

Parameters:

req_type (str) – the type of requirement (e.g. ‘DockerRequirement’)

This prefers requirements over hints. Returns None if no hint or requirement found.

get_requirement(req_type, req_param, default=None)

Get requirement from hints or requirements, prioritizing requirements over hints.

Parameters:
  • req_type (str) – the type of requirement (e.g. ‘DockerRequirement’)

  • req_param (str) – the requirement parameter (e.g. ‘dockerFile’)

  • default (any) – default value if the requirement is not found

When requirements are specified hints will be ignored. By default, tasks need not specify hints or requirements

class beeflow.common.wf_data.TaskStateUpdate(wf_id, task_id, job_state, task_info, output, metadata)
__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, wf_id, task_id, job_state, task_info, output, metadata)

Create new instance of TaskStateUpdate(wf_id, task_id, job_state, task_info, output, metadata)

__repr__()

Return a nicely formatted representation string

job_state

Alias for field number 2

metadata

Alias for field number 5

output

Alias for field number 4

task_id

Alias for field number 1

task_info

Alias for field number 3

wf_id

Alias for field number 0

class beeflow.common.wf_data.Workflow(name, hints, requirements, inputs, outputs, workflow_id)

Data structure for holding data about a workflow.

__eq__(other)

Test the equality of two workflows.

Workflow ID and dependencies do not factor into equality testing. Currently, the code is boilerplate. We do not support multiple workflows.

Parameters:

other (Workflow) – the workflow with which to test equality

__hash__ = None
__init__(name, hints, requirements, inputs, outputs, workflow_id)

Store a workflow description.

Parameters:
  • name (string) – the workflow name

  • hints (list of Requirements) – the workflow hints

  • requirements (list of Requirements) – the workflow requirements

  • inputs (list of InputParameter) – the workflow inputs

  • outputs (list of OutputParameter) – the workflow outputs

  • workflow_id (str) – the workflow ID

__ne__(other)

Test the inequality of two workflows.

Parameters:

other (Workflow) – the workflow with which to test inequality

__repr__()

Construct a workflow’s string representation.

__weakref__

list of weak references to the object (if defined)

beeflow.common.wf_data.generate_workflow_id()

Generate a unique workflow ID.

Return type:

str

Workflow Interface

High-level BEE workflow management interface.

Delegates its work to a GraphDatabaseInterface instance.

class beeflow.common.wf_interface.WorkflowInterface(wf_id, gdb_driver=<class 'beeflow.common.gdb.neo4j_driver.Neo4jDriver'>)

Interface for manipulating workflows.

add_task(task, task_state)

Add a new task to a BEE workflow.

Parameters:

task (Task) – the name of the file to which to redirect stderr

execute_workflow()

Begin execution of a BEE workflow.

export_graphml()

Export a BEE workflow as a graphml.

finalize_task(task)

Mark a BEE workflow task as completed.

This method also automatically deduces what tasks are now runnable, updating their states to ready and returning a list of the runnable tasks.

Parameters:

task (Task) – the task to finalize

Return type:

list of Task

get_dependent_tasks(task)

Get the dependents of a task in a BEE workflow.

Parameters:

task (Task) – the task whose dependents to retrieve

Return type:

list of Task

get_ready_tasks()

Get ready tasks from a BEE workflow.

Return type:

list of Task

get_task_by_id(task_id)

Get a task by its Task ID.

Parameters:

task_id (str) – the task’s ID

Return type:

Task

get_task_input(task, input_id)

Get a task input object.

Parameters:
  • task (Task) – the task whose input to retrieve

  • input_id (str) – the ID of the input

Return type:

StepInput

get_task_metadata(task)

Get the job description metadata of a task in a BEE workflow.

Parameters:

task (Task) – the task whose metadata to retrieve

Return type:

dict

get_task_output(task, output_id)

Get a task output object.

Parameters:
  • task (Task) – the task whose output to retrieve

  • output_id (str) – the ID of the output

Return type:

StepOutput

get_task_state(task)

Get the state of the task in a BEE workflow.

Parameters:

task (Task) – the task whose state to retrieve

Return type:

str

get_workflow()

Get a loaded BEE workflow.

Returns a tuple of (workflow_description, tasks)

Return type:

tuple of (Workflow, list of Task)

get_workflow_outputs()

Get the outputs from a BEE workflow.

Return type:

list of OutputParameter

get_workflow_state()

Get the value of the workflow state.

Return type:

str

initialize_workflow(workflow)

Begin construction of a BEE workflow.

Parameters:

workflow (Workflow) – the workflow object

pause_workflow()

Pause the execution of a BEE workflow.

reset_workflow(workflow_id)

Reset the execution state and ID of a BEE workflow.

restart_task(task, checkpoint_file)

Restart a failed BEE workflow task.

The task must have a beeflow:CheckpointRequirement hint. If there are no remaining retry attemps (num_tries = 0) then the graph database is unmodified and this method returns None.

Parameters:
  • task (Task) – the task to restart

  • checkpoint_file – the task checkpoint file

Return type:

Task or None

resume_workflow()

Resume the execution of a paused BEE workflow.

set_task_input(task, input_id, value)

Set the value of a task input.

Parameters:
  • task (Task) – the task whose input to set

  • input_id (str) – the ID of the input

  • value – str or int or float

set_task_metadata(task, metadata)

Set the job description metadata of a task in a BEE workflow.

This method should not be used to update task state. set_task_state() or finalize_task() should instead be used.

Parameters:
  • task (Task) – the task whose metadata to set

  • metadata (dict) – the job description metadata

set_task_output(task, output_id, value)

Set the value of a task output.

Parameters:
  • task (Task) – the task whose output to set

  • output_id (str) – the ID of the output

  • value (str or int or float) – the output value to set

set_task_state(task, state)

Set the state of the task in a BEE workflow.

This method should not be used to set a task as completed. finalize_task() should instead be used.

Parameters:
  • task (Task) – the task whose state to set

  • state (str) – the new state of the task

set_workflow_state(state)

Set workflow’s current state.

Parameters:

state (str) – the new state of the workflow

workflow_completed()

Return true if all of a workflow’s final tasks have completed, else false.

Return type:

bool

property workflow_id

Retrieve the workflow ID from the workflow interface.

If workflow ID is not populated, this grabs it from the database.

If no workflow is loaded, None is returned. :rtype: str

Workflow Manager

Workflow Manager Resources

Workflow endpoints for getting metadata.

class beeflow.wf_manager.resources.wf_metadata.WFMetadata

Class for getting metadata.

get(wf_id)

Get and return metadata.

methods: t.ClassVar[t.Collection[str] | None] = {'GET'}

The methods this view is registered for. Uses the same default (["GET", "HEAD", "OPTIONS"]) as route and add_url_rule by default.

Workflow Manager Dependency Manager

Workflow Profiler

Workflow profiling code.

class beeflow.common.wf_profiler.WorkflowProfiler(workflow_name, output_path)

Class for profiling a single workflow’s execution.

add_scheduling_results(tasks, resources, allocations)

Add scheduling results (given the set of available resources).

add_state_change(task, next_state)

Save a change of state for a task (at each task state change).

save()

Save the workflow results (run on workflow completion).