|
|
import asyncio |
|
|
import uuid |
|
|
from contextlib import asynccontextmanager |
|
|
from typing import Dict, Optional, Set |
|
|
|
|
|
import docker |
|
|
from docker.errors import APIError, ImageNotFound |
|
|
|
|
|
from app.config import SandboxSettings |
|
|
from app.logger import logger |
|
|
from app.sandbox.core.sandbox import DockerSandbox |
|
|
|
|
|
|
|
|
class SandboxManager:
    """Docker sandbox manager.

    Manages the lifecycle of multiple DockerSandbox instances, including
    creation, monitoring, and cleanup. Provides per-sandbox concurrency
    control and a background task that deletes idle sandboxes.

    Attributes:
        max_sandboxes: Maximum allowed number of sandboxes.
        idle_timeout: Sandbox idle timeout in seconds.
        cleanup_interval: Cleanup check interval in seconds.
        _sandboxes: Active sandbox instances keyed by sandbox ID.
        _last_used: Last-used event-loop timestamp per sandbox ID.
        _locks: Per-sandbox locks serializing sandbox_operation() calls.
        _global_lock: Guards creation, idle scanning and bookkeeping removal.
        _active_operations: IDs of sandboxes currently inside
            sandbox_operation().
        _cleanup_task: Background idle-cleanup task, once started.
        _is_shutting_down: Set by cleanup() to stop the background loop.
    """

    def __init__(
        self,
        max_sandboxes: int = 100,
        idle_timeout: int = 3600,
        cleanup_interval: int = 300,
    ):
        """Initializes sandbox manager.

        Must be called while an asyncio event loop is running: it starts the
        background cleanup task with asyncio.create_task(), which raises
        RuntimeError when no loop is running.

        Args:
            max_sandboxes: Maximum sandbox count limit.
            idle_timeout: Idle timeout in seconds.
            cleanup_interval: Cleanup check interval in seconds.
        """
        self.max_sandboxes = max_sandboxes
        self.idle_timeout = idle_timeout
        self.cleanup_interval = cleanup_interval

        # Docker client built from the environment (DOCKER_HOST etc.).
        self._client = docker.from_env()

        # Resource tracking.
        self._sandboxes: Dict[str, DockerSandbox] = {}
        self._last_used: Dict[str, float] = {}

        # Concurrency control.
        self._locks: Dict[str, asyncio.Lock] = {}
        self._global_lock = asyncio.Lock()
        self._active_operations: Set[str] = set()

        # Background cleanup state.
        self._cleanup_task: Optional[asyncio.Task] = None
        self._is_shutting_down = False

        self.start_cleanup_task()

    @staticmethod
    def _now() -> float:
        """Returns the running event loop's monotonic clock time."""
        return asyncio.get_running_loop().time()

    def _forget(self, sandbox_id: str) -> None:
        """Drops all bookkeeping for a sandbox (callers hold _global_lock)."""
        self._sandboxes.pop(sandbox_id, None)
        self._last_used.pop(sandbox_id, None)
        self._locks.pop(sandbox_id, None)

    async def ensure_image(self, image: str) -> bool:
        """Ensures Docker image is available, pulling it if missing.

        Both the lookup and the pull are blocking docker-py calls, so they
        run in the default executor to keep the event loop responsive.

        Args:
            image: Image name.

        Returns:
            bool: Whether image is available.
        """
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, self._client.images.get, image)
            return True
        except ImageNotFound:
            pass

        try:
            logger.info(f"Pulling image {image}...")
            await loop.run_in_executor(None, self._client.images.pull, image)
            return True
        except Exception as e:  # includes docker APIError
            logger.error(f"Failed to pull image {image}: {e}")
            return False

    @asynccontextmanager
    async def sandbox_operation(self, sandbox_id: str):
        """Context manager for sandbox operations.

        Serializes operations on the same sandbox via its per-sandbox lock,
        marks the sandbox active for the duration, and refreshes its
        last-used time.

        Args:
            sandbox_id: Sandbox ID.

        Raises:
            KeyError: If sandbox not found (or deleted while waiting).
        """
        # Check before creating a lock so unknown IDs do not leave
        # orphaned entries in self._locks.
        if sandbox_id not in self._sandboxes:
            raise KeyError(f"Sandbox {sandbox_id} not found")

        lock = self._locks.get(sandbox_id)
        if lock is None:
            lock = self._locks[sandbox_id] = asyncio.Lock()

        async with lock:
            # The sandbox may have been deleted while we waited on the lock.
            if sandbox_id not in self._sandboxes:
                raise KeyError(f"Sandbox {sandbox_id} not found")

            self._active_operations.add(sandbox_id)
            try:
                self._last_used[sandbox_id] = self._now()
                yield self._sandboxes[sandbox_id]
            finally:
                # discard(): cleanup() may have cleared the set meanwhile.
                self._active_operations.discard(sandbox_id)

    async def create_sandbox(
        self,
        config: Optional[SandboxSettings] = None,
        volume_bindings: Optional[Dict[str, str]] = None,
    ) -> str:
        """Creates a new sandbox instance.

        Args:
            config: Sandbox configuration; defaults to SandboxSettings().
            volume_bindings: Volume mapping configuration.

        Returns:
            str: Sandbox ID.

        Raises:
            RuntimeError: If max sandbox count reached or creation fails.
        """
        async with self._global_lock:
            if len(self._sandboxes) >= self.max_sandboxes:
                raise RuntimeError(
                    f"Maximum number of sandboxes ({self.max_sandboxes}) reached"
                )

            config = config or SandboxSettings()
            if not await self.ensure_image(config.image):
                raise RuntimeError(f"Failed to ensure Docker image: {config.image}")

            sandbox_id = str(uuid.uuid4())
            sandbox: Optional[DockerSandbox] = None
            try:
                sandbox = DockerSandbox(config, volume_bindings)
                await sandbox.create()

                self._sandboxes[sandbox_id] = sandbox
                self._last_used[sandbox_id] = self._now()
                self._locks[sandbox_id] = asyncio.Lock()

                logger.info(f"Created sandbox {sandbox_id}")
                return sandbox_id

            except Exception as e:
                logger.error(f"Failed to create sandbox: {e}")
                # We already hold the non-reentrant _global_lock, so roll back
                # directly; delete_sandbox() would re-acquire it and deadlock.
                self._forget(sandbox_id)
                if sandbox is not None:
                    # Release whatever create() managed to allocate.
                    # NOTE(review): assumes DockerSandbox.cleanup() tolerates a
                    # partially created sandbox - errors are logged, not raised.
                    try:
                        await sandbox.cleanup()
                    except Exception as cleanup_error:
                        logger.error(
                            f"Failed to clean up partially created sandbox "
                            f"{sandbox_id}: {cleanup_error}"
                        )
                raise RuntimeError(f"Failed to create sandbox: {e}") from e

    async def get_sandbox(self, sandbox_id: str) -> DockerSandbox:
        """Gets a sandbox instance and refreshes its last-used time.

        Args:
            sandbox_id: Sandbox ID.

        Returns:
            DockerSandbox: Sandbox instance.

        Raises:
            KeyError: If sandbox does not exist.
        """
        async with self.sandbox_operation(sandbox_id) as sandbox:
            return sandbox

    def start_cleanup_task(self) -> None:
        """Starts the automatic idle-cleanup task if it is not already running.

        Requires a running event loop (asyncio.create_task).
        """
        # Avoid orphaning an already-running loop task, which would never be
        # cancelled by cleanup().
        if self._cleanup_task is not None and not self._cleanup_task.done():
            return

        async def cleanup_loop():
            while not self._is_shutting_down:
                try:
                    await self._cleanup_idle_sandboxes()
                except Exception as e:
                    logger.error(f"Error in cleanup loop: {e}")
                await asyncio.sleep(self.cleanup_interval)

        self._cleanup_task = asyncio.create_task(cleanup_loop())

    async def _cleanup_idle_sandboxes(self) -> None:
        """Deletes sandboxes that have been idle longer than idle_timeout.

        Candidates are snapshotted under the global lock. Each deletion
        awaits, so a candidate is skipped if its last-used time changed (it
        was used again) or it disappeared before its turn.
        """
        current_time = self._now()

        async with self._global_lock:
            stale = {
                sandbox_id: last_used
                for sandbox_id, last_used in self._last_used.items()
                if sandbox_id not in self._active_operations
                and current_time - last_used > self.idle_timeout
            }

        for sandbox_id, seen_last_used in stale.items():
            if self._last_used.get(sandbox_id) != seen_last_used:
                continue
            try:
                await self.delete_sandbox(sandbox_id)
            except Exception as e:
                logger.error(f"Error cleaning up sandbox {sandbox_id}: {e}")

    async def cleanup(self) -> None:
        """Stops the background task and deletes all sandboxes."""
        logger.info("Starting manager cleanup...")
        self._is_shutting_down = True

        # Stop the background cleanup loop.
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await asyncio.wait_for(self._cleanup_task, timeout=1.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass

        async with self._global_lock:
            sandbox_ids = list(self._sandboxes)

        # Delete sandboxes concurrently.
        cleanup_tasks = [
            asyncio.create_task(self._safe_delete_sandbox(sandbox_id))
            for sandbox_id in sandbox_ids
        ]

        if cleanup_tasks:
            # asyncio.wait() does not raise on timeout; it returns the
            # still-pending tasks, which we report and cancel.
            _, pending = await asyncio.wait(cleanup_tasks, timeout=30.0)
            if pending:
                logger.error("Sandbox cleanup timed out")
                for task in pending:
                    task.cancel()

        # Drop all bookkeeping regardless of individual outcomes.
        self._sandboxes.clear()
        self._last_used.clear()
        self._locks.clear()
        self._active_operations.clear()

        logger.info("Manager cleanup completed")

    async def _safe_delete_sandbox(self, sandbox_id: str) -> None:
        """Deletes a single sandbox, logging instead of raising on failure.

        Waits up to ~5 seconds (10 x 0.5s) for in-flight operations to finish
        before cleaning up anyway.

        Args:
            sandbox_id: Sandbox ID to delete.
        """
        try:
            if sandbox_id in self._active_operations:
                logger.warning(
                    f"Sandbox {sandbox_id} has active operations, waiting for completion"
                )
                for _ in range(10):
                    await asyncio.sleep(0.5)
                    if sandbox_id not in self._active_operations:
                        break
                else:
                    logger.warning(
                        f"Timeout waiting for sandbox {sandbox_id} operations to complete"
                    )

            sandbox = self._sandboxes.get(sandbox_id)
            if sandbox:
                await sandbox.cleanup()

                # Only forget the sandbox once its resources were released.
                async with self._global_lock:
                    self._forget(sandbox_id)
                    logger.info(f"Deleted sandbox {sandbox_id}")
        except Exception as e:
            logger.error(f"Error during cleanup of sandbox {sandbox_id}: {e}")

    async def delete_sandbox(self, sandbox_id: str) -> None:
        """Deletes specified sandbox; unknown IDs are ignored.

        Args:
            sandbox_id: Sandbox ID.
        """
        if sandbox_id not in self._sandboxes:
            return

        try:
            await self._safe_delete_sandbox(sandbox_id)
        except Exception as e:
            logger.error(f"Failed to delete sandbox {sandbox_id}: {e}")

    async def __aenter__(self) -> "SandboxManager":
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit; releases all resources."""
        await self.cleanup()

    def get_stats(self) -> Dict:
        """Gets manager statistics.

        Returns:
            Dict: Statistics information.
        """
        return {
            "total_sandboxes": len(self._sandboxes),
            "active_operations": len(self._active_operations),
            "max_sandboxes": self.max_sandboxes,
            "idle_timeout": self.idle_timeout,
            "cleanup_interval": self.cleanup_interval,
            "is_shutting_down": self._is_shutting_down,
        }
|
|
|