-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker_template.py
More file actions
193 lines (162 loc) · 6.84 KB
/
worker_template.py
File metadata and controls
193 lines (162 loc) · 6.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
worker_template.py
CoW-aware FaaS worker template (preloader) process.
Architecture:
1. On startup, this process performs the ONE-TIME cold start:
it imports all heavy dependencies (Pillow/OpenCV) and simulates
AI model weight loading (~800ms). This is the only true cold start.
2. It then signals the C++ gateway that it is ready (via a pipe fd).
3. It listens on a control Unix socket for fork requests from the gateway.
4. For each request, it fork()s a worker child process.
The child inherits all loaded library pages via Linux Copy-on-Write (CoW),
so the child starts serving in ~20ms instead of ~800ms.
5. SIGCHLD is set to SIG_IGN so worker children are auto-reaped on exit.
Usage (invoked by WorkerPool.hpp):
python3 worker_template.py <ready_pipe_write_fd>
CSCI 599: Network Systems for Cloud Computing
University of Southern California
"""
import sys
import os
import socket
import signal
import time
import base64
import io
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
CTRL_SOCK_PATH = "/tmp/faas_template_ctrl.sock"
# ---------------------------------------------------------------------------
# Step 1: ONE-TIME cold start — import heavy dependencies BEFORE any fork().
# All worker children will inherit these memory pages via CoW.
# ---------------------------------------------------------------------------
print("[Template] Cold-starting: loading dependencies...", flush=True)
time.sleep(0.8) # Simulate heavy AI model / library initialization (e.g. PyTorch weights)
from PIL import Image, ImageOps
print("[Template] Dependencies loaded. CoW baseline established.", flush=True)
# ---------------------------------------------------------------------------
# Worker payload (same logic as original worker.py, but lives here so that
# the forked child inherits the already-imported PIL module via CoW).
# ---------------------------------------------------------------------------
def _process_image(base64_str: str) -> str:
"""Decode Base64 image -> invert colors -> re-encode Base64."""
if "base64," in base64_str:
base64_str = base64_str.split("base64,")[1]
image_data = base64.b64decode(base64_str)
image = Image.open(io.BytesIO(image_data)).convert('RGB')
inverted = ImageOps.invert(image)
buf = io.BytesIO()
inverted.save(buf, format="JPEG")
return base64.b64encode(buf.getvalue()).decode('utf-8')
def _run_worker(sock_path: str) -> None:
"""Worker main loop: serve requests on sock_path until killed."""
if os.path.exists(sock_path):
os.remove(sock_path)
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(sock_path)
server.listen(5)
print(f"[Worker pid={os.getpid()}] Listening on {sock_path}", flush=True)
while True:
conn, _ = server.accept()
data = b""
while True:
chunk = conn.recv(8192)
if not chunk:
break
data += chunk
if len(chunk) < 8192:
break
if not data:
conn.close()
continue
print(f"[Worker pid={os.getpid()}] Processing request", flush=True)
time.sleep(0.5) # Simulate inference time (~500ms per request)
try:
req_text = data.decode('utf-8', errors='ignore')
body = (req_text.split("\r\n\r\n", 1)[1].strip()
if "\r\n\r\n" in req_text else req_text.strip())
try:
result_body = _process_image(body)
except Exception:
result_body = f"Fallback: {body[::-1]}"
http_response = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
f"Content-Length: {len(result_body)}\r\n"
"Connection: close\r\n\r\n"
f"{result_body}"
)
conn.sendall(http_response.encode('utf-8'))
except Exception as e:
print(f"[Worker pid={os.getpid()}] Error: {e}", flush=True)
finally:
conn.close()
# ---------------------------------------------------------------------------
# Template main loop
# ---------------------------------------------------------------------------
def _run_template(ready_fd: int) -> None:
"""
Signal readiness to the C++ gateway, then serve fork requests.
Protocol (per connection):
C++ sends: "<sock_path>\n"
Template: fork() -> worker child runs _run_worker(sock_path)
Template sends back: "<worker_pid>\n"
"""
# Auto-reap worker children to prevent zombies.
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
# Set up control socket BEFORE signalling ready, so C++ can connect immediately.
if os.path.exists(CTRL_SOCK_PATH):
os.remove(CTRL_SOCK_PATH)
ctrl = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
ctrl.bind(CTRL_SOCK_PATH)
ctrl.listen(64)
# Signal the C++ gateway: template is ready, workers can now be forked cheaply.
os.write(ready_fd, b"READY\n")
os.close(ready_fd)
print("[Template] Signalled READY to gateway. Awaiting fork requests.", flush=True)
while True:
try:
conn, _ = ctrl.accept()
except Exception:
break
try:
raw = conn.recv(256).decode('utf-8', errors='ignore').strip()
if not raw:
conn.close()
continue
sock_path = raw
# Fork a worker child. Child inherits Pillow pages via CoW.
pid = os.fork()
if pid == 0:
# ---- WORKER CHILD ----
# Close the template's control socket; child doesn't need it.
ctrl.close()
conn.close()
_run_worker(sock_path)
sys.exit(0)
else:
# ---- TEMPLATE PARENT ----
# Send the worker PID back to C++ so it can track / kill the worker.
conn.sendall(f"{pid}\n".encode('utf-8'))
conn.close()
print(f"[Template] Forked worker pid={pid} -> {sock_path}", flush=True)
except Exception as e:
print(f"[Template] Error handling fork request: {e}", flush=True)
try:
conn.close()
except Exception:
pass
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python3 worker_template.py <ready_pipe_write_fd>", flush=True)
sys.exit(1)
try:
ready_fd = int(sys.argv[1])
except ValueError:
print("[Template] Invalid ready_fd argument.", flush=True)
sys.exit(1)
_run_template(ready_fd)