Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions percy/screenshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,71 @@ def fetch_percy_dom():
return response.text


def _walk_nodes(node, closed_pairs):
"""Walk CDP DOM tree to find closed shadow roots, skipping iframe boundaries."""
if "contentDocument" in node:
return
if "shadowRoots" in node:
for sr in node["shadowRoots"]:
if sr.get("shadowRootType") == "closed":
closed_pairs.append({
"hostBackendNodeId": node["backendNodeId"],
"shadowBackendNodeId": sr["backendNodeId"]
})
_walk_nodes(sr, closed_pairs)
if "children" in node:
for child in node["children"]:
_walk_nodes(child, closed_pairs)


def expose_closed_shadow_roots(page):
"""Use CDP to discover closed shadow roots and expose them to PercyDOM.serialize().
Closed shadow roots are inaccessible from JS (element.shadowRoot === null),
but CDP's DOM domain can pierce them."""
cdp_session = None
try:
cdp_session = page.context.new_cdp_session(page)
except Exception as err:
log(f"CDP session unavailable: {err}", lvl="debug")
return

try:
cdp_session.send("DOM.enable")
doc_result = cdp_session.send("DOM.getDocument", {"depth": -1, "pierce": True})
root = doc_result["root"]

closed_pairs = []
_walk_nodes(root, closed_pairs)

if not closed_pairs:
return

log(f"Found {len(closed_pairs)} closed shadow root(s), exposing via CDP", lvl="debug")

page.evaluate("() => { window.__percyClosedShadowRoots = window.__percyClosedShadowRoots || new WeakMap(); }")

for pair in closed_pairs:
host_result = cdp_session.send("DOM.resolveNode", {"backendNodeId": pair["hostBackendNodeId"]})
host_object_id = host_result["object"]["objectId"]

shadow_result = cdp_session.send("DOM.resolveNode", {"backendNodeId": pair["shadowBackendNodeId"]})
shadow_object_id = shadow_result["object"]["objectId"]

cdp_session.send("Runtime.callFunctionOn", {
"functionDeclaration": "function(shadowRoot) { window.__percyClosedShadowRoots.set(this, shadowRoot); }",
"objectId": host_object_id,
"arguments": [{"objectId": shadow_object_id}]
})
except Exception as err:
log(f"Could not expose closed shadow roots via CDP: {err}", lvl="debug")
finally:
if cdp_session:
try:
cdp_session.detach()
except Exception:
pass


def process_frame(page, frame, options, percy_dom_script):
"""
Processes a single cross-origin frame to capture its snapshot and resources.
Expand Down Expand Up @@ -392,6 +457,10 @@ def percy_snapshot(page, name, **kwargs):
# Inject the DOM serialization script
percy_dom_script = fetch_percy_dom()
page.evaluate(percy_dom_script)

# Expose closed shadow roots via CDP before serialization
expose_closed_shadow_roots(page)

cookies = page.context.cookies()

# Serialize and capture the DOM
Expand Down
86 changes: 86 additions & 0 deletions tests/test_screenshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,5 +1290,91 @@ def test_create_region_with_invalid_algorithm(self):
self.assertEqual(result, expected_result)


class TestClosedShadowDOM(unittest.TestCase):
"""Tests for expose_closed_shadow_roots and _walk_nodes."""

def test_walk_nodes_finds_closed_shadow_roots(self):
from percy.screenshot import _walk_nodes
node = {
"backendNodeId": 1,
"shadowRoots": [
{"backendNodeId": 2, "shadowRootType": "closed", "children": []},
{"backendNodeId": 3, "shadowRootType": "open", "children": []}
],
"children": []
}
pairs = []
_walk_nodes(node, pairs)
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["hostBackendNodeId"], 1)
self.assertEqual(pairs[0]["shadowBackendNodeId"], 2)

def test_walk_nodes_skips_content_document(self):
from percy.screenshot import _walk_nodes
node = {
"backendNodeId": 1,
"contentDocument": {"backendNodeId": 2, "children": [
{"backendNodeId": 3, "shadowRoots": [
{"backendNodeId": 4, "shadowRootType": "closed", "children": []}
], "children": []}
]},
"children": []
}
pairs = []
_walk_nodes(node, pairs)
self.assertEqual(len(pairs), 0)

def test_expose_non_chromium_browser(self):
from percy.screenshot import expose_closed_shadow_roots
page = MagicMock()
page.context.new_cdp_session.side_effect = Exception("Not Chromium")
# Should not throw
expose_closed_shadow_roots(page)

def test_expose_no_closed_roots(self):
from percy.screenshot import expose_closed_shadow_roots
page = MagicMock()
cdp = MagicMock()
page.context.new_cdp_session.return_value = cdp
cdp.send.side_effect = lambda method, params=None: (
{"root": {"backendNodeId": 1, "children": []}} if method == "DOM.getDocument" else None
)
expose_closed_shadow_roots(page)
cdp.detach.assert_called_once()
page.evaluate.assert_not_called()

def test_expose_closed_roots_found(self):
from percy.screenshot import expose_closed_shadow_roots
page = MagicMock()
cdp = MagicMock()
page.context.new_cdp_session.return_value = cdp

def cdp_send(method, params=None):
if method == "DOM.getDocument":
return {"root": {"backendNodeId": 1, "children": [
{"backendNodeId": 10, "shadowRoots": [
{"backendNodeId": 20, "shadowRootType": "closed", "children": []}
], "children": []}
]}}
if method == "DOM.resolveNode":
return {"object": {"objectId": f"obj-{params['backendNodeId']}"}}
return None

cdp.send.side_effect = cdp_send
expose_closed_shadow_roots(page)
page.evaluate.assert_called_once()
cdp.detach.assert_called_once()

def test_expose_cdp_error_non_fatal(self):
from percy.screenshot import expose_closed_shadow_roots
page = MagicMock()
cdp = MagicMock()
page.context.new_cdp_session.return_value = cdp
cdp.send.side_effect = Exception("CDP failed")
# Should not throw
expose_closed_shadow_roots(page)
cdp.detach.assert_called_once()


if __name__ == "__main__":
unittest.main()
Loading