Skip to content

Commit ed77e71

Browse files
committed
here goes my sunday
1 parent acefff9 commit ed77e71

7 files changed

Lines changed: 893 additions & 61 deletions

File tree

Lib/profiling/sampling/_control.py

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
"""Control runtime for the sampling profiler."""
2+
3+
import os
4+
import selectors
5+
import socket
6+
import warnings
7+
8+
from .errors import ControlError, ControlURIError
9+
10+
11+
class ProfilerControl:
12+
def __init__(self):
13+
self.enabled = True
14+
self.running = True
15+
self.sample_interval_usec = 0
16+
17+
18+
def parse_control_uri(uri, *, allowed_schemes=("unix",)):
19+
if ":" not in uri:
20+
raise ControlURIError("control URI must include a scheme")
21+
22+
scheme, path = uri.split(":", 1)
23+
if scheme not in allowed_schemes:
24+
expected = ", ".join(allowed_schemes)
25+
raise ControlURIError(
26+
f"unsupported control URI scheme {scheme!r}; "
27+
f"expected one of: {expected}"
28+
)
29+
if not path:
30+
raise ControlURIError("control URI path must not be empty")
31+
return scheme, path
32+
33+
34+
_MAX_OUTBUF_BYTES = 64 * 1024
35+
_MAX_INBUF_BYTES = 4 * 1024
36+
_MAX_CONNECTIONS = 8
37+
_SOCKET_PERMISSIONS = 0o600
38+
39+
40+
class _Connection:
41+
def __init__(self, sock):
42+
self.sock = sock
43+
self.inbuf = bytearray()
44+
self.outbuf = bytearray()
45+
self.close_after_write = False
46+
47+
48+
class ControlServer:
49+
def __init__(self, uri):
50+
self.uri = uri
51+
self.control = ProfilerControl()
52+
_, self._path = parse_control_uri(uri)
53+
self.selector = selectors.DefaultSelector()
54+
self._connections = set()
55+
self._listener = None
56+
self._created_stat = None
57+
58+
def start(self):
59+
self._listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
60+
try:
61+
self._listener.bind(self._path)
62+
os.chmod(self._path, _SOCKET_PERMISSIONS)
63+
self._created_stat = os.lstat(self._path)
64+
self._listener.listen(socket.SOMAXCONN)
65+
self._listener.setblocking(False)
66+
self.selector.register(self._listener, selectors.EVENT_READ, None)
67+
except OSError as exc:
68+
self._close_listener()
69+
raise ControlError(
70+
f"failed to start control socket {self._path!r}: {exc}"
71+
) from exc
72+
73+
def stop(self):
74+
for conn in list(self._connections):
75+
self._close_connection(conn)
76+
self.selector.close()
77+
self._close_listener()
78+
79+
def _close_listener(self):
80+
listener, self._listener = self._listener, None
81+
if listener is not None:
82+
listener.close()
83+
84+
created_stat, self._created_stat = self._created_stat, None
85+
if created_stat is None:
86+
return
87+
try:
88+
current_stat = os.lstat(self._path)
89+
if (current_stat.st_ino, current_stat.st_dev) == (
90+
created_stat.st_ino,
91+
created_stat.st_dev,
92+
):
93+
os.unlink(self._path)
94+
except FileNotFoundError:
95+
pass
96+
except OSError:
97+
pass
98+
99+
def poll(self, timeout):
100+
try:
101+
ready = self.selector.select(timeout)
102+
except OSError as exc:
103+
warnings.warn(
104+
f"control selector.select() failed: {exc}",
105+
RuntimeWarning,
106+
stacklevel=2,
107+
)
108+
return
109+
110+
for key, events in ready:
111+
if key.fileobj is self._listener:
112+
self._accept_connection()
113+
else:
114+
self._handle_connection(key.data, events)
115+
116+
def _accept_connection(self):
117+
try:
118+
sock, _addr = self._listener.accept()
119+
except BlockingIOError:
120+
return
121+
except OSError as exc:
122+
warnings.warn(
123+
f"control accept failed: {exc}",
124+
RuntimeWarning,
125+
stacklevel=2,
126+
)
127+
return
128+
129+
if len(self._connections) >= _MAX_CONNECTIONS:
130+
sock.close()
131+
return
132+
133+
try:
134+
sock.setblocking(False)
135+
conn = _Connection(sock)
136+
self.selector.register(sock, selectors.EVENT_READ, conn)
137+
except OSError:
138+
sock.close()
139+
return
140+
141+
self._connections.add(conn)
142+
143+
def _handle_connection(self, conn, events):
144+
if events & selectors.EVENT_READ:
145+
self._read_connection(conn)
146+
if conn in self._connections and events & selectors.EVENT_WRITE:
147+
self._flush_connection(conn)
148+
149+
def _read_connection(self, conn):
150+
try:
151+
chunk = conn.sock.recv(_MAX_INBUF_BYTES)
152+
except (BlockingIOError, InterruptedError):
153+
return
154+
except OSError:
155+
self._close_connection(conn)
156+
return
157+
158+
if not chunk:
159+
self._close_connection(conn)
160+
return
161+
162+
conn.inbuf.extend(chunk)
163+
if len(conn.inbuf) > _MAX_INBUF_BYTES:
164+
self._close_connection(conn)
165+
return
166+
167+
while True:
168+
newline = conn.inbuf.find(b"\n")
169+
if newline == -1:
170+
break
171+
raw = conn.inbuf.take_bytes(newline + 1)
172+
line = raw[:-1].decode("utf-8", "replace").strip()
173+
self._dispatch(conn, line)
174+
if conn not in self._connections or conn.close_after_write:
175+
break
176+
177+
if conn in self._connections:
178+
self._flush_connection(conn)
179+
180+
def _dispatch(self, conn, command):
181+
match command:
182+
case "enable":
183+
self.control.enabled = True
184+
reply = "ok\n"
185+
case "disable":
186+
self.control.enabled = False
187+
reply = "ok\n"
188+
case "ping":
189+
reply = "ok\n"
190+
case "status":
191+
reply = (
192+
f"ok enabled={self.control.enabled} "
193+
f"rate_usec={self.control.sample_interval_usec}\n"
194+
)
195+
case "quit":
196+
self.control.running = False
197+
conn.close_after_write = True
198+
reply = "ok\n"
199+
case _:
200+
reply = "err unknown_command\n"
201+
202+
conn.outbuf.extend(reply.encode("ascii"))
203+
if len(conn.outbuf) > _MAX_OUTBUF_BYTES:
204+
self._close_connection(conn)
205+
206+
def _flush_connection(self, conn):
207+
while conn.outbuf:
208+
try:
209+
sent = conn.sock.send(conn.outbuf)
210+
except (BlockingIOError, InterruptedError):
211+
break
212+
except OSError:
213+
self._close_connection(conn)
214+
return
215+
216+
if sent == 0:
217+
self._close_connection(conn)
218+
return
219+
220+
del conn.outbuf[:sent]
221+
222+
if not conn.outbuf and conn.close_after_write:
223+
self._close_connection(conn)
224+
return
225+
226+
events = selectors.EVENT_READ
227+
if conn.outbuf:
228+
events |= selectors.EVENT_WRITE
229+
try:
230+
self.selector.modify(conn.sock, events, conn)
231+
except (KeyError, OSError):
232+
self._close_connection(conn)
233+
234+
def _close_connection(self, conn):
235+
if conn not in self._connections:
236+
return
237+
self._connections.discard(conn)
238+
239+
try:
240+
self.selector.unregister(conn.sock)
241+
except (KeyError, OSError):
242+
pass
243+
244+
conn.sock.close()

0 commit comments

Comments
 (0)