Skip to content

Docker

Docker Environment class

Full source code
import logging
import os
import platform
import shlex
import subprocess
import uuid
from typing import Any

from pydantic import BaseModel

from minisweagent.exceptions import Submitted
from minisweagent.utils.serialize import recursive_merge


class DockerEnvironmentConfig(BaseModel):
    """Settings for `DockerEnvironment`.

    Only `image` is required; every other field has a default. Note that the default of
    `executable` is read from the `MSWEA_DOCKER_EXECUTABLE` environment variable once, when
    this class body is evaluated, not each time a config object is created.
    """

    image: str
    cwd: str = "/"
    """Working directory in which to execute commands."""
    env: dict[str, str] = {}
    """Environment variables to set in the container."""
    forward_env: list[str] = []
    """Environment variables to forward to the container.
    Variables are only forwarded if they are set in the host environment.
    In case of conflict with `env`, the `env` variables take precedence.
    """
    timeout: int = 30
    """Timeout for executing commands in the container."""
    executable: str = os.getenv("MSWEA_DOCKER_EXECUTABLE", "docker")
    """Path to the docker/container executable."""
    run_args: list[str] = ["--rm"]
    """Additional arguments to pass to the docker/container executable.
    Default is ["--rm"], which removes the container after it exits.
    """
    container_timeout: str = "2h"
    """Max duration to keep container running. Uses the same format as the sleep command."""
    pull_timeout: int = 120
    """Timeout in seconds for pulling images."""
    interpreter: list[str] = ["bash", "-lc"]
    """Interpreter to use to execute commands. Default is ["bash", "-lc"].
    The actual command will be appended as argument to this. Override this to e.g., modify shell flags
    (e.g., to remove the `-l` flag to disable login shell) or to use python instead of bash to interpret commands.
    """


class DockerEnvironment:
    """Run commands inside a single detached container through the docker CLI.

    Constructing an instance starts the container (kept alive by a `sleep` process);
    `execute` runs one command per call via `<executable> exec`; `cleanup` stops the
    container and is also invoked from `__del__`.
    """

    def __init__(
        self,
        *,
        config_class: type = DockerEnvironmentConfig,
        logger: logging.Logger | None = None,
        **kwargs,
    ):
        """This class executes bash commands in a Docker container using direct docker commands.
        See `DockerEnvironmentConfig` for keyword arguments.

        Args:
            config_class: Class used to build `self.config` from `kwargs`.
            logger: Logger to use; defaults to the "minisweagent.environment" logger.
            **kwargs: Passed unchanged to `config_class`.
        """
        self.logger = logger or logging.getLogger("minisweagent.environment")
        # Stays None until `_start_container` succeeds; `cleanup` relies on this.
        self.container_id: str | None = None
        self.config = config_class(**kwargs)
        self._start_container()

    def get_template_vars(self, **kwargs) -> dict[str, Any]:
        """Return the dumped config, the `platform.uname()` fields and `kwargs`, combined by `recursive_merge`."""
        return recursive_merge(self.config.model_dump(), platform.uname()._asdict(), kwargs)

    def serialize(self) -> dict:
        """Return the JSON-mode config dump and this class's dotted path under `info.config`."""
        return {
            "info": {
                "config": {
                    "environment": self.config.model_dump(mode="json"),
                    "environment_type": f"{self.__class__.__module__}.{self.__class__.__name__}",
                }
            }
        }

    def _start_container(self):
        """Start a detached container and store its ID in `self.container_id`.

        The container runs `sleep <container_timeout>` so that it stays alive for later
        `exec` calls.

        Raises:
            subprocess.CalledProcessError: If the run command exits with a non-zero status.
            subprocess.TimeoutExpired: If the run command exceeds `config.pull_timeout`.
        """
        container_name = f"minisweagent-{uuid.uuid4().hex[:8]}"
        cmd = [
            self.config.executable,
            "run",
            "-d",
            "--name",
            container_name,
            "-w",
            self.config.cwd,
            *self.config.run_args,
            self.config.image,
            "sleep",
            self.config.container_timeout,
        ]
        self.logger.debug(f"Starting container with command: {shlex.join(cmd)}")
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=self.config.pull_timeout,  # docker pull might take a while
            check=True,
        )
        # With `-d`, the run command's stdout is used as the container ID.
        container_id = result.stdout.strip()
        self.logger.info(f"Started container {container_name} with ID {container_id}")
        self.container_id = container_id

    def execute(self, action: dict, cwd: str = "", *, timeout: int | None = None) -> dict[str, Any]:
        """Execute a command in the Docker container and return the result as a dict.

        Args:
            action: Mapping whose "command" entry (default "") is handed to the interpreter.
            cwd: Working directory for this command; falls back to `config.cwd` when empty.
            timeout: Timeout in seconds; a falsy value (None or 0) falls back to `config.timeout`.

        Returns:
            A dict with "output" (stdout and stderr combined), "returncode" and
            "exception_info". If running the command raised, "returncode" is -1 and an
            "extra" entry carries the exception type and message.

        Raises:
            Submitted: Propagated from `_check_finished` when the output signals completion.
        """
        command = action.get("command", "")
        cwd = cwd or self.config.cwd
        assert self.container_id, "Container not started"

        cmd = [self.config.executable, "exec", "-w", cwd]
        # Forwarded host variables come first so that explicit `env` entries, added
        # afterwards, appear later on the command line for the same key.
        for key in self.config.forward_env:
            if (value := os.getenv(key)) is not None:
                cmd.extend(["-e", f"{key}={value}"])
        for key, value in self.config.env.items():
            cmd.extend(["-e", f"{key}={value}"])
        cmd.extend([self.container_id, *self.config.interpreter, command])

        try:
            result = subprocess.run(
                cmd,
                text=True,
                timeout=timeout or self.config.timeout,
                encoding="utf-8",
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
            output = {"output": result.stdout, "returncode": result.returncode, "exception_info": ""}
        except Exception as e:
            # Failures are reported through the result dict instead of being raised.
            # The exception's `output` attribute (if any) may be bytes or str.
            raw_output = getattr(e, "output", None)
            raw_output = (
                raw_output.decode("utf-8", errors="replace") if isinstance(raw_output, bytes) else (raw_output or "")
            )
            output = {
                "output": raw_output,
                "returncode": -1,
                "exception_info": f"An error occurred while executing the command: {e}",
                "extra": {"exception_type": type(e).__name__, "exception": str(e)},
            }
        self._check_finished(output)
        return output

    def _check_finished(self, output: dict):
        """Raises Submitted if the output indicates task completion.

        Completion means: return code 0 and the first line of the output (ignoring leading
        whitespace) is exactly the marker below. The remaining lines become the submission.
        """
        lines = output.get("output", "").lstrip().splitlines(keepends=True)
        if lines and lines[0].strip() == "COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT" and output["returncode"] == 0:
            submission = "".join(lines[1:])
            raise Submitted(
                {
                    "role": "exit",
                    "content": submission,
                    "extra": {"exit_status": "Submitted", "submission": submission},
                }
            )

    def cleanup(self):
        """Stop and remove the Docker container.

        Spawns a background shell that tries `stop` under a 60 second `timeout` and falls
        back to `rm -f` if that fails; all output is discarded and the call does not wait.
        Does nothing when no container ID has been recorded.
        """
        if getattr(self, "container_id", None) is not None:  # if init fails early, container_id might not be set
            # Quote both values: the command goes through a shell, and an executable path
            # with spaces (valid for the list-based calls above) would otherwise be split.
            exe = shlex.quote(self.config.executable)
            cid = shlex.quote(self.container_id)
            cmd = f"(timeout 60 {exe} stop {cid} || {exe} rm -f {cid}) >/dev/null 2>&1 &"
            subprocess.Popen(cmd, shell=True)

    def __del__(self):
        """Cleanup container when object is destroyed."""
        self.cleanup()

minisweagent.environments.docker

DockerEnvironmentConfig

Bases: BaseModel

image instance-attribute

image: str

cwd class-attribute instance-attribute

cwd: str = '/'

Working directory in which to execute commands.

env class-attribute instance-attribute

env: dict[str, str] = {}

Environment variables to set in the container.

forward_env class-attribute instance-attribute

forward_env: list[str] = []

Environment variables to forward to the container. Variables are only forwarded if they are set in the host environment. In case of conflict with env, the env variables take precedence.

timeout class-attribute instance-attribute

timeout: int = 30

Timeout for executing commands in the container.

executable class-attribute instance-attribute

executable: str = getenv(
    "MSWEA_DOCKER_EXECUTABLE", "docker"
)

Path to the docker/container executable.

run_args class-attribute instance-attribute

run_args: list[str] = ['--rm']

Additional arguments to pass to the docker/container executable. Default is ["--rm"], which removes the container after it exits.

container_timeout class-attribute instance-attribute

container_timeout: str = '2h'

Max duration to keep container running. Uses the same format as the sleep command.

pull_timeout class-attribute instance-attribute

pull_timeout: int = 120

Timeout in seconds for pulling images.

interpreter class-attribute instance-attribute

interpreter: list[str] = ['bash', '-lc']

Interpreter to use to execute commands. Default is ["bash", "-lc"]. The actual command will be appended as argument to this. Override this to e.g., modify shell flags (e.g., to remove the -l flag to disable login shell) or to use python instead of bash to interpret commands.

DockerEnvironment

DockerEnvironment(
    *,
    config_class: type = DockerEnvironmentConfig,
    logger: Logger | None = None,
    **kwargs,
)

This class executes bash commands in a Docker container using direct docker commands. See DockerEnvironmentConfig for keyword arguments.

Source code in src/minisweagent/environments/docker.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    *,
    config_class: type = DockerEnvironmentConfig,
    logger: logging.Logger | None = None,
    **kwargs,
):
    """Set up the environment and start its container.

    All extra keyword arguments are handed to `config_class` (see
    `DockerEnvironmentConfig`) to build `self.config`. When no logger is given, the
    "minisweagent.environment" logger is used.
    """
    # No container exists yet; the ID is filled in once the container has been started.
    self.container_id: str | None = None
    self.logger = logger if logger else logging.getLogger("minisweagent.environment")
    self.config = config_class(**kwargs)
    self._start_container()

