Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions agb/modules/browser/eval/local_page_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import logging
import os
import stat
import tempfile
import concurrent.futures
from typing import Dict, Any
from playwright.async_api import async_playwright
Expand Down Expand Up @@ -148,6 +150,8 @@ def __init__(self, session=None):
self._cdp_port = 9222
self.agent: LocalPageAgent = LocalPageAgent(session, self)
self._worker_thread = None
self._cdp_ports_path = None
self._user_data_dir = None

async def initialize_async(self, options: BrowserOption) -> bool:
if (self._worker_thread is None):
Expand All @@ -158,11 +162,26 @@ async def _launch_local_browser() -> None:
logger.info("Start launching local browser")
try:
async with async_playwright() as p:
# Define CDP port
# Recreate /tmp/chrome_cdp_ports.json with the required content
chrome_cdp_ports_path = "/tmp/chrome_cdp_ports.json"
with open(chrome_cdp_ports_path, "w") as f:
json.dump({"chrome": str(self._cdp_port), "router": str(self._cdp_port)}, f)
# Create a secure per-session temporary directory
# for browser data (owner-only permissions)
self._user_data_dir = tempfile.mkdtemp(
prefix="browser_user_data_"
)
os.chmod(self._user_data_dir, stat.S_IRWXU)

# Write CDP port config to a secure temporary file
# using mkstemp to avoid symlink attacks (CWE-377)
fd, self._cdp_ports_path = tempfile.mkstemp(
prefix="chrome_cdp_ports_", suffix=".json"
)
try:
with os.fdopen(fd, "w") as f:
json.dump({"chrome": str(self._cdp_port), "router": str(self._cdp_port)}, f)
except Exception:
os.close(fd)
raise
# Restrict file permissions to owner-only read/write
os.chmod(self._cdp_ports_path, stat.S_IRUSR | stat.S_IWUSR)

# Launch headless browser and create a page for all tests
self._browser = await p.chromium.launch_persistent_context(
Expand All @@ -171,7 +190,7 @@ async def _launch_local_browser() -> None:
args=[
f'--remote-debugging-port={self._cdp_port}',
],
user_data_dir="/tmp/browser_user_data")
user_data_dir=self._user_data_dir)

logger.info("Local browser launched successfully:")
success = True
Expand Down Expand Up @@ -205,4 +224,4 @@ def __init__(self):
self.browser = LocalBrowser(self)

def delete(self, sync_context: bool = False) -> None:
pass
pass
43 changes: 31 additions & 12 deletions python/agb/modules/browser/eval/local_page_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import logging
import os
import stat
import tempfile
import concurrent.futures
from typing import Dict, Any
from playwright.async_api import async_playwright
Expand Down Expand Up @@ -187,6 +189,8 @@ def __init__(self, session=None):
self._cdp_port = 9222
self.agent: LocalPageAgent = LocalPageAgent(session, self)
self._worker_thread = None
self._cdp_ports_path = None
self._user_data_dir = None

async def initialize_async(self, options: BrowserOption) -> bool:
if self._worker_thread is None:
Expand All @@ -198,17 +202,32 @@ async def _launch_local_browser() -> None:
logger.info("Start launching local browser")
try:
async with async_playwright() as p:
# Define CDP port
# Recreate /tmp/chrome_cdp_ports.json with the required content
chrome_cdp_ports_path = "/tmp/chrome_cdp_ports.json"
with open(chrome_cdp_ports_path, "w") as f:
json.dump(
{
"chrome": str(self._cdp_port),
"router": str(self._cdp_port),
},
f,
)
# Create a secure per-session temporary directory
# for browser data (owner-only permissions)
self._user_data_dir = tempfile.mkdtemp(
prefix="browser_user_data_"
)
os.chmod(self._user_data_dir, stat.S_IRWXU)

# Write CDP port config to a secure temporary file
# using mkstemp to avoid symlink attacks (CWE-377)
fd, self._cdp_ports_path = tempfile.mkstemp(
prefix="chrome_cdp_ports_", suffix=".json"
)
try:
with os.fdopen(fd, "w") as f:
json.dump(
{
"chrome": str(self._cdp_port),
"router": str(self._cdp_port),
},
f,
)
except Exception:
os.close(fd)
raise
# Restrict file permissions to owner-only read/write
os.chmod(self._cdp_ports_path, stat.S_IRUSR | stat.S_IWUSR)

# Launch headless browser and create a page for all tests
self._browser = await p.chromium.launch_persistent_context(
Expand All @@ -217,7 +236,7 @@ async def _launch_local_browser() -> None:
args=[
f"--remote-debugging-port={self._cdp_port}",
],
user_data_dir="/tmp/browser_user_data",
user_data_dir=self._user_data_dir,
)

logger.info("Local browser launched successfully:")
Expand Down
81 changes: 81 additions & 0 deletions tests/unit/test_cwe377_local_browser_temp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
PoC test for CWE-377: Insecure fixed-path temporary file write in LocalBrowser.

Demonstrates that LocalBrowser.initialize_async() writes CDP port config to
a hardcoded /tmp/chrome_cdp_ports.json path without symlink protection or
restrictive permissions, and uses a hardcoded world-readable user data dir.
"""

import inspect
import json
import os
import stat
import tempfile
import unittest
from unittest.mock import AsyncMock, MagicMock, patch


class TestCWE377LocalBrowserInsecureTempFile(unittest.TestCase):
"""Test that LocalBrowser does NOT write to predictable /tmp paths."""

@classmethod
def setUpClass(cls):
try:
import importlib
cls.mod = importlib.import_module("agb.modules.browser.eval.local_page_agent")
except ImportError:
cls.mod = None

def test_cdp_ports_path_not_hardcoded(self):
"""The CDP ports file path should not be the hardcoded /tmp/chrome_cdp_ports.json."""
if self.mod is None:
self.skipTest("Cannot import local_page_agent module")

source = inspect.getsource(self.mod.LocalBrowser)
# The fix should remove the hardcoded /tmp/chrome_cdp_ports.json path
self.assertNotIn(
'"/tmp/chrome_cdp_ports.json"',
source,
"CDP ports file should not use a hardcoded /tmp path (CWE-377 symlink attack vector)",
)
self.assertNotIn(
"'/tmp/chrome_cdp_ports.json'",
source,
"CDP ports file should not use a hardcoded /tmp path (CWE-377 symlink attack vector)",
)

def test_user_data_dir_not_hardcoded_tmp(self):
"""The browser user data dir should not be hardcoded to /tmp/browser_user_data."""
if self.mod is None:
self.skipTest("Cannot import local_page_agent module")

source = inspect.getsource(self.mod.LocalBrowser)
self.assertNotIn(
'"/tmp/browser_user_data"',
source,
"Browser user data dir should not use a hardcoded /tmp path (world-readable)",
)
self.assertNotIn(
"'/tmp/browser_user_data'",
source,
"Browser user data dir should not use a hardcoded /tmp path (world-readable)",
)

def test_module_uses_tempfile(self):
"""
After the fix, the module should use Python's tempfile module
for secure temporary file/directory creation.
"""
if self.mod is None:
self.skipTest("Cannot import local_page_agent module")

source = inspect.getsource(self.mod)
self.assertIn(
"tempfile",
source,
"Module should use tempfile for secure temporary file creation",
)


if __name__ == "__main__":
unittest.main()