-
Notifications
You must be signed in to change notification settings - Fork 121
Expand file tree
/
Copy pathmain.py
More file actions
305 lines (266 loc) · 10.9 KB
/
main.py
File metadata and controls
305 lines (266 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
#!/usr/bin/env python3
"""
DomainFront Tunnel — Bypass DPI censorship via Google Apps Script.
Run a local HTTP proxy that tunnels all traffic through a Google Apps
Script relay fronted by www.google.com (TLS SNI shows www.google.com
while the encrypted Host header points at script.google.com).
"""
import argparse
import asyncio
import json
import logging
import os
import sys
# The project's modules are kept in a "src" folder next to this script.
# Prepending that folder to sys.path lets the flat imports that follow
# (e.g. "from proxy_server import …") resolve without a package prefix.
_SRC_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "src",
)
if _SRC_DIR not in sys.path:
    sys.path.insert(0, _SRC_DIR)
from cert_installer import install_ca, is_ca_trusted
from constants import __version__
from lan_utils import log_lan_access
from google_ip_scanner import scan_sync
from logging_utils import configure as configure_logging, print_banner
from mitm import CA_CERT_FILE
from proxy_server import ProxyServer
def setup_logging(level_name: str):
    """Configure application logging for the given level name.

    Thin wrapper that forwards *level_name* unchanged to
    ``logging_utils.configure`` (imported in this file as
    ``configure_logging``).
    """
    configure_logging(level_name)
# Values of "auth_key" that are treated as "not configured": the empty
# string plus known placeholder texts. main() refuses to start when the
# configured key is one of these.
_PLACEHOLDER_AUTH_KEYS = {
    "",  # key present but left blank
    "CHANGE_ME_TO_A_STRONG_SECRET",
    "your-secret-password-here",
}
def parse_args(argv=None):
    """Parse the command-line options of the tunnel.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. ``None`` (the default) keeps the previous
            behaviour of reading ``sys.argv[1:]``, so existing callers are
            unaffected; passing a list makes the parser usable from tests
            or from other Python code.

    Returns:
        argparse.Namespace with the attributes ``config``, ``port``,
        ``host``, ``socks5_port``, ``disable_socks5``, ``log_level``,
        ``install_cert``, ``no_cert_check`` and ``scan``.

    Note:
        Only ``--config`` reads its environment fallback (``DFT_CONFIG``)
        here. The other ``env:`` names mentioned in the help texts are not
        consulted by this function; options left unset come back as
        ``None`` so the caller can tell "not given" from an explicit value.
    """
    parser = argparse.ArgumentParser(
        prog="domainfront-tunnel",
        description="Local HTTP proxy that relays traffic through Google Apps Script.",
    )

    # ── Configuration file and listener overrides ────────────────────────
    parser.add_argument(
        "-c", "--config",
        default=os.environ.get("DFT_CONFIG", "config.json"),
        help="Path to config file (default: config.json, env: DFT_CONFIG)",
    )
    parser.add_argument(
        "-p", "--port",
        type=int,
        default=None,
        help="Override listen port (env: DFT_PORT)",
    )
    parser.add_argument(
        "--host",
        default=None,
        help="Override listen host (env: DFT_HOST)",
    )
    parser.add_argument(
        "--socks5-port",
        type=int,
        default=None,
        help="Override SOCKS5 listen port (env: DFT_SOCKS5_PORT)",
    )
    parser.add_argument(
        "--disable-socks5",
        action="store_true",
        help="Disable the built-in SOCKS5 listener.",
    )
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default=None,
        help="Override log level (env: DFT_LOG_LEVEL)",
    )

    # ── Informational / one-shot actions ─────────────────────────────────
    parser.add_argument(
        "-v", "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )
    parser.add_argument(
        "--install-cert",
        action="store_true",
        help="Install the MITM CA certificate as a trusted root and exit.",
    )
    parser.add_argument(
        "--no-cert-check",
        action="store_true",
        help="Skip the certificate installation check on startup.",
    )
    parser.add_argument(
        "--scan",
        action="store_true",
        help="Scan Google IPs to find the fastest reachable one and exit.",
    )

    # argparse falls back to sys.argv[1:] when argv is None.
    return parser.parse_args(argv)
def _read_config_file(config_path):
    """Open *config_path* and return the parsed JSON value."""
    # JSON text is UTF-8; decoding explicitly keeps parsing independent of
    # the platform locale encoding. "utf-8-sig" also skips a leading BOM,
    # which some editors write and which json rejects.
    with open(config_path, encoding="utf-8-sig") as f:
        return json.load(f)


def _run_setup_wizard(config_path):
    """Offer the setup wizard for a missing config and return the new config.

    Exits the process when the wizard is unavailable, declined, fails, or
    does not leave a loadable config file behind.
    """
    wizard = os.path.join(os.path.dirname(os.path.abspath(__file__)), "setup.py")
    # sys.stdin can be None (no console attached), so check before isatty().
    interactive = sys.stdin is not None and sys.stdin.isatty()
    if not (os.path.exists(wizard) and interactive):
        print("Run: python setup.py (or copy config.example.json to config.json)")
        sys.exit(1)

    try:
        answer = input("Run the interactive setup wizard now? [Y/n]: ").strip().lower()
    except EOFError:
        answer = "n"
    if answer not in ("", "y", "yes"):
        print("Copy config.example.json to config.json and fill in your values,")
        print("or run: python setup.py")
        sys.exit(1)

    import subprocess
    rc = subprocess.call([sys.executable, wizard])
    if rc != 0:
        sys.exit(rc)
    try:
        return _read_config_file(config_path)
    except (OSError, ValueError) as e:
        # OSError: file still missing/unreadable; ValueError: bad JSON or
        # bad encoding (JSONDecodeError and UnicodeDecodeError subclass it).
        print(f"Could not load config after setup: {e}")
        sys.exit(1)


def _load_config(config_path):
    """Return the config dict from *config_path*, or exit with a message."""
    try:
        config = _read_config_file(config_path)
    except FileNotFoundError:
        print(f"Config not found: {config_path}")
        # Offer the interactive wizard if it's available and we're on a TTY.
        config = _run_setup_wizard(config_path)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"Invalid JSON in config: {e}")
        sys.exit(1)
    except OSError as e:
        # e.g. permission denied, or the path is a directory.
        print(f"Could not read config: {e}")
        sys.exit(1)

    # Everything below indexes the config by key, so a JSON list/string/
    # number at the top level must be rejected here.
    if not isinstance(config, dict):
        print("Invalid config: the top-level JSON value must be an object.")
        sys.exit(1)
    return config


