Data Processing Pipeline Configuration#
This documentation outlines the YAML configuration used to define and manage the data processing pipeline. The configuration governs the execution of a series of stages, each responsible for specific processing tasks.
Active Recipe#
The active_recipe parameter specifies the recipe to be executed on the input data. The pipeline section may list several recipes, but only the one whose recipe_name matches active_recipe is executed.
Dask Cluster Configuration#
use_local_dask: When set to true, starts a local Dask cluster, with three workers by default, so that processing is distributed across workers.
n_workers: Specifies the number of workers for the local Dask cluster.
scheduler_address: Optionally specifies the address of the Dask scheduler. Use either this or use_local_dask to control cluster creation. For more precise control, set a DaskTaskRunner under prefect_config.
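As a minimal sketch of the local-cluster variant, the top of the configuration might look like the fragment below. It assumes the three keys sit at the top level of the file, as they are described here; the worker count of 4 is an arbitrary illustrative value, not a recommendation.

```yaml
# Illustrative values only; 4 is an arbitrary worker count.
active_recipe: standard
use_local_dask: true   # start a local Dask cluster
n_workers: 4           # number of workers in that local cluster
# scheduler_address is omitted because the cluster is created locally.
```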
Pipeline Configuration#
The pipeline is composed of a sequence of stages, each with its own distinct attributes.
Stage Configuration#
recipe_name: The name of the recipe associated with this stage.
stages: A list of stages the data passes through within the pipeline.
Stage Attributes#
Each stage contains:
name: The function to execute within this stage.
module: The module in which the function is located.
options: Configurable settings specific to the Echodataflow functionality.
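To illustrate how recipe_name, stages, name, and module fit together, here is a hedged sketch of a pipeline that defines two recipes. The recipe name sv_only is hypothetical, and the fragment assumes that a stage may omit options; only the recipe matching active_recipe would be executed.

```yaml
active_recipe: sv_only            # hypothetical recipe name, chosen for illustration
pipeline:
- recipe_name: standard           # defined, but not executed in this sketch
  stages:
  - name: echodataflow_open_raw
    module: echodataflow.stages.subflows.open_raw
- recipe_name: sv_only            # matches active_recipe, so this recipe runs
  stages:
  - name: echodataflow_open_raw
    module: echodataflow.stages.subflows.open_raw
  - name: echodataflow_combine_echodata
    module: echodataflow.stages.subflows.combine_echodata
  - name: echodataflow_compute_Sv
    module: echodataflow.stages.subflows.compute_Sv
```

Keeping several recipes in one file and switching between them through active_recipe avoids maintaining a separate configuration file per workflow.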
Options#
save_raw_file: When true, saves the downloaded raw file to the output directory.
use_raw_offline: Skips the download and uses the raw file already present in the output directory. Any missing files are still downloaded.
use_offline: Skips the current process if Zarr files already exist in the output directory.
out_path: Configures the output directory for the current process.
Prefect Configuration#
The prefect_config section configures Prefect-related settings for the flow. Refer to https://docs.prefect.io/2.11.5/concepts/flows/#flow-settings for all available options.
retries: Sets the number of times the flow is retried before it transitions to a failed state.
task_runner: Sets the task runner for this specific stage, e.g., DaskTaskRunner with a designated address.
persist_result: When true, persists Prefect results. By default the output is stored in the output directory, so this option is only useful for advanced Prefect configurations.
result_storage: Specifies the location and serializer for storing results.
External Parameters#
external_params: Configures external parameters relevant to the function executed by the current stage. Only primitive types are currently supported.
Example Stages#
Here’s an overview of some of the pipeline stages:
echodataflow_open_raw: Executes the open_raw function from the echodataflow.stages.subflows.open_raw module. Allows various options and Prefect configurations.
echodataflow_combine_echodata: Executes the combine_echodata function from the designated module, with relevant options and configurations.
echodataflow_compute_Sv: Executes the compute_Sv function, supporting offline mode.
echodataflow_compute_MVBS: Executes the compute_MVBS function, supporting offline mode. External parameters like range_meter_bin and ping_time_bin can be set here.
Example:
active_recipe: standard # Specify the recipe to execute on input data
use_local_dask: false # Set to true to spin up a local Dask cluster of n_workers workers
scheduler_address: tcp://127.0.0.1:61918 # Specify scheduler address or use_local_dask to control cluster creation. For more granular control, under prefect_config, use DaskTaskRunner(address=<scheduler_address>)
pipeline: # List of pipeline configurations; only the active_recipe will be executed.
- recipe_name: standard # Name of the recipe
  stages: # List of processes to execute on the data
  - name: echodataflow_open_raw # Name of the function
    module: echodataflow.stages.subflows.open_raw # Module of the function
    options: # Echodataflow configuration options
      save_raw_file: true # Save the downloaded raw file to output directory. Refer <link> for more information on how to configure output directory.
      use_raw_offline: true # Skip the download process and take the raw file present in the output directory. Note: Missing files will be downloaded in the output directory.
      use_offline: true # Skip this process if zarr files are already present in the output directory.
      out_path: ./temp_files # Configure the output directory for this process
    prefect_config: # Configure any Prefect-related settings for a flow. For an exhaustive list of configurations refer to <https://docs.prefect.io/2.11.5/concepts/flows/#flow-settings>. Task-based configurations are optimized and handled by echodataflow
      retries: 3 # Number of retries before failing the flow
      task_runner: DaskTaskRunner(address=tcp://127.0.0.1:59487) # Configure runner setting for this specific stage
      persist_result: true # Persist the Prefect results. Note: By default the output will be stored in the output directory; this option should only be used if dealing with advanced Prefect configuration and integration
      result_storage: LocalFileSystem(basepath=my-results) # Location and type of serializer to be used for storing the result
    external_params: # External parameters relevant to the function can be configured here. Currently only primitive types are supported under this configuration
      sonar_model: EK60
      xml_path: s3//
  - name: echodataflow_combine_echodata
    module: echodataflow.stages.subflows.combine_echodata
    options:
      use_offline: true
    prefect_config:
      retries: 0
      task_runner: DaskTaskRunner(address=tcp://127.0.0.1:59487)
  - name: echodataflow_compute_Sv
    module: echodataflow.stages.subflows.compute_Sv
    options:
      use_offline: true
  - name: echodataflow_compute_MVBS
    module: echodataflow.stages.subflows.compute_MVBS
    options:
      use_offline: true
    external_params:
      range_meter_bin: 20m
      ping_time_bin: 20S
Here are a couple of example configurations used during the demo: