Skip to content

Executor Module

This section documents the executor components of the Nextmv Python SDK - Local experience.

executor

Executor module for executing local runs.

This module provides functionality to execute local runs. The main function is called from the run function in the runner module.

FUNCTION DESCRIPTION
main

Main function to execute a local run.

execute_run

Function to execute the decision model run.

options_args

Function to convert options dictionary to command-line arguments.

process_run_input

Function to process the run input based on the format.

process_run_output

Function to process the run output and handle results.

resolve_output_format

Function to determine the output format from manifest or directory structure.

process_run_information

Function to update run metadata including duration and status.

process_run_metrics

Function to process and save run metrics.

process_run_statistics

Function to process and save run statistics.

process_run_assets

Function to process and save run assets.

process_run_solutions

Function to process and save run solutions.

process_run_visuals

Function to process and save run visuals.

resolve_stdout

Function to parse subprocess stdout output.

execute_run

execute_run(
    run_id: str,
    src: str,
    manifest_dict: dict[str, Any],
    run_dir: str,
    run_config: dict[str, Any],
    inputs_dir_path: str | None = None,
    options: dict[str, Any] | None = None,
    input_data: dict[str, Any] | str | None = None,
) -> None

Executes the decision model run using a subprocess to call the entrypoint script with the appropriate input and options.

PARAMETER DESCRIPTION

run_id

The unique identifier for the run.

TYPE: str

src

The path to the application source code.

TYPE: str

manifest_dict

The manifest dictionary containing application configuration.

TYPE: dict[str, Any]

run_dir

The path to the run directory where outputs will be stored.

TYPE: str

run_config

The run configuration containing format and other settings.

TYPE: dict[str, Any]

inputs_dir_path

The path to the directory containing input files, by default None. If provided, this parameter takes precedence over input_data.

TYPE: Optional[str] DEFAULT: None

options

Additional command-line options for the run, by default None.

TYPE: Optional[dict[str, Any]] DEFAULT: None

input_data

The input data for the run, by default None. If inputs_dir_path is provided, this parameter is ignored.

TYPE: Optional[Union[dict[str, Any], str]] DEFAULT: None

Source code in nextmv/nextmv/local/executor.py
def execute_run(
    run_id: str,
    src: str,
    manifest_dict: dict[str, Any],
    run_dir: str,
    run_config: dict[str, Any],
    inputs_dir_path: str | None = None,
    options: dict[str, Any] | None = None,
    input_data: dict[str, Any] | str | None = None,
) -> None:
    """
    Executes the decision model run using a subprocess to call the entrypoint
    script with the appropriate input and options.

    The application files are copied to a temporary directory, the run
    information file is marked as ``running``, the entrypoint is launched, and
    its output is handed to `process_run_output`. Any `Exception` raised along
    the way is appended to the log file and the run information file is marked
    as ``failed``.

    Parameters
    ----------
    run_id : str
        The unique identifier for the run.
    src : str
        The path to the application source code.
    manifest_dict : dict[str, Any]
        The manifest dictionary containing application configuration.
    run_dir : str
        The path to the run directory where outputs will be stored. It must
        already contain the ``<run_id>.json`` information file.
    run_config : dict[str, Any]
        The run configuration. ``run_config["format"]["input"]["type"]`` is
        read to determine the input format.
    inputs_dir_path : Optional[str], optional
        The path to the directory containing input files, by default None.
        Forwarded to `process_run_input`.
    options : Optional[dict[str, Any]], optional
        Additional command-line options for the run, by default None.
    input_data : Optional[Union[dict[str, Any], str]], optional
        The input data for the run, by default None. Forwarded to
        `process_run_input`.
    """

    # Create the logs dir to register whatever failure might happen during the
    # execution process.
    logs_dir = os.path.join(run_dir, LOGS_KEY)
    os.makedirs(logs_dir, exist_ok=True)

    # The complete execution is wrapped to capture any errors.
    try:
        # Create a temp dir, and copy the entire src there, to have a transient
        # place to work from, and be cleaned up afterwards.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_src = os.path.join(temp_dir, "src")
            os.makedirs(temp_src, exist_ok=True)
            manifest = Manifest.from_dict(manifest_dict)

            # Copy only the files specified in manifest.files
            _copy_files_from_manifest(src, temp_src, manifest)

            stdin_input = process_run_input(
                temp_src=temp_src,
                run_format=run_config["format"]["input"]["type"],
                manifest=manifest,
                input_data=input_data,
                inputs_dir_path=inputs_dir_path,
            )

            # Set the run status to running.
            info_file = os.path.join(run_dir, f"{run_id}.json")
            with open(info_file, "r+", encoding="utf-8") as f:
                info = json.load(f)
                info["metadata"]["status_v2"] = "running"
                f.seek(0)
                json.dump(info, f, indent=2)
                f.truncate()

            # Start a subprocess to execute the entrypoint based on the application type.
            entrypoint = os.path.join(temp_src, __determine_entrypoint(manifest))
            cwd = __determine_cwd(manifest, default=temp_src)
            args = __determine_command(manifest) + [entrypoint] + options_args(options)

            # Determine whether stdout should be streamed live to the log file.
            # For MULTI_FILE format, stdout carries log output and must be streamed
            # immediately. For other formats (JSON, CSV_ARCHIVE), stdout carries
            # structured output consumed after the process completes.
            is_multi_file = (
                manifest.configuration is not None
                and manifest.configuration.content is not None
                and manifest.configuration.content.format == ContentFormat.MULTI_FILE
            ) or run_config["format"]["input"]["type"] == ContentFormat.MULTI_FILE.value

            log_file_path = os.path.join(logs_dir, LOGS_FILE)
            stdout_lines: list[str] = []
            stderr_lines: list[str] = []

            # Force unbuffered stdout/stderr in child processes so both
            # streams are written to the log file in real time. Also force
            # UTF-8 encoding for all stdio so Windows codepage mismatches
            # (e.g. cp1252) do not cause UnicodeDecodeError.
            child_env = os.environ.copy()
            child_env["PYTHONUNBUFFERED"] = "1"
            child_env["PYTHONIOENCODING"] = "utf-8"

            # A lock to serialize writes from the stdout and stderr threads so log
            # lines are never interleaved in the log file.
            log_lock = threading.Lock()

            # The log file is managed by a context manager so the handle is
            # released even if something below raises. It is opened before
            # the child is spawned so a failure to open it cannot leave an
            # orphaned process behind.
            with open(log_file_path, "a", encoding="utf-8") as log_f:
                # This is the process that actually executes the entrypoint script.
                # We use subprocess.Popen instead of subprocess.run because we need
                # to stream the output in real time, which is not possible with
                # subprocess.run.
                process = subprocess.Popen(
                    args,
                    env=child_env,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    cwd=cwd,
                )

                def write_stdin() -> None:
                    # Write all input at once then close so the child process
                    # receives EOF. A child that exits (or closes stdin)
                    # without reading its input is not an error here; its
                    # return code and stderr already describe what happened.
                    try:
                        process.stdin.write(stdin_input)
                    except BrokenPipeError:
                        pass
                    finally:
                        # Always close, even if the write failed, so a child
                        # blocked on stdin is never left waiting for EOF.
                        try:
                            process.stdin.close()
                        except BrokenPipeError:
                            pass

                def stream_stderr() -> None:
                    # Stderr is always streamed live to the log file so errors appear
                    # immediately even if the process is still running.
                    for line in process.stderr:
                        with log_lock:
                            log_f.write(line)
                            log_f.flush()
                        stderr_lines.append(line)

                def stream_stdout() -> None:
                    # For multi-file runs, stdout carries log output, so mirror it to
                    # the log file in real time just like stderr. For other formats,
                    # stdout carries the structured result and is only buffered.
                    if is_multi_file:
                        for line in process.stdout:
                            with log_lock:
                                log_f.write(line)
                                log_f.flush()
                            stdout_lines.append(line)
                    else:
                        for line in process.stdout:
                            stdout_lines.append(line)

                # Run stdin, stdout, and stderr on separate threads so no single pipe
                # can fill its OS buffer and block the process while another pipe waits,
                # which would cause a deadlock.
                stdin_thread = threading.Thread(target=write_stdin)
                stderr_thread = threading.Thread(target=stream_stderr)
                stdout_thread = threading.Thread(target=stream_stdout)

                stdin_thread.start()
                stderr_thread.start()
                stdout_thread.start()

                # Wait for all streams to be fully consumed before calling process.wait()
                # to ensure no output is lost.
                stdin_thread.join()
                stderr_thread.join()
                stdout_thread.join()
                process.wait()

            # Assemble a CompletedProcess so the rest of the pipeline can treat this
            # the same as a synchronous subprocess.run() result.
            result = subprocess.CompletedProcess(
                args=args,
                returncode=process.returncode,
                stdout="".join(stdout_lines),
                stderr="".join(stderr_lines),
            )

            process_run_output(
                manifest=manifest,
                run_id=run_id,
                temp_src=temp_src,
                result=result,
                run_dir=run_dir,
                src=src,
            )

    except Exception as e:
        # If we encounter an exception, we log it to the stderr log file.
        with open(os.path.join(logs_dir, LOGS_FILE), "a", encoding="utf-8") as f:
            f.write(f"\nException during run execution: {str(e)}\n")

        # Also, we update the run information file to set the status to failed.
        info_file = os.path.join(run_dir, f"{run_id}.json")
        with open(info_file, "r+", encoding="utf-8") as f:
            info = json.load(f)
            info["metadata"]["status_v2"] = "failed"
            info["metadata"]["error"] = "Run failed, please check logs for details."
            f.seek(0)
            json.dump(info, f, indent=2)
            f.truncate()

main

main() -> None

Main function to execute a local run. This function is called when executing the script directly. It loads input data (arguments) from stdin and triggers the execution of the run.

Source code in nextmv/nextmv/local/executor.py
def main() -> None:
    """
    Entry point used when the script is executed directly.

    The run arguments are obtained through `load`, and every expected key of
    the loaded data is forwarded, by keyword, to `execute_run`.
    """

    request = load()

    # Keys are looked up in the same order as the `execute_run` parameters, so
    # a missing key surfaces as a `KeyError` on the first absent name.
    argument_names = (
        "run_id",
        "src",
        "manifest_dict",
        "run_dir",
        "run_config",
        "inputs_dir_path",
        "options",
        "input_data",
    )
    execute_run(**{name: request.data[name] for name in argument_names})

options_args

options_args(
    options: dict[str, Any] | None = None,
) -> list[str]

Converts options dictionary to a list of command-line arguments.

PARAMETER DESCRIPTION

options

Additional options for the run, by default None.

TYPE: Optional[dict[str, Any]] DEFAULT: None

RETURNS DESCRIPTION
list[str]

A list of command-line arguments derived from the options.

Source code in nextmv/nextmv/local/executor.py
def options_args(options: dict[str, Any] | None = None) -> list[str]:
    """
    Converts options dictionary to a list of command-line arguments.

    Parameters
    ----------
    options : Optional[dict[str, Any]], optional
        Additional options for the run, by default None.

    Returns
    -------
    list[str]
        A list of command-line arguments derived from the options.
    """
    option_args = []

    if options is not None:
        for key, value in options.items():
            option_args.append(f"-{key}")
            option_args.append(str(value))

    return option_args

process_run_input

process_run_input(
    temp_src: str,
    run_format: str,
    manifest: Manifest,
    input_data: dict[str, Any] | str | None = None,
    inputs_dir_path: str | None = None,
) -> str

In the temp source, writes the run input according to the run format. If the format is json or text, then the input is not written anywhere, rather, it is returned as a string in this function. If the format is csv-archive, then the input files are written to an input directory. If the format is multi-file, then the input files are written to an inputs directory or to a custom location specified in the manifest.

PARAMETER DESCRIPTION

temp_src

The path to the temporary source directory.

TYPE: str

run_format

The run format, one of json, text, csv-archive, or multi-file.

TYPE: str

manifest

The application manifest.

TYPE: Manifest

input_data

