-
Notifications
You must be signed in to change notification settings - Fork 1
🛡️ Sentinel: [CRITICAL] Fix Path Traversal via Folder ID #310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
becb937
1666172
e731ebe
0794fdf
e4899c7
53bca53
94119f4
c7874a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -183,6 +183,9 @@ | |
|
|
||
| # Pre-compiled regex patterns for hot-path validation (>2x speedup on 10k+ items) | ||
| PROFILE_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$") | ||
| # Folder IDs (PK) are typically alphanumeric but can contain other safe chars. | ||
| # We whitelist to prevent path traversal and injection. | ||
| FOLDER_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_.-]+$") | ||
| RULE_PATTERN = re.compile(r"^[a-zA-Z0-9.\-_:*/@]+$") | ||
|
|
||
| # Parallel processing configuration | ||
|
|
@@ -737,6 +740,17 @@ | |
| return True | ||
|
|
||
|
|
||
| def validate_folder_id(folder_id: str, log_errors: bool = True) -> bool: | ||
| """Validates folder ID (PK) format to prevent path traversal.""" | ||
| if not folder_id: | ||
| return False | ||
| if folder_id in (".", "..") or not FOLDER_ID_PATTERN.match(folder_id): | ||
| if log_errors: | ||
| log.error(f"Invalid folder ID format: {sanitize_for_log(folder_id)}") | ||
| return False | ||
| return True | ||
|
|
||
|
|
||
| def is_valid_rule(rule: str) -> bool: | ||
| """ | ||
| Validates that a rule is safe to use. | ||
|
|
@@ -1105,11 +1119,14 @@ | |
| try: | ||
| data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json() | ||
| folders = data.get("body", {}).get("groups", []) | ||
| return { | ||
| f["group"].strip(): f["PK"] | ||
| for f in folders | ||
| if f.get("group") and f.get("PK") | ||
| } | ||
| result = {} | ||
| for f in folders: | ||
| if not f.get("group") or not f.get("PK"): | ||
| continue | ||
| pk = str(f["PK"]) | ||
| if validate_folder_id(pk): | ||
| result[f["group"].strip()] = pk | ||
| return result | ||
| except (httpx.HTTPError, KeyError) as e: | ||
| log.error(f"Failed to list existing folders: {sanitize_for_log(e)}") | ||
| return {} | ||
|
|
@@ -1173,7 +1190,12 @@ | |
| # Skip entries with empty or None values for required fields | ||
| if not name or not pk: | ||
| continue | ||
| result[str(name).strip()] = str(pk) | ||
|
|
||
| pk_str = str(pk) | ||
| if not validate_folder_id(pk_str): | ||
| continue | ||
|
|
||
| result[str(name).strip()] = pk_str | ||
|
|
||
| return result | ||
| except (ValueError, TypeError, AttributeError) as err: | ||
|
|
@@ -1373,87 +1395,98 @@ | |
| return False | ||
|
|
||
|
|
||
| def create_folder( | ||
| client: httpx.Client, profile_id: str, name: str, do: int, status: int | ||
| ) -> Optional[str]: | ||
| """ | ||
| Create a new folder and return its ID. | ||
| Attempts to read ID from response first, then falls back to polling. | ||
| """ | ||
| try: | ||
| # 1. Send the Create Request | ||
| response = _api_post( | ||
| client, | ||
| f"{API_BASE}/{profile_id}/groups", | ||
| data={"name": name, "do": do, "status": status}, | ||
| ) | ||
|
|
||
| # OPTIMIZATION: Try to grab ID directly from response to avoid the wait loop | ||
| try: | ||
| resp_data = response.json() | ||
| body = resp_data.get("body", {}) | ||
|
|
||
| # Check if it returned a single group object | ||
| if isinstance(body, dict) and "group" in body and "PK" in body["group"]: | ||
| pk = body["group"]["PK"] | ||
| pk = str(body["group"]["PK"]) | ||
| if not validate_folder_id(pk, log_errors=False): | ||
| log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") | ||
| return None | ||
|
Comment on lines
+1420
to
+1423
|
||
| log.info( | ||
| "Created folder %s (ID %s) [Direct]", | ||
| sanitize_for_log(name), | ||
| sanitize_for_log(pk), | ||
| ) | ||
| return str(pk) | ||
| return pk | ||
|
|
||
| # Check if it returned a list containing our group | ||
| if isinstance(body, dict) and "groups" in body: | ||
| for grp in body["groups"]: | ||
| if grp.get("group") == name: | ||
| pk = str(grp["PK"]) | ||
| if not validate_folder_id(pk, log_errors=False): | ||
| log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") | ||
| continue | ||
| log.info( | ||
| "Created folder %s (ID %s) [Direct]", | ||
| sanitize_for_log(name), | ||
| sanitize_for_log(grp["PK"]), | ||
| sanitize_for_log(pk), | ||
| ) | ||
| return str(grp["PK"]) | ||
| return pk | ||
| except Exception as e: | ||
| log.debug( | ||
| f"Could not extract ID from POST response: " f"{sanitize_for_log(e)}" | ||
| ) | ||
|
|
||
| # 2. Fallback: Poll for the new folder (The Robust Retry Logic) | ||
| for attempt in range(MAX_RETRIES + 1): | ||
| try: | ||
| data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json() | ||
| groups = data.get("body", {}).get("groups", []) | ||
|
|
||
| for grp in groups: | ||
| if grp["group"].strip() == name.strip(): | ||
| pk = str(grp["PK"]) | ||
| if not validate_folder_id(pk, log_errors=False): | ||
| log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") | ||
| return None | ||
| log.info( | ||
| "Created folder %s (ID %s) [Polled]", | ||
| sanitize_for_log(name), | ||
| sanitize_for_log(grp["PK"]), | ||
| sanitize_for_log(pk), | ||
| ) | ||
| return str(grp["PK"]) | ||
| return pk | ||
| except Exception as e: | ||
| log.warning( | ||
| f"Error fetching groups on attempt {attempt}: {sanitize_for_log(e)}" | ||
| ) | ||
|
|
||
| if attempt < MAX_RETRIES: | ||
| wait_time = FOLDER_CREATION_DELAY * (attempt + 1) | ||
| log.info( | ||
| f"Folder '{sanitize_for_log(name)}' not found yet. Retrying in {wait_time}s..." | ||
| ) | ||
| time.sleep(wait_time) | ||
|
|
||
| log.error( | ||
| f"Folder {sanitize_for_log(name)} was not found after creation and retries." | ||
| ) | ||
| return None | ||
|
|
||
| except (httpx.HTTPError, KeyError) as e: | ||
| log.error( | ||
| f"Failed to create folder {sanitize_for_log(name)}: {sanitize_for_log(e)}" | ||
| ) | ||
| return None | ||
|
|
||
|
|
||
| def push_rules( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| import importlib | ||
| import sys | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| def reload_main_with_env(monkeypatch): | ||
| monkeypatch.delenv("NO_COLOR", raising=False) | ||
| with patch("sys.stderr") as mock_stderr, patch("sys.stdout") as mock_stdout: | ||
| mock_stderr.isatty.return_value = True | ||
| mock_stdout.isatty.return_value = True | ||
|
|
||
| module = sys.modules.get("main") | ||
| if module is None: | ||
| module = importlib.import_module("main") | ||
|
|
||
| importlib.reload(module) | ||
| return module | ||
|
|
||
|
|
||
| def test_verify_access_and_get_folders_filters_malicious_ids(monkeypatch): | ||
| """ | ||
| Verify that verify_access_and_get_folders filters out malicious Folder IDs | ||
| containing path traversal characters (../). | ||
| """ | ||
| m = reload_main_with_env(monkeypatch) | ||
| mock_client = MagicMock() | ||
|
|
||
| # Malicious Folder ID with path traversal | ||
| malicious_id = "../../etc/passwd" | ||
| # Malicious Folder ID with dangerous characters | ||
| malicious_id_2 = "foo;rm -rf /" | ||
|
abhimehro marked this conversation as resolved.
|
||
|
|
||
| mock_response = MagicMock() | ||
| mock_response.json.return_value = { | ||
| "body": { | ||
| "groups": [ | ||
| {"group": "Safe Folder", "PK": "safe_id_123"}, | ||
| {"group": "Safe Folder 2", "PK": "safe-id-456_789"}, | ||
| {"group": "Malicious Folder", "PK": malicious_id}, | ||
| {"group": "Malicious Folder 2", "PK": malicious_id_2} | ||
| ] | ||
| } | ||
| } | ||
| mock_client.get.return_value = mock_response | ||
| mock_response.raise_for_status.return_value = None | ||
|
|
||
| # Function should filter out malicious IDs | ||
| result = m.verify_access_and_get_folders(mock_client, "valid_profile") | ||
|
|
||
| assert result is not None | ||
Check noticeCode scanning / Bandit Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
|
||
|
|
||
| # Check that valid IDs are preserved | ||
| assert "Safe Folder" in result | ||
Check noticeCode scanning / Bandit Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
|
||
| assert result["Safe Folder"] == "safe_id_123" | ||
Check noticeCode scanning / Bandit Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
|
||
| assert "Safe Folder 2" in result | ||
Check noticeCode scanning / Bandit Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
|
||
| assert result["Safe Folder 2"] == "safe-id-456_789" | ||
Check noticeCode scanning / Bandit Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
|
||
|
|
||
| # Check that malicious IDs are removed | ||
| assert "Malicious Folder" not in result | ||
Check noticeCode scanning / Bandit Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
|
||
| assert "Malicious Folder 2" not in result | ||
Check noticeCode scanning / Bandit Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate_folder_id()currently allows.and..becauseFOLDER_ID_PATTERNpermits dots. If a malicious API response returns.., it will pass validation and later be interpolated into URL path segments (e.g.,/groups/{folder_id}), reintroducing a dot-segment traversal vector on servers/proxies that normalize paths. Add an explicit block forfolder_id.strip()in (.,..) (similar tois_valid_folder_name) before the regex check.