def _cli_or_env(cli_value, env_name, convert=str):
    """Return the CLI value if given, else the converted env var, else None.

    Exits with a message when the environment value cannot be converted.
    """
    if cli_value is not None:
        return cli_value
    raw = os.environ.get(env_name)
    if not raw:
        return None
    try:
        return convert(raw)
    except ValueError:
        print(f"Invalid value for environment variable {env_name}: {raw!r}")
        sys.exit(1)


def _apply_overrides(config, args):
    """Apply environment-variable and CLI overrides to *config* in place."""
    # Environment variable overrides
    for env_name, key in (
        ("DFT_AUTH_KEY", "auth_key"),
        ("DFT_SCRIPT_ID", "script_id"),
    ):
        value = os.environ.get(env_name)
        if value:
            config[key] = value

    # CLI argument overrides (each falls back to its environment variable)
    for key, cli_value, env_name, convert in (
        ("listen_port", args.port, "DFT_PORT", int),
        ("listen_host", args.host, "DFT_HOST", str),
        ("socks5_port", args.socks5_port, "DFT_SOCKS5_PORT", int),
        ("log_level", args.log_level, "DFT_LOG_LEVEL", str),
    ):
        value = _cli_or_env(cli_value, env_name, convert)
        if value is not None:
            config[key] = value

    if args.disable_socks5:
        config["socks5_enabled"] = False


def _require_auth_key(config):
    """Exit unless *config* carries a non-placeholder ``auth_key``."""
    if "auth_key" not in config:
        print("Missing required config key: auth_key")
        sys.exit(1)
    auth_key = config["auth_key"]
    # A JSON null is "unset" too; it would otherwise pass the set lookup.
    if auth_key is None or auth_key in _PLACEHOLDER_AUTH_KEYS:
        print(
            "Refusing to start: 'auth_key' is unset or uses a known placeholder.\n"
            "Pick a long random secret and set it in both config.json AND "
            "the AUTH_KEY constant inside Code.gs (they must match)."
        )
        sys.exit(1)


def _require_script_id(config):
    """Exit unless *config* names at least one Apps Script deployment."""
    sid = config.get("script_ids") or config.get("script_id")
    if not sid or (isinstance(sid, str) and sid == "YOUR_APPS_SCRIPT_DEPLOYMENT_ID"):
        print("Missing 'script_id' in config.")
        print("Deploy the Apps Script from Code.gs and paste the Deployment ID.")
        sys.exit(1)


