
workflows

BaseWorkflow

Provides an interface for creating a reusable workflow: encapsulated "boilerplate" for running, aggregating, and analyzing one or more Hydra jobs.

Attributes:

cfgs: List[Any]
List of configurations for each Hydra job.

metrics: Dict[str, List[Any]]
Dictionary of metrics across all jobs.

workflow_overrides: Dict[str, Any]
Workflow parameters defined as additional arguments to run.

jobs: List[Any]
List of jobs returned for each experiment within the workflow.

working_dir: Optional[Path]
The working directory of the experiment, defined by Hydra's sweep directory (hydra.sweep.dir).

multirun_task_overrides property

multirun_task_overrides

Returns a mapping of override param-name -> value.

A sequence of overrides associated with a multirun is stored in a mushin.multirun list. This distinguishes a multirun from an override whose sole value is a list of values.

Returns:

multirun_task_overrides: Dict[str, LoadedValue | Sequence[LoadedValue]]

Examples:

>>> from mushin import multirun, hydra_list
>>> from mushin.workflows import MultiRunMetricsWorkflow
>>>
>>> class WorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(*args, **kwargs):
...         return None
>>>
>>> wf = WorkFlow()
>>> wf.run(foo=hydra_list(["val"]), bar=multirun(["a", "b"]), apple=1)
>>> wf.multirun_task_overrides
{'foo': ['val'], 'bar': multirun(['a', 'b']), 'apple': 1}

__init__

__init__(eval_task_cfg=None)

Initialize a workflow for running experiments with Hydra.

Parameters:

eval_task_cfg (default: None)
The workflow configuration object.

pre_task staticmethod

pre_task(*args, **kwargs)

Called prior to task.

This can be useful for doing things like setting random seeds, which must occur prior to instantiating objects for the evaluation task.

Notes

This function is automatically wrapped by zen, which parses the function's signature, extracts and instantiates the corresponding fields from a Hydra config object, and passes them to the function. This behavior can be modified via self.run(pre_task_fn_wrapper=...).
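Why seeding belongs in pre_task can be shown without Hydra. The sketch below is a hypothetical stand-in for a single job: `run_job` and the plain-dict config are assumptions made for illustration, not mushin's API. It only demonstrates the ordering guarantee, namely that pre_task seeds the RNG before task creates anything random, so repeated runs with the same seed produce the same results.

```python
import random
from typing import Any, Callable, Dict, List


def pre_task(seed: int) -> None:
    # Seed the global RNG before any "random" objects are created.
    random.seed(seed)


def task(n: int) -> List[float]:
    # Stands in for instantiating objects that draw random numbers,
    # e.g. randomly initialized model weights.
    return [random.random() for _ in range(n)]


def run_job(
    cfg: Dict[str, Any],
    pre_task: Callable[..., None],
    task: Callable[..., Any],
) -> Any:
    """Hypothetical single-job runner: pre_task always runs before task."""
    pre_task(seed=cfg["seed"])
    return task(n=cfg["n"])


cfg = {"seed": 0, "n": 3}
first = run_job(cfg, pre_task, task)
second = run_job(cfg, pre_task, task)
print(first == second)  # True: same seed, same "initialization"
```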

task staticmethod

task(*args, **kwargs)

User-defined task that is run by the workflow. This should be a static method.

Arguments will be instantiated configuration variables. For example, if the workflow configuration is structured as:

├── eval_task_cfg
│    ├── trainer
│    ├── module
│    └── another_config

The inputs to task can be any of the three configurations (trainer, module, or another_config), for example:

from pytorch_lightning import LightningModule, Trainer
from mushin.workflows import BaseWorkflow

class MyWorkflow(BaseWorkflow):
    @staticmethod
    def task(trainer: Trainer, module: LightningModule) -> None:
        trainer.fit(module)

Notes

This function is automatically wrapped by zen, which parses the function's signature, extracts and instantiates the corresponding fields from a Hydra config object, and passes them to the function. This behavior can be modified via self.run(task_fn_wrapper=...).
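The following is a deliberately simplified, stdlib-only approximation of the signature-driven behavior described above. `zen_like` is a hypothetical name, not mushin.zen. It works on a plain dict and only matches parameter names to config fields. Unlike the real wrapper, it does not instantiate anything.

```python
import inspect
from typing import Any, Callable, Dict, Mapping


def zen_like(fn: Callable[..., Any]) -> Callable[[Mapping[str, Any]], Any]:
    """Hypothetical, simplified zen: select config fields by parameter name."""
    sig = inspect.signature(fn)

    def wrapped(cfg: Mapping[str, Any]) -> Any:
        kwargs: Dict[str, Any] = {}
        for name, param in sig.parameters.items():
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue  # *args / **kwargs are not filled from the config
            if name in cfg:
                # A real zen wrapper would also instantiate this field.
                kwargs[name] = cfg[name]
            elif param.default is inspect.Parameter.empty:
                raise TypeError(f"config is missing required field {name!r}")
            # Otherwise the function's own default value is used.
        return fn(**kwargs)

    return wrapped


def task(trainer: str, module: str) -> str:
    # Only `trainer` and `module` are requested; `another_config` is ignored.
    return f"{trainer} fits {module}"


cfg = {"trainer": "Trainer()", "module": "Module()", "another_config": 42}
print(zen_like(task)(cfg))  # Trainer() fits Module()
```

Because the fields are selected by parameter name, the same configuration can feed several functions that each request a different subset of it.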

validate

validate(include_pre_task=True)

Validates that the configuration will execute with the user-defined evaluation task.
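One plausible way to check ahead of launch that a configuration can feed a task is to bind the config's fields against the task's signature. This sketch is an assumption about the kind of check involved, not validate's actual implementation. `can_bind` is a hypothetical helper.

```python
import inspect
from typing import Any, Callable, Mapping


def can_bind(fn: Callable[..., Any], cfg: Mapping[str, Any]) -> bool:
    """Return True if every required parameter of `fn` is present in `cfg`."""
    sig = inspect.signature(fn)
    # Only pass the fields that the function actually names.
    kwargs = {k: v for k, v in cfg.items() if k in sig.parameters}
    try:
        sig.bind(**kwargs)
    except TypeError:
        return False
    return True


def task(epsilon: float, scale: float = 1.0) -> dict:
    return {"loss": (epsilon * scale) ** 2}


print(can_bind(task, {"epsilon": 0.5}))  # True: scale has a default
print(can_bind(task, {"scale": 2.0}))    # False: epsilon is missing
```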

run

run(
    *,
    working_dir=None,
    sweeper=None,
    launcher=None,
    overrides=None,
    task_fn_wrapper=zen,
    pre_task_fn_wrapper=zen,
    version_base=_VERSION_BASE_DEFAULT,
    to_dictconfig=False,
    config_name="rai_workflow",
    job_name="rai_workflow",
    with_log_configuration=True,
    **workflow_overrides,
)

Run the experiment.

Individual workflows can explicitly define workflow_overrides to improve readability and make clear which parameters a particular workflow expects.

Parameters:

task_fn_wrapper: Union[Callable[[Callable[..., T1]], Callable[[Any], T1]], None] (default: zen)
A wrapper applied to self.task prior to launching the task. The default wrapper is mushin.zen. Specify None for no wrapper to be applied.

working_dir: Optional[str] (default: None)
The directory to run the experiment in. This value is used for setting hydra.sweep.dir.

sweeper: Optional[str] (default: None)
The configuration name of the Hydra Sweeper to use (i.e., the override for hydra/sweeper=sweeper).

launcher: Optional[str] (default: None)
The configuration name of the Hydra Launcher to use (i.e., the override for hydra/launcher=launcher).

overrides: Optional[list[str]] (default: None)
Parameter overrides not considered part of the workflow parameter set. This is helpful for filtering out parameters stored in self.workflow_overrides.

version_base: Optional[str], optional (default=1.1; signature default: _VERSION_BASE_DEFAULT)
Available starting with Hydra 1.2.0.
- If the version_base parameter is not specified, Hydra 1.x will use defaults compatible with version 1.1. In this case, a warning is also issued to indicate that an explicit version_base is preferred.
- If the version_base parameter is None, then the defaults are chosen for the current minor Hydra version. For example, for Hydra 1.2 this implies config_path=None and hydra.job.chdir=False.
- If the version_base parameter is an explicit version string like "1.1", then the defaults appropriate to that version are used.

to_dictconfig: bool (default: False)
If True, convert a dataclasses.dataclass to an omegaconf.DictConfig. Note that this removes Hydra's capability to validate structured configurations.

config_name: str (default: "rai_workflow")
Name of the stored configuration in Hydra's ConfigStore API.

job_name: str (default: "rai_workflow")
Name of the job, used for logging.

with_log_configuration: bool (default: True)
If True, enables the configuration of the logging subsystem from the loaded config.

**workflow_overrides: Union[str, int, float, bool, dict, multirun, hydra_list] (default: {})
These parameters represent the values for configurations to use for the experiment.

Passing param=multirun([1, 2, 3]) will perform a multirun over those three param values, whereas passing param=hydra_list([1, 2, 3]) will pass the entire list as a single input.

These values will be appended to the overrides for the Hydra job.
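The difference between multirun and hydra_list comes down to the override string Hydra receives. A comma-separated list of values is a sweep, with one job per value. A bracketed list is a single list-valued override. The sketch below uses hypothetical stand-in classes `Multirun` and `HydraList`, which are not mushin's own, along with the "+name=value" form seen in the launch logs on this page. It assumes this is roughly how workflow_overrides are rendered into Hydra overrides.

```python
from typing import Any, List


class Multirun(list):
    """Hypothetical stand-in for mushin.multirun: values to sweep over."""


class HydraList(list):
    """Hypothetical stand-in for mushin.hydra_list: one list-valued input."""


def to_overrides(**workflow_overrides: Any) -> List[str]:
    """Render keyword overrides as Hydra command-line override strings."""
    out: List[str] = []
    for name, value in workflow_overrides.items():
        if isinstance(value, Multirun):
            # Comma-separated values: Hydra launches one job per value.
            rendered = ",".join(str(v) for v in value)
        elif isinstance(value, HydraList):
            # Bracketed list: a single job receives the whole list.
            rendered = "[" + ",".join(str(v) for v in value) + "]"
        else:
            rendered = str(value)
        out.append(f"+{name}={rendered}")
    return out


print(to_overrides(epsilon=Multirun([1.0, 2.0, 3.0]), sizes=HydraList([1, 2, 3]), apple=1))
# ['+epsilon=1.0,2.0,3.0', '+sizes=[1,2,3]', '+apple=1']
```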

jobs_post_process

jobs_post_process()

Method to extract attributes and metrics relevant to the workflow.

plot

plot(**kwargs)

Plot workflow metrics.

to_xarray

to_xarray()

Convert workflow data to an xarray Dataset or DataArray.

MultiRunMetricsWorkflow

Bases: BaseWorkflow

Abstract class for workflows that record metrics using Hydra multirun.

This workflow creates subdirectories of multirun experiments using Hydra. These directories contain the Hydra YAML configuration and any saved metrics file (defined by the evaluation task):

├── working_dir
│    ├── <experiment directory name: 0>
│    │    ├── <hydra output subdirectory: (default: .hydra)>
│    │    │    ├── config.yaml
│    │    │    ├── hydra.yaml
│    │    │    ├── overrides.yaml
│    │    ├── <metrics_filename>
│    ├── <experiment directory name: 1>
│    │    ...

The evaluation task is expected to return a dictionary that maps metric-name (str) -> value (number | Sequence[number]).

Examples:

Let's create a simple workflow where we perform a multirun over a parameter, epsilon, and evaluate a task function that computes an accuracy and loss based on that epsilon value and a specified scale.

>>> import torch as tr
>>> from mushin.workflows import MultiRunMetricsWorkflow
>>> from mushin import multirun
>>> class LocalRobustness(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(epsilon: float, scale: float) -> dict:
...         epsilon *= scale
...         val = 100 - epsilon**2
...         result = dict(accuracies=val+2, loss=epsilon**2)
...         tr.save(result, "test_metrics.pt")
...         return result

We'll run this workflow for six total configurations of three epsilon values and two scale values. This will launch a Hydra multirun job and aggregate the results.

>>> wf = LocalRobustness()
>>> wf.run(epsilon=multirun([1.0, 2.0, 3.0]), scale=multirun([0.1, 1.0]))
[2022-05-02 11:57:59,219][HYDRA] Launching 6 jobs locally
[2022-05-02 11:57:59,220][HYDRA]    #0 : +epsilon=1.0 +scale=0.1
[2022-05-02 11:57:59,312][HYDRA]    #1 : +epsilon=1.0 +scale=1.0
[2022-05-02 11:57:59,405][HYDRA]    #2 : +epsilon=2.0 +scale=0.1
[2022-05-02 11:57:59,498][HYDRA]    #3 : +epsilon=2.0 +scale=1.0
[2022-05-02 11:57:59,590][HYDRA]    #4 : +epsilon=3.0 +scale=0.1
[2022-05-02 11:57:59,683][HYDRA]    #5 : +epsilon=3.0 +scale=1.0

Now that this workflow has run, we can view the results as an xarray-dataset whose coordinates reflect the multirun parameters that were varied, and whose data-variables are our recorded metrics: "accuracies" and "loss".

>>> ds = wf.to_xarray()
>>> ds
<xarray.Dataset>
Dimensions:     (epsilon: 3, scale: 2)
Coordinates:
* epsilon     (epsilon) float64 1.0 2.0 3.0
* scale       (scale) float64 0.1 1.0
Data variables:
    accuracies  (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
    loss        (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0
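Conceptually, to_xarray reshapes a flat list of job results into an array indexed by the swept parameters. The stdlib sketch below reproduces the loss grid from the LocalRobustness example by sorting each swept parameter's values into a coordinate axis. `to_grid` is hypothetical, handles only two swept dimensions, and builds nested lists rather than an actual xarray.Dataset.

```python
from itertools import product
from typing import Dict, List, Sequence, Tuple


def task(epsilon: float, scale: float) -> Dict[str, float]:
    # Same arithmetic as LocalRobustness.task (without saving a file).
    epsilon *= scale
    val = 100 - epsilon**2
    return dict(accuracies=val + 2, loss=epsilon**2)


def to_grid(
    jobs: Sequence[Tuple[Dict[str, float], Dict[str, float]]],
    dims: Sequence[str],
    metric: str,
) -> Tuple[Dict[str, List[float]], List[List[float]]]:
    """Hypothetical 2-D reshape: (overrides, metrics) pairs -> coords + grid."""
    coords = {d: sorted({ov[d] for ov, _ in jobs}) for d in dims}
    lookup = {tuple(ov[d] for d in dims): m[metric] for ov, m in jobs}
    row_dim, col_dim = dims
    grid = [[lookup[(r, c)] for c in coords[col_dim]] for r in coords[row_dim]]
    return coords, grid


epsilons, scales = [1.0, 2.0, 3.0], [0.1, 1.0]
jobs = [(dict(epsilon=e, scale=s), task(e, s)) for e, s in product(epsilons, scales)]
coords, loss = to_grid(jobs, dims=("epsilon", "scale"), metric="loss")
print(coords)  # {'epsilon': [1.0, 2.0, 3.0], 'scale': [0.1, 1.0]}
print([[round(v, 2) for v in row] for row in loss])
# [[0.01, 1.0], [0.04, 4.0], [0.09, 9.0]]
```

Reading this grid row by row gives the same order as the loss values printed in the Dataset above.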

We can also load this workflow by providing the working directory where it was run.

>>> loaded = LocalRobustness().load_from_dir(wf.working_dir, metrics_filename="test_metrics.pt")
>>> loaded.to_xarray()
<xarray.Dataset>
Dimensions:     (epsilon: 3, scale: 2)
Coordinates:
* epsilon     (epsilon) float64 1.0 2.0 3.0
* scale       (scale) float64 0.1 1.0
Data variables:
    accuracies  (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
    loss        (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0

target_dir_multirun_overrides property

target_dir_multirun_overrides

For a multirun that sweeps over the target directories of a previous multirun, target_dir_multirun_overrides provides the flattened overrides for that previous run.

Examples:

>>> from mushin.workflows import MultiRunMetricsWorkflow
>>> from mushin import multirun
>>> class A(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(value: float, scale: float):
...         pass
...
>>> class B(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task():
...         pass
>>> a = A()
>>> a.run(value=multirun([-1.0, 1.0]), scale=multirun([11.0, 9.0]))
[2022-05-13 17:19:51,497][HYDRA] Launching 4 jobs locally
[2022-05-13 17:19:51,497][HYDRA]        #0 : +value=-1.0 +scale=11.0
[2022-05-13 17:19:51,555][HYDRA]        #1 : +value=-1.0 +scale=9.0
[2022-05-13 17:19:51,729][HYDRA]        #2 : +value=1.0 +scale=11.0
[2022-05-13 17:19:51,787][HYDRA]        #3 : +value=1.0 +scale=9.0
>>> b = B()
>>> b.run(target_job_dirs=a.multirun_working_dirs)
[2022-05-13 17:19:59,900][HYDRA] Launching 4 jobs locally
[2022-05-13 17:19:59,900][HYDRA]        #0 : +job_dir=/home/scratch/multirun/0
[2022-05-13 17:19:59,958][HYDRA]        #1 : +job_dir=/home/scratch/multirun/1
[2022-05-13 17:20:00,015][HYDRA]        #2 : +job_dir=/home/scratch/multirun/2
[2022-05-13 17:20:00,073][HYDRA]        #3 : +job_dir=/home/scratch/multirun/3
>>> b.target_dir_multirun_overrides
{'value': [-1.0, -1.0, 1.0, 1.0],
 'scale': [11.0, 9.0, 11.0, 9.0]}
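The flattening behind target_dir_multirun_overrides can be sketched with the standard library. The sketch assumes that each previous job's overrides are available as Hydra-style "+name=value" strings, as recorded in that job's .hydra/overrides.yaml. `flatten_overrides` is a hypothetical helper. Parsing values with ast.literal_eval is a simplification of Hydra's own override grammar.

```python
import ast
from collections import defaultdict
from typing import Any, Dict, List, Sequence


def flatten_overrides(per_job: Sequence[Sequence[str]]) -> Dict[str, List[Any]]:
    """Hypothetical: collect each override's value across jobs, in job order."""
    flat: Dict[str, List[Any]] = defaultdict(list)
    for overrides in per_job:
        for item in overrides:
            key, _, raw = item.lstrip("+").partition("=")
            try:
                value = ast.literal_eval(raw)
            except (ValueError, SyntaxError):
                value = raw  # keep unparseable values as plain strings
            flat[key].append(value)
    return dict(flat)


per_job = [
    ["+value=-1.0", "+scale=11.0"],
    ["+value=-1.0", "+scale=9.0"],
    ["+value=1.0", "+scale=11.0"],
    ["+value=1.0", "+scale=9.0"],
]
print(flatten_overrides(per_job))
# {'value': [-1.0, -1.0, 1.0, 1.0], 'scale': [11.0, 9.0, 11.0, 9.0]}
```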

task staticmethod

task(*args, **kwargs)

Abstract staticmethod for users to define the task that is configured and launched by the workflow.

metric_load_fn staticmethod

metric_load_fn(file_path)

Loads a metric file and returns a dictionary of metric-name -> metric-value mappings.

The default metric load function is torch.load.

Parameters:

file_path: Path (required)
Path to the metrics file to load.

Returns:

named_metrics: Mapping[str, Any]
metric-name -> metric-value(s)

Examples:

Designing a workflow that uses the pickle module to save and load metrics:

>>> from pathlib import Path
>>> import pickle
>>> from mushin import MultiRunMetricsWorkflow, multirun
>>>
>>> class PickledWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def metric_load_fn(file_path: Path):
...         with file_path.open("rb") as f:
...             return pickle.load(f)
...
...     @staticmethod
...     def task(a, b):
...         with open("./metrics.pkl", "wb") as f:
...             pickle.dump(dict(a=a, b=b), f)
>>>
>>> wf = PickledWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
>>> wf.load_metrics("metrics.pkl")
>>> wf.metrics
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
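A JSON-based loader is a variation on the pickle example. It only needs to accept a Path and return a mapping of metric-name to value(s). The sketch below shows such a function on its own and round-trips a hypothetical metrics.json file. Presumably it would serve as the metric_load_fn staticmethod of a MultiRunMetricsWorkflow subclass whose task writes JSON instead of pickle.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Mapping


def metric_load_fn(file_path: Path) -> Mapping[str, Any]:
    """Load metric-name -> metric-value(s) from a JSON file."""
    with file_path.open("r", encoding="utf-8") as f:
        return json.load(f)


# Round-trip check: write what a task might save, then load it back.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "metrics.json"  # hypothetical metrics filename
    path.write_text(json.dumps({"a": 1, "b": False}), encoding="utf-8")
    print(metric_load_fn(path))  # {'a': 1, 'b': False}
```

JSON files are human-readable and portable, but unlike pickle or torch.save they only hold plain types such as numbers, strings, booleans, lists, and dicts.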

load_from_dir

load_from_dir(working_dir, metrics_filename)

Loads workflow job data from a given working directory. The workflow is loaded in-place and "self" is returned by this method.

Parameters:

working_dir: Union[Path, str] (required)
The base working directory of the experiment. Subdirectories within this working directory are expected to contain the data of the individual Hydra jobs (YAML configurations) and the saved metrics files.

metrics_filename: Union[str, Sequence[str], None] (required)
The filename(s) or glob-pattern(s) used to load the metrics. If None, the metrics stored in self.metrics are used.

Returns:

loaded_workflow: Self

load_metrics

load_metrics(metrics_filename)

Loads the metrics from all multirun working directories, aggregates them, and stores them in self.metrics.

self.metric_load_fn is used to load each job's metric file(s).

Parameters:

metrics_filename: str | Sequence[str] (required)
The filename(s) or glob-pattern(s) used to load the metrics. If None, the metrics stored in self.metrics are used.

Returns:

metrics: Dict[str, List[Any]]

Examples:

Creating a workflow that saves named metrics using torch.save:

>>> from mushin.workflows import MultiRunMetricsWorkflow
>>> from mushin import multirun
>>> import torch as tr
>>>
>>> class TorchWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(a, b):
...         tr.save(dict(a=a, b=b), "metrics.pt")
...
>>> wf = TorchWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
[2022-06-01 12:35:51,650][HYDRA] Launching 3 jobs locally
[2022-06-01 12:35:51,650][HYDRA]        #0 : +a=1 +b=False
[2022-06-01 12:35:51,715][HYDRA]        #1 : +a=2 +b=False
[2022-06-01 12:35:51,780][HYDRA]        #2 : +a=3 +b=False

MultiRunMetricsWorkflow uses torch.load by default to load metrics files (override MultiRunMetricsWorkflow.metric_load_fn to change this behavior).

>>> wf.load_metrics("metrics.pt")
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
>>> wf.metrics
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
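The aggregation step in load_metrics can be approximated with pathlib. The approach is to glob for the metrics file(s) inside each job subdirectory, load each file with a loader function, and append every value to a per-metric list. The sketch below is hypothetical: `aggregate_metrics` is not a mushin function, and it uses JSON files instead of torch.save. It also assumes job subdirectories named 0, 1, 2, ..., matching the directory layout shown for MultiRunMetricsWorkflow.

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, List, Mapping


def aggregate_metrics(
    working_dir: Path,
    pattern: str,
    load_fn: Callable[[Path], Mapping[str, Any]],
) -> Dict[str, List[Any]]:
    """Hypothetical: merge metric files from every job subdir into lists."""
    metrics: Dict[str, List[Any]] = defaultdict(list)
    # Job subdirectories are named 0, 1, 2, ...; sort numerically, not lexically.
    job_dirs = sorted(
        (p for p in working_dir.iterdir() if p.is_dir() and p.name.isdigit()),
        key=lambda p: int(p.name),
    )
    for job_dir in job_dirs:
        for file in sorted(job_dir.glob(pattern)):
            for name, value in load_fn(file).items():
                metrics[name].append(value)
    return metrics


def load_json(path: Path) -> Mapping[str, Any]:
    return json.loads(path.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for i, a in enumerate([1, 2, 3]):
        job = root / str(i)
        job.mkdir()
        (job / "metrics.json").write_text(json.dumps(dict(a=a, b=False)), encoding="utf-8")
    print(dict(aggregate_metrics(root, "metrics.json", load_json)))
    # {'a': [1, 2, 3], 'b': [False, False, False]}
```

Numeric sorting matters once there are ten or more jobs. A plain lexical sort would place job "10" before job "2".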

to_xarray

to_xarray(
    include_working_subdirs_as_data_var=False,
    coord_from_metrics=None,
    non_multirun_params_as_singleton_dims=False,
    metrics_filename=None,
)

Convert workflow data to xarray Dataset.

Parameters:

include_working_subdirs_as_data_var: bool, optional (default: False)
If True, then the data-variable "working_subdir" will be included in the xarray. This data variable is used to look up the working sub-dir path (a string) by multirun coordinate.

coord_from_metrics: str | None (default: None)
If not None, defines the metric key to use as a coordinate in the Dataset. This function assumes that this coordinate represents the leading dimension for all data-variables.

non_multirun_params_as_singleton_dims: bool, optional (default: False)
If True, then non-multirun entries from workflow_overrides will be included as length-1 dimensions in the xarray. This is useful for merging/concatenation with other Datasets.

metrics_filename: Union[str, Sequence[str], None] (default: None)
The filename or glob-pattern used to load the metrics. If None, the metrics stored in self.metrics are used.

Returns:

results: Dataset
A dataset whose dimensions and coordinate-values are determined by the quantities over which the multirun was performed. The data variables correspond to the named results returned by the jobs.

RobustnessCurve

Bases: MultiRunMetricsWorkflow

Abstract class for workflows that measure performance for different perturbation values.

This workflow requires the parameter epsilon and uses it as the configuration option for varying the perturbation.

See Also

MultiRunMetricsWorkflow

run

run(
    *,
    epsilon,
    task_fn_wrapper=zen,
    pre_task_fn_wrapper=zen,
    target_job_dirs=None,
    version_base=_VERSION_BASE_DEFAULT,
    working_dir=None,
    sweeper=None,
    launcher=None,
    overrides=None,
    to_dictconfig=False,
    config_name="rai_workflow",
    job_name="rai_workflow",
    with_log_configuration=True,
    **workflow_overrides,
)

Run the experiment for varying values of epsilon.

Parameters:

epsilon: Union[str, Sequence[float]] (required)
The configuration parameter for the perturbation. Unlike Hydra overrides, this parameter can be a list of floats that will be converted into a multirun sequence override for Hydra.

task_fn_wrapper: Union[Callable[[Callable[..., T1]], Callable[[Any], T1]], None] (default: zen)
A wrapper applied to self.task prior to launching the task. The default wrapper is mushin.zen. Specify None for no wrapper to be applied.

working_dir: Optional[str] (default: None)
The directory to run the experiment in. This value is used for setting hydra.sweep.dir.

sweeper: Optional[str] (default: None)
The configuration name of the Hydra Sweeper to use (i.e., the override for hydra/sweeper=sweeper).

launcher: Optional[str] (default: None)
The configuration name of the Hydra Launcher to use (i.e., the override for hydra/launcher=launcher).

overrides: Optional[list[str]] (default: None)
Parameter overrides not considered part of the workflow parameter set. This is helpful for filtering out parameters stored in self.workflow_overrides.

**workflow_overrides: Union[str, int, float, bool, multirun, hydra_list] (default: {})
These parameters represent the values for configurations to use for the experiment.

These values will be appended to the overrides for the Hydra job.
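RobustnessCurve.run accepts epsilon either as a string in Hydra syntax or as a sequence of floats. The sketch below is a minimal version of that normalization. It assumes that the sequence becomes the comma-separated sweep syntax Hydra uses for multiruns, with the "+epsilon=" prefix seen in this page's launch logs. `epsilon_override` is a hypothetical helper, not part of mushin.

```python
from typing import Sequence, Union


def epsilon_override(epsilon: Union[str, Sequence[float]]) -> str:
    """Hypothetical: turn `epsilon` into a single Hydra sweep override."""
    if isinstance(epsilon, str):
        value = epsilon  # assumed to already be in Hydra syntax, e.g. "0.0,0.5"
    else:
        value = ",".join(str(float(e)) for e in epsilon)
    return f"+epsilon={value}"


print(epsilon_override([0, 0.5, 1]))  # +epsilon=0.0,0.5,1.0
print(epsilon_override("0.0,0.25"))   # +epsilon=0.0,0.25
```

The str check has to come first because a str is itself a Sequence. Without it, a string like "0.0,0.25" would be iterated character by character.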

to_xarray

to_xarray(
    include_working_subdirs_as_data_var=False,
    coord_from_metrics=None,
    non_multirun_params_as_singleton_dims=False,
    metrics_filename=None,
)

Convert workflow data to xarray Dataset.

Parameters:

include_working_subdirs_as_data_var: bool, optional (default: False)
If True, then the data-variable "working_subdir" will be included in the xarray. This data variable is used to look up the working sub-dir path (a string) by multirun coordinate.

coord_from_metrics: str | None (default: None)
If not None, defines the metric key to use as a coordinate in the Dataset. This function assumes that this coordinate represents the leading dimension for all data-variables.

non_multirun_params_as_singleton_dims: bool, optional (default: False)
If True, then non-multirun entries from workflow_overrides will be included as length-1 dimensions in the xarray. This is useful for merging/concatenation with other Datasets.

metrics_filename: Union[str, Sequence[str], None] (default: None)
The filename or glob-pattern used to load the metrics. If None, the metrics stored in self.metrics are used.

Returns:

results: Dataset
A dataset whose dimensions and coordinate-values are determined by the quantities over which the multirun was performed. The data variables correspond to the named results returned by the jobs.

plot

plot(
    metric,
    ax=None,
    group=None,
    save_filename=None,
    non_multirun_params_as_singleton_dims=False,
    **kwargs,
)

Plot metrics versus epsilon.

Using the xarray.Dataset from to_xarray, plot the metrics against the workflow perturbation parameters.

Parameters:

metric: str (required)
The name of the saved metric to plot.

ax: Any (default: None)
If not None, the matplotlib.Axes to use for plotting.

group: Optional[str] (default: None)
Needed if parameters other than epsilon were varied.

save_filename: Optional[str] (default: None)
If not None, save the figure to the filename provided.

non_multirun_params_as_singleton_dims: bool, optional (default: False)
If True, then non-multirun entries from workflow_overrides will be included as length-1 dimensions in the xarray. This is useful for merging/concatenation with other Datasets.

**kwargs (default: {})
Additional arguments passed to xarray.plot.
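When a parameter besides epsilon was swept, it has to be decided how the metric is split into separate curves, which is presumably the role of group. The stdlib sketch below shows only that grouping step, with no plotting and no xarray. `curves_by_group` and the flat record layout are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence, Tuple


def curves_by_group(
    records: Sequence[Dict[str, Any]],
    metric: str,
    group: str,
) -> Dict[Any, Tuple[List[float], List[float]]]:
    """Hypothetical: {group value: (epsilons, metric values)}, sorted by epsilon."""
    points: Dict[Any, List[Tuple[float, float]]] = defaultdict(list)
    for rec in records:
        points[rec[group]].append((rec["epsilon"], rec[metric]))
    curves = {}
    for key, pts in points.items():
        pts.sort()  # order each curve by epsilon
        xs, ys = zip(*pts)
        curves[key] = (list(xs), list(ys))
    return curves


records = [
    dict(epsilon=e, scale=s, loss=(e * s) ** 2)
    for e in (3.0, 1.0, 2.0)
    for s in (1.0, 2.0)
]
for scale, (eps, loss) in curves_by_group(records, "loss", "scale").items():
    print(scale, eps, loss)
# 1.0 [1.0, 2.0, 3.0] [1.0, 4.0, 9.0]
# 2.0 [1.0, 2.0, 3.0] [4.0, 16.0, 36.0]
```

Each resulting curve could then be drawn on a single matplotlib Axes with Axes.plot, labeled by its group value.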