-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_https.py
More file actions
205 lines (165 loc) · 7.5 KB
/
test_https.py
File metadata and controls
205 lines (165 loc) · 7.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import asyncio
from tcpip.drivers.macos_driver import MacOSDriver
from tcpip.interface.stack_controller import StackController
from tcpip.logging_config import get_main_logger, setup_logging
log_level = "INFO"
setup_logging(log_level)
logger = get_main_logger()
async def test_https():
drv = MacOSDriver("en0")
stack = StackController(drv)
await stack.start()
try:
logger.info("Testing DNS resolution for example.com...")
ips = await stack.resolve_dns("example.com")
if not ips:
logger.warning("DNS resolution failed, using fallback IP")
resolved_ip = "93.184.216.34"
else:
resolved_ip = ips[0]
logger.info("Resolved example.com to: %s", resolved_ip)
logger.info("Creating HTTPS connection to example.com:443...")
async with await stack.tls_connect("example.com", 443, verify_ssl=True) as tls:
logger.info("TLS handshake completed successfully!")
https_req = (
b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"
)
logger.info("Sending HTTPS request...")
await tls.write(https_req)
logger.info("HTTPS request sent successfully!")
logger.info("Reading HTTPS response...")
response_data = b""
try:
while True:
try:
chunk = await asyncio.wait_for(tls.read(4096), timeout=5.0)
if not chunk:
break
response_data += chunk
logger.debug("Received %d bytes", len(chunk))
if b"\r\n\r\n" in response_data:
headers_end = response_data.find(b"\r\n\r\n") + 4
headers = response_data[:headers_end].decode(
"utf-8", errors="ignore"
)
logger.info("Received HTTP response headers:")
for line in headers.strip().split("\r\n"):
if line:
logger.info(" %s", line)
if "content-length:" in headers.lower():
content_length_line = [
line
for line in headers.split("\r\n")
if line.lower().startswith("content-length:")
]
if content_length_line:
try:
content_length = int(
content_length_line[0].split(":")[1].strip()
)
body_received = len(response_data) - headers_end
logger.info(
"Content-Length: %d, Body received: %d",
content_length,
body_received,
)
if body_received >= content_length:
break
except ValueError:
pass
if "connection: close" in headers.lower():
continue
else:
break
except asyncio.TimeoutError:
logger.info("Read timeout, assuming response complete")
break
except Exception as e:
logger.warning("Error reading response: %s", e)
# ���n���h:
if response_data:
logger.info("Total response received: %d bytes", len(response_data))
preview = response_data[:200].decode("utf-8", errors="ignore")
logger.info("Response preview: %s", repr(preview))
if response_data.startswith(b"HTTP/"):
status_line = response_data.split(b"\r\n")[0].decode(
"utf-8", errors="ignore"
)
logger.info("HTTP Status: %s", status_line)
if b"200 OK" in response_data[:100]:
logger.info("HTTPS request successful - received 200 OK")
else:
logger.info("HTTPS request completed - received response")
else:
logger.warning("Unexpected response format")
else:
logger.warning("No response data received")
logger.info("HTTPS test completed successfully!")
return True
except asyncio.TimeoutError:
logger.error("HTTPS test timed out")
return False
except Exception as e:
logger.error("HTTPS test failed: %s", e)
import traceback
logger.debug("Exception details:\n%s", traceback.format_exc())
return False
finally:
logger.info("Cleaning up...")
await stack.close()
async def test_https_multiple_sites():
drv = MacOSDriver("en0")
stack = StackController(drv)
await stack.start()
test_sites = [
("example.com", 443),
("httpbin.org", 443),
("www.google.com", 443),
]
successful_tests = 0
try:
for hostname, port in test_sites:
logger.info("Testing HTTPS connection to %s:%d...", hostname, port)
try:
async with await stack.tls_connect(
hostname, port, verify_ssl=True
) as tls:
logger.info("TLS handshake successful for %s", hostname)
https_req = f"GET / HTTP/1.1\r\nHost: {hostname}\r\nConnection: close\r\n\r\n".encode()
await tls.write(https_req)
response = await asyncio.wait_for(tls.read(1024), timeout=5.0)
if response and response.startswith(b"HTTP/"):
logger.info("HTTPS response received from %s", hostname)
successful_tests += 1
else:
logger.warning("Invalid response from %s", hostname)
except Exception as e:
logger.warning("HTTPS test failed for %s: %s", hostname, e)
logger.info(
"HTTPS multiple sites test completed: %d/%d successful",
successful_tests,
len(test_sites),
)
return successful_tests > 0
except Exception as e:
logger.error("Multiple sites test failed: %s", e)
return False
finally:
await stack.close()
async def main():
logger.info("Starting HTTPS tests...")
logger.info("=== Basic HTTPS Test ===")
basic_success = await test_https()
logger.info("\n=== Multiple Sites HTTPS Test ===")
multi_success = await test_https_multiple_sites()
logger.info("\n=== Test Results ===")
logger.info("Basic HTTPS test: %s", "PASS" if basic_success else "FAIL")
logger.info("Multiple sites test: %s", "PASS" if multi_success else "FAIL")
if basic_success and multi_success:
logger.info("All HTTPS tests passed!")
return True
else:
logger.warning("Some HTTPS tests failed")
return False
if __name__ == "__main__":
asyncio.run(main())