The input data for the run, by default None. Used only for the json (a dictionary) and text (a string) formats; it must be None for the csv-archive and multi-file formats.

TYPE: Optional[Union[dict[str, Any], str]] DEFAULT: None

inputs_dir_path

The path to the directory containing input files, by default None. Used only for the csv-archive and multi-file formats, where its contents are copied into the input directory of the temporary source.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
str

The input data as a string, if the format is json or text. Otherwise, returns an empty string.

Source code in nextmv/nextmv/local/executor.py
def process_run_input(
    temp_src: str,
    run_format: str,
    manifest: Manifest,
    input_data: dict[str, Any] | str | None = None,
    inputs_dir_path: str | None = None,
) -> str:
    """
    In the temp source, writes the run input according to the run format. If
    the format is `json` or `text`, then the input is not written anywhere,
    rather, it is returned as a string in this function. If the format is
    `csv-archive`, then the input files are written to an `input` directory. If
    the format is `multi-file`, then the input files are written to an `inputs`
    directory or to a custom location specified in the manifest.

    Parameters
    ----------
    temp_src : str
        The path to the temporary source directory.
    run_format : str
        The run format, one of `json`, `text`, `csv-archive`, or `multi-file`.
    manifest : Manifest
        The application manifest.
    input_data : Optional[Union[dict[str, Any], str]], optional
        The input data for the run, by default None. Only used for the `json`
        format (must be a dictionary) and the `text` format (must be a
        string). It must be None for every other format.
    inputs_dir_path : Optional[str], optional
        The path to the directory containing input files, by default None.
        Only used for the `csv-archive` and `multi-file` formats, where its
        contents are copied into the input directory of the temp source. None
        and the empty string both mean "nothing to copy".

    Returns
    -------
    str
        The input data as a string, if the format is `json` or `text`. Otherwise,
        returns an empty string.

    Raises
    ------
    ValueError
        If the format is `json` and `input_data` is not a dictionary, if the
        format is `text` and `input_data` is not a string, or if `input_data`
        is given for any other format.
    """

    # For JSON and TEXT formats, we return the input data as a string.
    if run_format in (ContentFormat.JSON.value, InputFormat.TEXT.value):
        if isinstance(input_data, dict) and run_format == ContentFormat.JSON.value:
            return json.dumps(input_data)

        if isinstance(input_data, str) and run_format == InputFormat.TEXT.value:
            return input_data

        raise ValueError(f"invalid input data for format {run_format}")

    if input_data is not None:
        raise ValueError("input data must be None for csv-archive or multi-file format")

    if run_format == InputFormat.CSV_ARCHIVE.value:
        # For CSV-ARCHIVE format, we write the input files to an `input`
        # directory.
        target_dir = os.path.join(temp_src, "input")
    elif run_format == ContentFormat.MULTI_FILE.value:
        # For MULTI-FILE format, we write the input files to an `inputs`
        # directory, or to a custom location specified in the manifest.
        target_dir = os.path.join(temp_src, INPUTS_KEY)
        if (
            manifest.configuration is not None
            and manifest.configuration.content is not None
            and manifest.configuration.content.format == ContentFormat.MULTI_FILE
            and manifest.configuration.content.multi_file is not None
        ):
            target_dir = os.path.join(temp_src, manifest.configuration.content.multi_file.input.path)
    else:
        # Any other format has nothing to stage. Return the documented empty
        # string explicitly instead of falling off the end of the function
        # and handing `None` to a caller that expects a `str`.
        return ""

    os.makedirs(target_dir, exist_ok=True)

    if inputs_dir_path is not None and inputs_dir_path != "":
        shutil.copytree(inputs_dir_path, target_dir, dirs_exist_ok=True)

    return ""

process_run_output

process_run_output(
    manifest: Manifest,
    run_id: str,
    temp_src: str,
    result: CompletedProcess[str],
    run_dir: str,
    src: str,
) -> None

Processes the result of the subprocess run. This function is in charge of handling the run results, including solutions, statistics, metrics, logs, assets, and visuals.

PARAMETER DESCRIPTION

manifest

The application manifest containing configuration details.

TYPE: Manifest

run_id