def main():
    """Command-line entry point.

    Steps, in order:
      1. Parse arguments and load the JSON config (offering the setup
         wizard when the file is missing).
      2. Apply environment and CLI overrides, then validate ``auth_key``
         and the script id; any failure prints a message and exits 1.
      3. Handle the one-shot actions ``--install-cert`` and ``--scan``,
         which exit with 0 on success and 1 on failure.
      4. Otherwise configure logging, make sure the CA file exists and
         (unless ``--no-cert-check``) is trusted, log the listener
         addresses, and run the proxy until interrupted with Ctrl+C.
    """
    args = parse_args()
    config = _load_config(args.config)
    _apply_overrides(config, args)
    _require_auth_key(config)

    # Always Apps Script mode — force-set for backward-compat configs.
    config["mode"] = "apps_script"
    _require_script_id(config)

    # ── Certificate installation ──────────────────────────────────────────
    if args.install_cert:
        setup_logging("INFO")
        _log = logging.getLogger("Main")
        _log.info("Installing CA certificate…")
        ok = install_ca(CA_CERT_FILE)
        sys.exit(0 if ok else 1)

    # ── Google IP Scanner ──────────────────────────────────────────────────
    if args.scan:
        setup_logging("INFO")
        front_domain = config.get("front_domain", "www.google.com")
        _log = logging.getLogger("Main")
        _log.info("Scanning Google IPs (fronting domain: %s)", front_domain)
        ok = scan_sync(front_domain)
        sys.exit(0 if ok else 1)

    setup_logging(config.get("log_level", "INFO"))
    log = logging.getLogger("Main")
    print_banner(__version__)
    log.info("DomainFront Tunnel starting (Apps Script relay)")
    log.info("Apps Script relay : SNI=%s → script.google.com",
             config.get("front_domain", "www.google.com"))
    script_ids = config.get("script_ids") or config.get("script_id")
    if isinstance(script_ids, list):
        log.info("Script IDs : %d scripts (sticky per-host)", len(script_ids))
        for i, script_id in enumerate(script_ids):
            log.info(" [%d] %s", i + 1, script_id)
    else:
        log.info("Script ID : %s", script_ids)

    # Ensure CA file exists before checking / installing it.
    # MITMCertManager generates ca/ca.crt on first instantiation.
    if not os.path.exists(CA_CERT_FILE):
        from mitm import MITMCertManager
        MITMCertManager()  # side-effect: creates ca/ca.crt + ca/ca.key

    # Auto-install MITM CA if not already trusted
    if not args.no_cert_check:
        if not is_ca_trusted(CA_CERT_FILE):
            log.warning("MITM CA is not trusted — attempting automatic installation…")
            ok = install_ca(CA_CERT_FILE)
            if ok:
                log.info("CA certificate installed. You may need to restart your browser.")
            else:
                log.error(
                    "Auto-install failed. Run with --install-cert (may need admin/sudo) "
                    "or manually install ca/ca.crt as a trusted root CA."
                )
        else:
            log.info("MITM CA is already trusted.")

    # ── LAN sharing configuration ────────────────────────────────────────
    lan_sharing = config.get("lan_sharing", False)
    if lan_sharing:
        # If LAN sharing is enabled and host is still localhost, change to all interfaces
        if config.get("listen_host", "127.0.0.1") == "127.0.0.1":
            config["listen_host"] = "0.0.0.0"
            log.info("LAN sharing enabled — listening on all interfaces")

    log.info("HTTP proxy : %s:%d",
             config.get("listen_host", "127.0.0.1"),
             config.get("listen_port", 8080))
    if config.get("socks5_enabled", True):
        log.info("SOCKS5 proxy : %s:%d",
                 config.get("socks5_host", config.get("listen_host", "127.0.0.1")),
                 config.get("socks5_port", 1080))

    # Log LAN access addresses if sharing is enabled
    if lan_sharing:
        socks_port = config.get("socks5_port", 1080) if config.get("socks5_enabled", True) else None
        log_lan_access(config.get("listen_port", 8080), socks_port)

    try:
        asyncio.run(_run(config))
    except KeyboardInterrupt:
        log.info("Stopped")
def _make_exception_handler(log):
    """Build an asyncio loop exception handler that reports through *log*.

    The returned ``handler(loop, context)`` stays silent for one specific
    case: a ``ConnectionResetError`` whose originating handle (or source
    traceback) mentions ``_call_connection_lost``. This is the WinError
    10054 noise seen on Python/Windows when a remote host force-closes a
    socket during connection cleanup; it is harmless but verbose.

    Every other context is logged at ERROR level and, when it carries an
    exception, also passed on to the loop's default exception handler.
    """

    def handler(loop, context):
        error = context.get("exception")
        origin = context.get("handle") or context.get("source_traceback", "")

        # Benign Windows socket cleanup race: do not report it at all.
        benign_reset = isinstance(error, ConnectionResetError) and (
            "_call_connection_lost" in str(origin)
        )
        if not benign_reset:
            log.error("[asyncio] %s", context.get("message", context))
            if error:
                loop.default_exception_handler(context)

    return handler
async def _run(config):
    """Run a ``ProxyServer`` built from *config* inside the current loop.

    Installs the filtering exception handler from
    ``_make_exception_handler`` (logging via the "asyncio" logger) on the
    running loop, then awaits ``server.start()``. ``server.stop()`` is
    always awaited afterwards, whether ``start()`` returned or raised.
    """
    asyncio.get_running_loop().set_exception_handler(
        _make_exception_handler(logging.getLogger("asyncio"))
    )

    proxy = ProxyServer(config)
    try:
        await proxy.start()
    finally:
        await proxy.stop()
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()