workflows
BaseWorkflow
Provides an interface for creating a reusable workflow: encapsulated "boilerplate" for running, aggregating, and analyzing one or more Hydra jobs.
Attributes:
| Name | Type | Description |
|---|---|---|
| cfgs | List[Any] | List of configurations for each Hydra job. |
| metrics | Dict[str, List[Any]] | Dictionary of metrics across all jobs. |
| workflow_overrides | Dict[str, Any] | Workflow parameters defined as additional arguments to run. |
| jobs | List[Any] | List of jobs returned for each experiment within the workflow. |
| working_dir | Optional[Path] | The working directory of the experiment, defined by Hydra's sweep directory (hydra.sweep.dir). |
multirun_task_overrides (property)
Returns the workflow's overrides as a param-name -> value mapping.
A sequence of override values associated with a multirun is stored in a mushin.multirun list. This distinguishes it from an override whose single value is itself a list of values.
Returns:
| Name | Type | Description |
|---|---|---|
| multirun_task_overrides | Dict[str, LoadedValue \| Sequence[LoadedValue]] | |
Examples:
>>> from mushin import MultiRunMetricsWorkflow, multirun, hydra_list
>>>
>>> class WorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(*args, **kwargs):
...         return None
>>>
>>> wf = WorkFlow()
>>> wf.run(foo=hydra_list(["val"]), bar=multirun(["a", "b"]), apple=1)
>>> wf.multirun_task_overrides
{'foo': ['val'], 'bar': multirun(['a', 'b']), 'apple': 1}
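The distinction between a swept value and a list-valued override can be sketched without mushin. This is a minimal illustration, not mushin's implementation: `Multirun` is a hypothetical stand-in for `mushin.multirun`, and `collapse_job_overrides` is a hypothetical helper that rebuilds a param-name -> value mapping from the overrides of the individual jobs.

```python
from typing import Any, Dict, List


class Multirun(list):
    """Hypothetical stand-in for mushin.multirun: a list that marks a sweep."""

    def __repr__(self) -> str:
        return f"Multirun({list.__repr__(self)})"


def collapse_job_overrides(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge per-job overrides into one mapping.

    A parameter that takes more than one value across jobs is wrapped in
    Multirun; a parameter with a single value (even a list value) is kept as-is.
    """
    seen: Dict[str, List[Any]] = {}
    for job in jobs:
        for name, value in job.items():
            values = seen.setdefault(name, [])
            # Compare by equality so unhashable values such as lists work too.
            if value not in values:
                values.append(value)
    return {
        name: Multirun(values) if len(values) > 1 else values[0]
        for name, values in seen.items()
    }


# Overrides of the two jobs launched by the example above.
jobs = [
    {"foo": ["val"], "bar": "a", "apple": 1},
    {"foo": ["val"], "bar": "b", "apple": 1},
]
result = collapse_job_overrides(jobs)
print(result)  # {'foo': ['val'], 'bar': Multirun(['a', 'b']), 'apple': 1}
```

Note that in this sketch a sweep over a single value collapses to a plain value; the marker type is what lets ['val'] survive as an ordinary list.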
__init__
Workflows and experiments using Hydra.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| eval_task_cfg | | The workflow configuration object. | None |
pre_task (staticmethod)
Called prior to task.
This can be useful for doing things like setting random seeds, which must occur prior to instantiating objects for the evaluation task.
Notes
This function is automatically wrapped by zen, which is responsible for parsing the function's signature and then extracting and instantiating the corresponding fields from a Hydra config object, passing them to the function. This behavior can be modified by self.run(pre_task_fn_wrapper=...).
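Why seeding belongs in a step that runs before the task can be shown with the standard library alone. The functions below are hypothetical stand-ins, not mushin code: `run_job` calls `pre_task` first, so the seed is set before `task` draws any random numbers, and two jobs with the same seed produce identical results.

```python
import random
from typing import List


def pre_task(seed: int) -> None:
    # Seed the global RNG before anything random is created.
    random.seed(seed)


def task(n: int) -> List[float]:
    # Stands in for instantiating objects with random initial state.
    return [random.random() for _ in range(n)]


def run_job(seed: int, n: int) -> List[float]:
    # Hypothetical job runner mirroring the pre_task -> task ordering.
    pre_task(seed)
    return task(n)


first = run_job(seed=0, n=3)
second = run_job(seed=0, n=3)
print(first == second)  # True: same seed, same draws
```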
task (staticmethod)
User-defined task that is run by the workflow. This should be a static method.
Arguments will be instantiated configuration variables. For example, if the workflow configuration is structured as:
├── eval_task_cfg
│    ├── trainer
│    ├── module
│    ├── another_config
then the inputs to task can be any of the three configurations (trainer, module, or another_config), such as:
@staticmethod
def task(trainer: Trainer, module: LightningModule) -> None:
    trainer.fit(module)
Notes
This function is automatically wrapped by zen, which is responsible for parsing the function's signature and then extracting and instantiating the corresponding fields from a Hydra config object, passing them to the function. This behavior can be modified by self.run(task_fn_wrapper=...).
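The signature-driven extraction described in this note can be approximated in a few lines. `zen_like` is a hypothetical, simplified wrapper: it reads the task's parameter names with `inspect.signature` and passes the matching top-level fields of a plain dict. Unlike zen, it does not instantiate anything and does not handle Hydra/OmegaConf config objects.

```python
import inspect
from typing import Any, Callable, Dict, TypeVar

T = TypeVar("T")


def zen_like(fn: Callable[..., T]) -> Callable[[Dict[str, Any]], T]:
    """Return a function that calls `fn` with fields pulled from a config dict."""
    params = inspect.signature(fn).parameters

    def wrapped(cfg: Dict[str, Any]) -> T:
        # Only fields named in fn's signature are passed; others are ignored.
        kwargs = {name: cfg[name] for name in params if name in cfg}
        return fn(**kwargs)

    return wrapped


def task(trainer: str, module: str) -> str:
    return f"{trainer}.fit({module})"


cfg = {"trainer": "Trainer", "module": "Module", "another_config": 123}
out = zen_like(task)(cfg)
print(out)  # Trainer.fit(Module)
```

Here another_config is simply ignored because the task does not request it.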
validate
Validates that the configuration will execute with the user-defined evaluation task.
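One simple form such a check can take is comparing the task's required parameters against the available config fields before anything is launched. `missing_task_fields` is a hypothetical helper; this is a sketch of the idea under that assumption, not the library's validate.

```python
import inspect
from typing import Any, Callable, Dict, List


def missing_task_fields(fn: Callable[..., Any], cfg: Dict[str, Any]) -> List[str]:
    """Names of required parameters of `fn` that `cfg` does not provide."""
    missing = []
    for name, param in inspect.signature(fn).parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue  # *args / **kwargs never require a specific field
        if param.default is param.empty and name not in cfg:
            missing.append(name)
    return missing


def task(epsilon: float, scale: float = 1.0) -> float:
    return epsilon * scale


print(missing_task_fields(task, {"scale": 0.1}))    # ['epsilon']
print(missing_task_fields(task, {"epsilon": 1.0}))  # []
```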
run
run(
*,
working_dir=None,
sweeper=None,
launcher=None,
overrides=None,
task_fn_wrapper=zen,
pre_task_fn_wrapper=zen,
version_base=_VERSION_BASE_DEFAULT,
to_dictconfig=False,
config_name="rai_workflow",
job_name="rai_workflow",
with_log_configuration=True,
**workflow_overrides,
)
Run the experiment.
Individual workflows can explicitly define workflow_overrides to improve
readability and understanding of what parameters are expected for a particular
workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_fn_wrapper | Union[Callable[[Callable[..., T1]], Callable[[Any], T1]], None] | A wrapper applied to task. | zen |
| working_dir | Optional[str] | The directory to run the experiment in. This value is used for setting hydra.sweep.dir. | None |
| sweeper | Optional[str] | The configuration name of the Hydra Sweeper to use (i.e., the override for hydra/sweeper). | None |
| launcher | Optional[str] | The configuration name of the Hydra Launcher to use (i.e., the override for hydra/launcher). | None |
| overrides | Optional[list[str]] | Parameter overrides not considered part of the workflow parameter set. This is helpful for filtering out parameters stored in | None |
| version_base | (Optional[str], optional(default=1.1)) | Available starting with Hydra 1.2.0. If the | _VERSION_BASE_DEFAULT |
| to_dictconfig | bool | If | False |
| config_name | str (default: "rai_workflow") | Name of the stored configuration in Hydra's ConfigStore API. | 'rai_workflow' |
| job_name | str (default: "rai_workflow") | Name of job for logging. | 'rai_workflow' |
| with_log_configuration | bool (default: True) | If | True |
| **workflow_overrides | Union[str, int, float, bool, dict, multirun, hydra_list] | These parameters represent the values for configurations to use for the experiment. These values will be appended to the | {} |
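To see how multirun values among the workflow overrides fan out into separate jobs, here is a stdlib sketch of the expansion a basic Hydra sweep performs: the Cartesian product of all swept values, with the first parameter varying slowest. `Multirun` and `expand_jobs` are hypothetical names; the `+name=value` formatting mirrors the job listings in the Hydra log output shown on this page, but the real override rendering is done by mushin and Hydra, and this sketch only formats scalar values faithfully.

```python
import itertools
from typing import Any, List


class Multirun(list):
    """Hypothetical marker for values that should be swept over."""


def expand_jobs(**overrides: Any) -> List[str]:
    """Return one '+k=v ...' override line per job in the sweep."""
    names = list(overrides)
    # Non-swept values become one-element axes so product() treats them uniformly.
    axes = [v if isinstance(v, Multirun) else [v] for v in overrides.values()]
    return [
        " ".join(f"+{name}={value}" for name, value in zip(names, combo))
        for combo in itertools.product(*axes)
    ]


jobs = expand_jobs(epsilon=Multirun([1.0, 2.0, 3.0]), scale=Multirun([0.1, 1.0]))
for i, line in enumerate(jobs):
    print(f"#{i} : {line}")
# #0 : +epsilon=1.0 +scale=0.1
# #1 : +epsilon=1.0 +scale=1.0
# ... six jobs in total
```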
jobs_post_process
Method to extract attributes and metrics relevant to the workflow.
MultiRunMetricsWorkflow
Bases: BaseWorkflow
Abstract class for workflows that record metrics using Hydra multirun.
This workflow creates subdirectories of multirun experiments using Hydra. These directories contain the Hydra YAML configuration and any saved metrics file (defined by the evaluation task):
├── working_dir
│    ├── <experiment directory name: 0>
│    │    ├── <hydra output subdirectory: (default: .hydra)>
│    │    │    ├── config.yaml
│    │    │    ├── hydra.yaml
│    │    │    ├── overrides.yaml
│    │    ├── <metrics_filename>
│    ├── <experiment directory name: 1>
│    │    ...
The evaluation task is expected to return a dictionary that maps
metric-name (str) -> value (number | Sequence[number])
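Because every job returns a metric-name -> value mapping, aggregating across jobs amounts to collecting one list per metric name, in job order, which matches the Dict[str, List[Any]] shape of the metrics attribute. `aggregate_metrics` is a hypothetical helper, and the sample results come from the same arithmetic as the LocalRobustness task in the example below (epsilon=1.0 with scale 0.1 and 1.0). The sketch assumes every job reports the same metric names; if one job omitted a metric, the lists would no longer line up by job.

```python
from typing import Any, Dict, List, Mapping, Sequence


def aggregate_metrics(results: Sequence[Mapping[str, Any]]) -> Dict[str, List[Any]]:
    """Collect each job's metrics into one list per metric name (job order)."""
    metrics: Dict[str, List[Any]] = {}
    for result in results:
        for name, value in result.items():
            metrics.setdefault(name, []).append(value)
    return metrics


def task(epsilon: float, scale: float) -> dict:
    # Same arithmetic as the LocalRobustness example, minus the file save.
    epsilon *= scale
    val = 100 - epsilon**2
    return dict(accuracies=val + 2, loss=epsilon**2)


metrics = aggregate_metrics([task(1.0, 0.1), task(1.0, 1.0)])
print([round(a, 2) for a in metrics["accuracies"]])  # [101.99, 101.0]
```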
Examples:
Let's create a simple workflow where we perform a multirun over a parameter,
epsilon, and evaluate a task function that computes an accuracy and loss based on
that epsilon value and a specified scale.
>>> import torch as tr
>>> from mushin import MultiRunMetricsWorkflow, multirun
>>>
>>> class LocalRobustness(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(epsilon: float, scale: float) -> dict:
...         epsilon *= scale
...         val = 100 - epsilon**2
...         result = dict(accuracies=val+2, loss=epsilon**2)
...         tr.save(result, "test_metrics.pt")
...         return result
We'll run this workflow for six total configurations of three epsilon values and
two scale values. This will launch a Hydra multirun job and aggregate the results.
>>> wf = LocalRobustness()
>>> wf.run(epsilon=multirun([1.0, 2.0, 3.0]), scale=multirun([0.1, 1.0]))
[2022-05-02 11:57:59,219][HYDRA] Launching 6 jobs locally
[2022-05-02 11:57:59,220][HYDRA] #0 : +epsilon=1.0 +scale=0.1
[2022-05-02 11:57:59,312][HYDRA] #1 : +epsilon=1.0 +scale=1.0
[2022-05-02 11:57:59,405][HYDRA] #2 : +epsilon=2.0 +scale=0.1
[2022-05-02 11:57:59,498][HYDRA] #3 : +epsilon=2.0 +scale=1.0
[2022-05-02 11:57:59,590][HYDRA] #4 : +epsilon=3.0 +scale=0.1
[2022-05-02 11:57:59,683][HYDRA] #5 : +epsilon=3.0 +scale=1.0
Now that this workflow has run, we can view the results as an xarray-dataset whose coordinates reflect the multirun parameters that were varied, and whose data-variables are our recorded metrics: "accuracies" and "loss".
>>> ds = wf.to_xarray()
>>> ds
<xarray.Dataset>
Dimensions: (epsilon: 3, scale: 2)
Coordinates:
* epsilon (epsilon) float64 1.0 2.0 3.0
* scale (scale) float64 0.1 1.0
Data variables:
accuracies (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
loss (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0
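The dataset layout follows from the sweep: each swept parameter becomes a dimension whose coordinates are its swept values, and each metric becomes a grid indexed by those coordinates. The stdlib sketch below (no xarray; `to_grid` is a hypothetical helper) rebuilds the accuracies and loss grids from per-job records and reproduces the values printed above, up to xarray's display rounding.

```python
from typing import Dict, List, Sequence, Tuple


def to_grid(records: Sequence[Dict[str, float]], dims: Tuple[str, str],
            metric: str) -> Tuple[List[float], List[float], List[List[float]]]:
    """Arrange one metric into a 2-D grid over two swept parameters."""
    rows = sorted({r[dims[0]] for r in records})
    cols = sorted({r[dims[1]] for r in records})
    grid = [[float("nan")] * len(cols) for _ in rows]
    for r in records:
        grid[rows.index(r[dims[0]])][cols.index(r[dims[1]])] = r[metric]
    return rows, cols, grid


records = []
for epsilon in (1.0, 2.0, 3.0):
    for scale in (0.1, 1.0):
        e = epsilon * scale  # same arithmetic as LocalRobustness.task
        records.append(dict(epsilon=epsilon, scale=scale,
                            accuracies=100 - e**2 + 2, loss=e**2))

eps, scales, acc = to_grid(records, ("epsilon", "scale"), "accuracies")
print(eps, scales)  # [1.0, 2.0, 3.0] [0.1, 1.0]
print([[round(v, 1) for v in row] for row in acc])
# [[102.0, 101.0], [102.0, 98.0], [101.9, 93.0]]
```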
We can also load this workflow by providing the working directory where it was run.
>>> loaded = LocalRobustness().load_from_dir(wf.working_dir, metrics_filename="test_metrics.pt")
>>> loaded.to_xarray()
<xarray.Dataset>
Dimensions: (epsilon: 3, scale: 2)
Coordinates:
* epsilon (epsilon) float64 1.0 2.0 3.0
* scale (scale) float64 0.1 1.0
Data variables:
accuracies (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
loss (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0
target_dir_multirun_overrides (property)
For a multirun that sweeps over the target directories of a
previous multirun, target_dir_multirun_overrides provides
the flattened overrides for that previous run.
Examples:
>>> from mushin import MultiRunMetricsWorkflow, multirun
>>>
>>> class A(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(value: float, scale: float):
...         pass
...
>>> class B(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(job_dir):
...         pass
...
>>> a = A()
>>> a.run(value=multirun([-1.0, 1.0]), scale=multirun([11.0, 9.0]))
[2022-05-13 17:19:51,497][HYDRA] Launching 4 jobs locally
[2022-05-13 17:19:51,497][HYDRA] #0 : +value=-1.0 +scale=11.0
[2022-05-13 17:19:51,555][HYDRA] #1 : +value=-1.0 +scale=9.0
[2022-05-13 17:19:51,729][HYDRA] #2 : +value=1.0 +scale=11.0
[2022-05-13 17:19:51,787][HYDRA] #3 : +value=1.0 +scale=9.0
>>> b = B()
>>> b.run(target_job_dirs=a.multirun_working_dirs)
[2022-05-13 17:19:59,900][HYDRA] Launching 4 jobs locally
[2022-05-13 17:19:59,900][HYDRA] #0 : +job_dir=/home/scratch/multirun/0
[2022-05-13 17:19:59,958][HYDRA] #1 : +job_dir=/home/scratch/multirun/1
[2022-05-13 17:20:00,015][HYDRA] #2 : +job_dir=/home/scratch/multirun/2
[2022-05-13 17:20:00,073][HYDRA] #3 : +job_dir=/home/scratch/multirun/3
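Conceptually, the flattening pairs each target directory with the overrides of the job that produced it and then collects every parameter into one list, ordered like the target directories. `flatten_target_overrides` and the directory names below are hypothetical; the override values correspond to the four jobs of A listed above.

```python
from typing import Any, Dict, List, Mapping, Sequence


def flatten_target_overrides(
    target_dirs: Sequence[str], overrides_by_dir: Mapping[str, Mapping[str, Any]]
) -> Dict[str, List[Any]]:
    """Collect the previous run's overrides into one list per parameter."""
    flat: Dict[str, List[Any]] = {}
    for job_dir in target_dirs:
        for name, value in overrides_by_dir[job_dir].items():
            flat.setdefault(name, []).append(value)
    return flat


previous = {
    "multirun/0": {"value": -1.0, "scale": 11.0},
    "multirun/1": {"value": -1.0, "scale": 9.0},
    "multirun/2": {"value": 1.0, "scale": 11.0},
    "multirun/3": {"value": 1.0, "scale": 9.0},
}
flat = flatten_target_overrides(sorted(previous), previous)
print(flat)
# {'value': [-1.0, -1.0, 1.0, 1.0], 'scale': [11.0, 9.0, 11.0, 9.0]}
```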
task (staticmethod)
Abstract staticmethod for users to define the task that is configured and
launched by the workflow.
metric_load_fn (staticmethod)
Loads a metric file and returns a dictionary of metric-name -> metric-value mappings.
The default metric load function is torch.load.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | Path | | required |
Returns:
| Name | Type | Description |
|---|---|---|
| named_metrics | Mapping[str, Any] | metric-name -> metric-value(s) |
Examples:
Designing a workflow that uses the pickle module to save and load metrics:
>>> from pathlib import Path
>>> import pickle
>>> from mushin import MultiRunMetricsWorkflow, multirun
>>>
>>> class PickledWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def metric_load_fn(file_path: Path):
...         with file_path.open("rb") as f:
...             return pickle.load(f)
...
...     @staticmethod
...     def task(a, b):
...         with open("./metrics.pkl", "wb") as f:
...             pickle.dump(dict(a=a, b=b), f)
>>>
>>> wf = PickledWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
>>> metrics = wf.load_metrics("metrics.pkl")
>>> wf.metrics
{'a': [1, 2, 3], 'b': [False, False, False]}
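The same override pattern works with any serialization format. Below is a stdlib-only sketch that uses JSON instead of pickle; `json_metric_load_fn` is a hypothetical name for what a subclass's metric_load_fn body could look like, and the temporary directory stands in for a single job's working directory.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Mapping


def json_metric_load_fn(file_path: Path) -> Mapping[str, Any]:
    # Must return a metric-name -> metric-value(s) mapping.
    with file_path.open("r") as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as job_dir:
    path = Path(job_dir) / "metrics.json"
    # What a task would write inside its job directory.
    path.write_text(json.dumps(dict(a=1, b=False)))
    loaded = json_metric_load_fn(path)

print(loaded)  # {'a': 1, 'b': False}
```

A design consideration: JSON files are human-readable and loading them does not execute arbitrary code, unlike unpickling, but JSON only supports basic types (no tensors or arrays).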
load_from_dir
Loads workflow job data from a given working directory. The workflow is loaded in-place and "self" is returned by this method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| working_dir | Union[Path, str] | The base working directory of the experiment. It is expected that subdirectories within this working directory will contain individual Hydra jobs' data (YAML configurations) and saved metrics files. | required |
| metrics_filename | Union[str, Sequence[str], None] | The filename(s) or glob-pattern(s) used to load the metrics. If | required |
Returns:
| Name | Type | Description |
|---|---|---|
| loaded_workflow | Self | |
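One detail worth getting right when loading from a working directory is the order of the job subdirectories: by default Hydra names them 0, 1, 2, ..., and a plain lexicographic sort would place 10 before 2. The sketch below (hypothetical helper `job_dirs`; it only assumes the numbered-subdirectory layout pictured above) sorts them numerically so that loaded data lines up with job numbers.

```python
import tempfile
from pathlib import Path
from typing import List


def job_dirs(working_dir: Path) -> List[Path]:
    """Numbered job subdirectories of a multirun working directory, in job order."""
    dirs = [p for p in working_dir.iterdir() if p.is_dir() and p.name.isdigit()]
    return sorted(dirs, key=lambda p: int(p.name))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for i in (0, 1, 2, 10):
        (root / str(i) / ".hydra").mkdir(parents=True)
    (root / "notes.yaml").touch()  # a plain file: skipped by job_dirs
    names = [p.name for p in job_dirs(root)]

print(names)  # ['0', '1', '2', '10']
```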
load_metrics
Loads and aggregates metrics across all multirun working directories, and stores
them in self.metrics.
self.metric_load_fn is used to load each job's metric file(s).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| metrics_filename | str \| Sequence[str] | The filename(s) or glob-pattern(s) used to load the metrics. If | required |
Returns:
| Name | Type | Description |
|---|---|---|
| metrics | Dict[str, List[Any]] | |
Examples:
Creating a workflow that saves named metrics using torch.save:
>>> from mushin.workflows import MultiRunMetricsWorkflow, multirun
>>> import torch as tr
>>>
>>> class TorchWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(a, b):
...         tr.save(dict(a=a, b=b), "metrics.pt")
...
>>> wf = TorchWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
[2022-06-01 12:35:51,650][HYDRA] Launching 3 jobs locally
[2022-06-01 12:35:51,650][HYDRA] #0 : +a=1 +b=False
[2022-06-01 12:35:51,715][HYDRA] #1 : +a=2 +b=False
[2022-06-01 12:35:51,780][HYDRA] #2 : +a=3 +b=False
MultiRunMetricsWorkflow uses torch.load by default to load metrics files
(refer to MultiRunMetricsWorkflow.metric_load_fn to change this behavior).
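The aggregation that load_metrics performs can be sketched with the standard library alone. `load_metrics_sketch` is a hypothetical helper: for each job directory it globs for the given pattern, loads every match with a pluggable load function (pickle here, standing in for torch.load), and appends each value to a per-metric list. The three jobs mirror the a=1, 2, 3 and b=False runs in the example above.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Mapping, Sequence


def pickle_load(file_path: Path) -> Mapping[str, Any]:
    with file_path.open("rb") as f:
        return pickle.load(f)


def load_metrics_sketch(
    dirs: Sequence[Path],
    pattern: str,
    load_fn: Callable[[Path], Mapping[str, Any]] = pickle_load,
) -> Dict[str, List[Any]]:
    metrics: Dict[str, List[Any]] = {}
    for job_dir in dirs:
        # Every file in this job's directory that matches the glob pattern.
        for file_path in sorted(job_dir.glob(pattern)):
            for name, value in load_fn(file_path).items():
                metrics.setdefault(name, []).append(value)
    return metrics


with tempfile.TemporaryDirectory() as tmp:
    dirs = []
    for i, a in enumerate([1, 2, 3]):
        job_dir = Path(tmp) / str(i)
        job_dir.mkdir()
        # Each job saves dict(a=a, b=b), as the task above does with torch.save.
        with (job_dir / "metrics.pkl").open("wb") as f:
            pickle.dump(dict(a=a, b=False), f)
        dirs.append(job_dir)
    metrics = load_metrics_sketch(dirs, "metrics.pkl")

print(metrics)  # {'a': [1, 2, 3], 'b': [False, False, False]}
```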
to_xarray
to_xarray(
include_working_subdirs_as_data_var=False,
coord_from_metrics=None,
non_multirun_params_as_singleton_dims=False,
metrics_filename=None,
)
Convert workflow data to xarray Dataset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| include_working_subdirs_as_data_var | (bool, optional(default=False)) | If | False |
| coord_from_metrics | str \| None (default: None) | If not | None |
| non_multirun_params_as_singleton_dims | (bool, optional(default=False)) | If | False |
| metrics_filename | Union[str, Sequence[str], None] | The filename or glob-pattern used to load the metrics. If | None |
Returns:
| Name | Type | Description |
|---|---|---|
| results | Dataset | A dataset whose dimensions and coordinate-values are determined by the quantities over which the multi-run was performed. The data variables correspond to the named results returned by the jobs. |
RobustnessCurve
Bases: MultiRunMetricsWorkflow
Abstract class for workflows that measure performance for different perturbation values.
This workflow requires and uses parameter epsilon as the configuration option for
varying the perturbation.
See Also
MultiRunMetricsWorkflow
run
run(
*,
epsilon,
task_fn_wrapper=zen,
pre_task_fn_wrapper=zen,
target_job_dirs=None,
version_base=_VERSION_BASE_DEFAULT,
working_dir=None,
sweeper=None,
launcher=None,
overrides=None,
to_dictconfig=False,
config_name="rai_workflow",
job_name="rai_workflow",
with_log_configuration=True,
**workflow_overrides,
)
Run the experiment for varying values of epsilon.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| epsilon | Union[str, Sequence[float]] | The configuration parameter for the perturbation. Unlike Hydra overrides, this parameter can be a list of floats that will be converted into a multirun sequence override for Hydra. | required |
| task_fn_wrapper | Union[Callable[[Callable[..., T1]], Callable[[Any], T1]], None] | A wrapper applied to task. | zen |
| working_dir | Optional[str] | The directory to run the experiment in. This value is used for setting hydra.sweep.dir. | None |
| sweeper | Optional[str] | The configuration name of the Hydra Sweeper to use (i.e., the override for hydra/sweeper). | None |
| launcher | Optional[str] | The configuration name of the Hydra Launcher to use (i.e., the override for hydra/launcher). | None |
| overrides | Optional[list[str]] | Parameter overrides not considered part of the workflow parameter set. This is helpful for filtering out parameters stored in | None |
| **workflow_overrides | Union[str, int, float, bool, multirun, hydra_list] | These parameters represent the values for configurations to use for the experiment. These values will be appended to the | {} |
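The conversion described for epsilon can be illustrated with a small helper. `epsilon_override` is hypothetical and assumes the target is Hydra's comma-separated sweep syntax (for example epsilon=0.0,0.5,1.0); a string is passed through unchanged on the assumption that the caller already wrote valid Hydra override syntax.

```python
from typing import Sequence, Union


def epsilon_override(epsilon: Union[str, Sequence[float]]) -> str:
    """Render epsilon as a single Hydra override string."""
    if isinstance(epsilon, str):
        value = epsilon  # assume the caller already wrote Hydra syntax
    else:
        # A sequence of floats becomes a comma-separated sweep.
        value = ",".join(str(float(e)) for e in epsilon)
    return f"epsilon={value}"


print(epsilon_override([0, 0.5, 1.0]))  # epsilon=0.0,0.5,1.0
print(epsilon_override("0.25"))         # epsilon=0.25
```

Checking for str first matters here, because a string is itself a Sequence and would otherwise be split into characters.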
to_xarray
to_xarray(
include_working_subdirs_as_data_var=False,
coord_from_metrics=None,
non_multirun_params_as_singleton_dims=False,
metrics_filename=None,
)
Convert workflow data to xarray Dataset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| include_working_subdirs_as_data_var | (bool, optional(default=False)) | If | False |
| coord_from_metrics | str \| None (default: None) | If not | None |
| non_multirun_params_as_singleton_dims | (bool, optional(default=False)) | If | False |
| metrics_filename | Union[str, Sequence[str], None] | The filename or glob-pattern used to load the metrics. If | None |
Returns:
| Name | Type | Description |
|---|---|---|
| results | Dataset | A dataset whose dimensions and coordinate-values are determined by the quantities over which the multi-run was performed. The data variables correspond to the named results returned by the jobs. |
plot
plot(
metric,
ax=None,
group=None,
save_filename=None,
non_multirun_params_as_singleton_dims=False,
**kwargs,
)
Plot metrics versus epsilon.
Using the xarray.Dataset from to_xarray, plot the metrics
against the workflow perturbation parameters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| metric | str | The metric saved | required |
| ax | Any | If not | None |
| group | Optional[str] | Needed if other parameters besides epsilon were varied. | None |
| save_filename | Optional[str] | If not | None |
| non_multirun_params_as_singleton_dims | (bool, optional(default=False)) | If | False |
| **kwargs | | Additional arguments passed to | {} |
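When a parameter other than epsilon was also swept, a single metric-vs-epsilon curve is ambiguous, which is what group addresses. The sketch below shows only the splitting step (no matplotlib): `curves_by_group` is a hypothetical helper that turns per-job records into one sorted (epsilon, metric) series per value of the grouping parameter; each series would then be drawn as its own line. The records reuse the loss arithmetic of the LocalRobustness example.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def curves_by_group(records: Sequence[Dict[str, float]], metric: str,
                    group: str) -> Dict[float, List[Tuple[float, float]]]:
    curves: Dict[float, List[Tuple[float, float]]] = defaultdict(list)
    for r in records:
        curves[r[group]].append((r["epsilon"], r[metric]))
    # Sort each curve by epsilon so it can be drawn left to right.
    return {g: sorted(points) for g, points in curves.items()}


records = [
    dict(epsilon=e, scale=s, loss=(e * s) ** 2)
    for e in (1.0, 2.0, 3.0)
    for s in (0.1, 1.0)
]
curves = curves_by_group(records, "loss", group="scale")
print(curves[1.0])  # [(1.0, 1.0), (2.0, 4.0), (3.0, 9.0)]
```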