Code Generation

Echodataflow wraps function calls in stages so that each stage can be configured, run, and managed independently. Because of this encapsulation, individual flows can be executed and deployed on any infrastructure. To support this, Echodataflow provides a CLI command that generates the boilerplate code for a new stage. Add the necessary function calls to the generated file, and the new stage is ready to use. Remember to add the new stage to the rules configuration.

Subcommand

echodataflow gs

Usage: generate boilerplate code for a specific stage.

echodataflow gs <stage_name>

Arguments:

<stage_name>: Name of the stage for which to generate boilerplate code.

Example

echodataflow gs compute_Sv

This command creates a template file for the specified stage, which you can customize and integrate into your workflow. The generated file includes:

  • A flow: this orchestrates the execution of all files that need to be processed, either concurrently or in parallel, based on the configuration.

  • A task (helper function): this assists the flow by processing individual files.
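The division of labor between the flow and the task can be sketched with the standard library alone. This is a minimal illustration, not Echodataflow code: `process_file` and `run_stage` are hypothetical stand-ins for the generated task and flow, and a thread pool is swapped in for Prefect's task submission.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def process_file(name: str) -> str:
    """Hypothetical per-file helper: stands in for the generated task."""
    if not name.endswith(".raw"):
        raise ValueError(f"unsupported file: {name}")
    return name.replace(".raw", ".zarr")


def run_stage(groups: dict) -> dict:
    """Hypothetical stand-in for the generated flow: fan out per file, gather per group."""
    futures = defaultdict(list)
    with ThreadPoolExecutor() as pool:
        for group_name, files in groups.items():
            for file_name in files:
                futures[group_name].append(pool.submit(process_file, file_name))

    results = {}
    for group_name, flist in futures.items():
        try:
            outputs = [fut.result() for fut in flist]
            results[group_name] = {"outputs": outputs, "error": None}
        except Exception as exc:
            # Record the failure on the group instead of aborting the whole stage.
            results[group_name] = {"outputs": [], "error": str(exc)}
    return results


results = run_stage({"t1": ["a.raw", "b.raw"], "t2": ["c.txt"]})
```

A failure in one group is recorded on that group only, so the other groups still return their outputs. The generated file follows the same shape, with Prefect flows and tasks in place of the thread pool used here.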

Here is a snippet of the generated file. Logging statements have been removed for brevity:


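from collections import defaultdict
from typing import Dict, Optional

from prefect import flow, task

# Echodataflow-specific names (echodataflow, Dataset, Stage, Group, ErrorObject,
# get_working_dir, get_out_zarr, get_ed_list, isFile) come from the echodataflow
# package; their import lines are not shown in this snippet.

@flow
@echodataflow(processing_stage="example-stage", type="FLOW")
def echodataflow_example_stage(
    groups: Dict[str, Group], config: Dataset, stage: Stage, prev_stage: Optional[Stage]
):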
    working_dir = get_working_dir(stage=stage, config=config)

    futures = defaultdict(list)

    for name, gr in groups.items():
        for ed in gr.data:
            gname = ed.out_path.split(".")[0] + ".Examplestage"
            new_process = process_example_stage.with_options(
                task_run_name=gname, name=gname, retries=3
            )
            future = new_process.submit(
                ed=ed, working_dir=working_dir, config=config, stage=stage
            )
            futures[name].append(future)

    for name, flist in futures.items():
        try:
            groups[name].data = [f.result() for f in flist]
        except Exception as e:
            groups[name].data[0].error = ErrorObject(errorFlag=True, error_desc=str(e))

    return groups

@task
@echodataflow()
def process_example_stage(
    ed, config: Dataset, stage: Stage, working_dir: str
):
    # `ed` is the per-file object submitted by the flow (one item of `gr.data`).
    file_name = ed.filename + "_example stage.zarr"
    try:
        out_zarr = get_out_zarr(
            group=stage.options.get("group", True),
            working_dir=working_dir,
            transect=ed.group_name,
            file_name=file_name,
            storage_options=config.output.storage_options_dict,
        )

        if (
            stage.options.get("use_offline") == False
            or isFile(out_zarr, config.output.storage_options_dict) == False
        ):

            ed_list = get_ed_list.fn(config=config, stage=stage, transect_data=ed)
            xr_d = ...  # Processing code: add this stage's function calls here
            xr_d.to_zarr(
                store=out_zarr,
                mode="w",
                consolidated=True,
                storage_options=config.output.storage_options_dict,
            )
        
        ed.out_path = out_zarr
        ed.error = ErrorObject(errorFlag=False)
    except Exception as e:
        ed.error = ErrorObject(errorFlag=True, error_desc=str(e))
    finally:
        return ed
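
The condition that guards the processing step in the task decides whether a file is reprocessed or an existing output is reused. The sketch below isolates that decision as a pure function so its behavior is easy to check. `should_process` is a hypothetical name introduced for illustration, and `output_exists` stands in for the result of the `isFile` call in the snippet.

```python
from typing import Optional


def should_process(use_offline: Optional[bool], output_exists: bool) -> bool:
    """Hypothetical helper mirroring the snippet's guard condition.

    Processing runs when `use_offline` is explicitly False, or when no
    output store exists yet. Otherwise the existing output is reused.
    """
    return use_offline is False or not output_exists


# Enumerate the cases: (use_offline, output_exists) -> process?
cases = {
    (False, True): should_process(False, True),   # forced reprocessing
    (True, True): should_process(True, True),     # reuse existing output
    (True, False): should_process(True, False),   # nothing to reuse yet
    (None, True): should_process(None, True),     # option unset, output exists
}
```

Under this reading, leaving `use_offline` unset behaves like `True` whenever an output already exists, because the lookup returns `None`, which does not compare equal to `False`.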