logger instance-attribute

logger = logger or getLogger('minisweagent.environment')

container_id instance-attribute

container_id: str | None = None

config instance-attribute

config = config_class(**kwargs)

get_template_vars

get_template_vars(**kwargs) -> dict[str, Any]
Source code in src/minisweagent/environments/docker.py
61
62
def get_template_vars(self, **kwargs) -> dict[str, Any]:
    """Collect template variables from the config, the host platform and the caller.

    The three sources are passed to `recursive_merge` in that order: the dumped config,
    the fields of `platform.uname()`, then the keyword arguments.
    """
    config_vars = self.config.model_dump()
    host_vars = platform.uname()._asdict()
    return recursive_merge(config_vars, host_vars, kwargs)

serialize

serialize() -> dict
Source code in src/minisweagent/environments/docker.py
64
65
66
67
68
69
70
71
72
def serialize(self) -> dict:
    """Describe this environment as a nested dict.

    The result has the shape `{"info": {"config": {...}}}` where the inner dict holds
    the JSON-mode dump of the config and the dotted `module.ClassName` of the instance.
    """
    cls = self.__class__
    config_section = {
        "environment": self.config.model_dump(mode="json"),
        "environment_type": f"{cls.__module__}.{cls.__name__}",
    }
    return {"info": {"config": config_section}}

execute

execute(
    action: dict,
    cwd: str = "",
    *,
    timeout: int | None = None,
) -> dict[str, Any]

Execute a command in the Docker container and return the result as a dict.

Source code in src/minisweagent/environments/docker.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def execute(self, action: dict, cwd: str = "", *, timeout: int | None = None) -> dict[str, Any]:
    """Execute a command in the Docker container and return the result as a dict."""
    command = action.get("command", "")
    cwd = cwd or self.config.cwd
    assert self.container_id, "Container not started"

    cmd = [self.config.executable, "exec", "-w", cwd]
    for key in self.config.forward_env:
        if (value := os.getenv(key)) is not None:
            cmd.extend(["-e", f"{key}={value}"])
    for key, value in self.config.env.items():
        cmd.extend(["-e", f"{key}={value}"])
    cmd.extend([self.container_id, *self.config.interpreter, command])

    try:
        result = subprocess.run(
            cmd,
            text=True,
            timeout=timeout or self.config.timeout,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        output = {"output": result.stdout, "returncode": result.returncode, "exception_info": ""}
    except Exception as e:
        raw_output = getattr(e, "output", None)
        raw_output = (
            raw_output.decode("utf-8", errors="replace") if isinstance(raw_output, bytes) else (raw_output or "")
        )
        output = {
            "output": raw_output,
            "returncode": -1,
            "exception_info": f"An error occurred while executing the command: {e}",
            "extra": {"exception_type": type(e).__name__, "exception": str(e)},
        }
    self._check_finished(output)
    return output

cleanup

cleanup()

Stop and remove the Docker container.

Source code in src/minisweagent/environments/docker.py
153
154
155
156
157
def cleanup(self):
    """Stop and remove the Docker container.

    Spawns a background shell that tries `stop` under a 60 second `timeout` and falls
    back to `rm -f` if that fails; all output is discarded and the call does not wait.
    Does nothing when no container ID has been recorded.
    """
    if getattr(self, "container_id", None) is not None:  # if init fails early, container_id might not be set
        # Quote both values: the command goes through a shell, so an executable path
        # containing spaces or shell metacharacters would otherwise be split or interpreted.
        exe = shlex.quote(self.config.executable)
        cid = shlex.quote(self.container_id)
        cmd = f"(timeout 60 {exe} stop {cid} || {exe} rm -f {cid}) >/dev/null 2>&1 &"
        subprocess.Popen(cmd, shell=True)