The unique identifier for the run.

TYPE: str

temp_src

The path to the temporary source directory.

TYPE: str

result

The result of the subprocess run containing stdout, stderr, and return code.

TYPE: CompletedProcess[str]

run_dir

The path to the run directory where outputs will be stored.

TYPE: str

src

The path to the application source code.

TYPE: str

Source code in nextmv/nextmv/local/executor.py
def process_run_output(
    manifest: Manifest,
    run_id: str,
    temp_src: str,
    result: subprocess.CompletedProcess[str],
    run_dir: str,
    src: str,
) -> None:
    """
    Turns the finished subprocess result into the run's persisted artifacts.

    The stdout is resolved once and then the metrics, statistics, assets,
    solutions, visuals and, finally, run information helpers are invoked in
    that order.

    Parameters
    ----------
    manifest : Manifest
        The application manifest containing configuration details.
    run_id : str
        The unique identifier for the run.
    temp_src : str
        The path to the temporary source directory.
    result : subprocess.CompletedProcess[str]
        The result of the subprocess run containing stdout, stderr, and return code.
    run_dir : str
        The path to the run directory where outputs will be stored.
    src : str
        The path to the application source code.
    """

    parsed_stdout = resolve_stdout(result)

    # Make sure the destination for outputs exists inside the run directory.
    final_outputs = os.path.join(run_dir, OUTPUTS_KEY)
    os.makedirs(final_outputs, exist_ok=True)
    staged_outputs = os.path.join(temp_src, OUTPUTS_KEY)

    # NOTE(review): this attribute chain is read without the None checks that
    # other functions in this module apply to `manifest.configuration` and
    # `manifest.configuration.content` - confirm the manifest is always fully
    # populated by the time it gets here.
    declared_format = manifest.configuration.content.format

    # Keyword arguments shared by every helper that reads from the staged
    # outputs and writes into the run's outputs directory.
    shared = {
        "temp_run_outputs_dir": staged_outputs,
        "outputs_dir": final_outputs,
        "stdout_output": parsed_stdout,
        "temp_src": temp_src,
        "manifest": manifest,
    }

    _process_run_metrics(**shared)
    _process_run_statistics(**shared)
    _process_run_assets(**shared)
    _process_run_solutions(
        run_id=run_id,
        run_dir=run_dir,
        output_format=declared_format,
        src=src,
        **shared,
    )
    _process_run_visuals(run_dir=run_dir, outputs_dir=final_outputs)

    # NOTE: _process_run_information must be called last. It sets status_v2 to "succeeded"
    # or "failed", which signals to pollers that the run is complete. format.output
    # (written by _process_run_solutions) must already be on disk before that status
    # transition is visible, otherwise a poller can observe status=succeeded with
    # format_output=None and crash.
    _process_run_information(run_id=run_id, run_dir=run_dir, result=result)

resolve_stdout

resolve_stdout(
    result: CompletedProcess[str],
) -> str | dict[str, Any]

Resolves the stdout output of the subprocess run. If the stdout is valid JSON, it returns the parsed dictionary. Otherwise, it returns the raw string output.

PARAMETER DESCRIPTION

result

The result of the subprocess run.

TYPE: CompletedProcess[str]

RETURNS DESCRIPTION
Union[str, dict[str, Any]]

The parsed stdout output as a dictionary if valid JSON, otherwise the raw string output.

Source code in nextmv/nextmv/local/executor.py
def resolve_stdout(result: subprocess.CompletedProcess[str]) -> str | dict[str, Any]:
    """
    Resolves the stdout output of the subprocess run. If the stdout is valid
    JSON, it returns the parsed dictionary. Otherwise, it returns the raw
    string output.

    Parameters
    ----------
    result : subprocess.CompletedProcess[str]
        The result of the subprocess run.

    Returns
    -------
    Union[str, dict[str, Any]]
        The parsed stdout output as a dictionary if valid JSON, otherwise the
        raw string output.
    """
    raw_output = result.stdout
    if raw_output.strip() == "":
        return ""

    try:
        return json.loads(raw_output)
    except json.JSONDecodeError:
        return raw_output