-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
185 lines (153 loc) · 7.72 KB
/
Copy pathserver.py
File metadata and controls
185 lines (153 loc) · 7.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import os
import sys
import json
import logging
import threading
import time
from socket import socket, SOL_SOCKET, SO_REUSEADDR, SOCK_STREAM, AF_INET
from concurrent import futures
from utils import get_args
from request import Request
from response import Response
from methods import Methods
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] - [%(levelname)s] - %(message)s')
# Command line arguments: ./server [port] [host] [max_workers]
PORT = int(get_args(0, '8080')) # First argument: port number
IP_ADDRESS = get_args(1, '127.0.0.1') # Second argument: host address
MAX_WORKERS = int(get_args(2, '10')) # Third argument: maximum thread pool size
MAX_PAYLOAD_SIZE = 8192
class HTTPServer:
"""
Multi-threaded HTTP Server implementation with support for persistent connections,
binary file transfers, and comprehensive security features
"""
def __init__(self, max_workers: int, max_payload_size: int):
self._logger = logging.getLogger(HTTPServer.__name__)
self.max_payload_size = max_payload_size
self.max_workers = max_workers
self.active_connections = {} # Track active connections
self.connection_lock = threading.Lock()
# Create TCP socket
self._socket = socket(AF_INET, SOCK_STREAM)
# Prevent "Address already in use" errors
self._socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self._logger.info('Socket created')
# Create thread pool for handling connections
self._pool = futures.ThreadPoolExecutor(max_workers=max_workers)
def _handle_connection(self, connection: socket, address: tuple):
"""
Handle client connection with support for persistent connections
and proper error handling
"""
client_addr = f"{address[0]}:{address[1]}"
thread_name = threading.current_thread().name
request_count = 0
max_requests = 100
self._logger.info(f"[{thread_name}] Connection from {client_addr}")
try:
# Set socket timeout for persistent connections
connection.settimeout(30.0)
while request_count < max_requests:
try:
# Receive data from client
data = connection.recv(self.max_payload_size)
if not data:
break
data_str = data.decode('utf-8', errors='ignore')
if not data_str.strip():
continue
# Parse request
request = Request(data_str)
request_count += 1
self._logger.info(f"[{thread_name}] Request: {request.method} {request.path} HTTP/{request.version}")
# Process request based on method
if request.method == 'GET':
response = Methods.GET(request)
elif request.method == 'POST':
response = Methods.POST(request)
else:
response = Methods.OTHER(request)
# Send response
if hasattr(response, 'binary_data') and response.binary_data is not None:
connection.send(response.to_bytes())
self._logger.info(f"[{thread_name}] Response: {response.status} {response.message} ({len(response.binary_data)} bytes transferred)")
else:
connection.send(str(response).encode('utf-8'))
body_size = len(response.body.encode('utf-8')) if response.body else 0
self._logger.info(f"[{thread_name}] Response: {response.status} {response.message} ({body_size} bytes transferred)")
# Check connection header for keep-alive
connection_header = request.headers.get('connection', '').lower()
if connection_header == 'close' or request.version == '1.0':
self._logger.info(f"[{thread_name}] Connection: close")
break
else:
self._logger.info(f"[{thread_name}] Connection: keep-alive")
except socket.timeout:
self._logger.info(f"[{thread_name}] Connection timeout")
break
except UnicodeDecodeError:
self._logger.error(f"[{thread_name}] Invalid UTF-8 data received")
break
except Exception as e:
self._logger.error(f"[{thread_name}] Error handling request: {e}")
# Send 500 Internal Server Error
error_response = Response(request.version if 'request' in locals() else '1.1', 500, 'Internal Server Error')
error_response.set_body('Internal Server Error')
connection.send(str(error_response).encode('utf-8'))
break
except Exception as e:
self._logger.error(f"[{thread_name}] Connection error: {e}")
finally:
try:
connection.close()
self._logger.info(f"[{thread_name}] Connection closed after {request_count} requests")
except:
pass
def listen(self, ip: str, port: int):
"""
Start listening for client connections
"""
try:
self._socket.bind((ip, port))
self._logger.info('Socket bound successfully')
# Listen for connections (queue size of 50 as per requirements)
self._socket.listen(50)
# Log server startup information
self._logger.info(f'HTTP Server started on http://{ip}:{port}')
self._logger.info(f'Thread pool size: {self.max_workers}')
self._logger.info('Serving files from \'resources\' directory')
self._logger.info('Press Ctrl+C to stop the server')
while True:
try:
connection, address = self._socket.accept()
# Check thread pool status
with self.connection_lock:
active_threads = len([t for t in threading.enumerate() if t.name.startswith('ThreadPoolExecutor')])
if active_threads >= self.max_workers:
self._logger.warning('Thread pool saturated, queuing connection')
# Submit connection to thread pool
future = self._pool.submit(self._handle_connection, connection, address)
# Log when connection is dequeued
future.add_done_callback(lambda f: self._logger.info('Connection dequeued, assigned to thread'))
except Exception as e:
self._logger.error(f'Error accepting connection: {e}')
except Exception as e:
self._logger.error(f'Error starting server: {e}')
raise
def close(self):
self._socket.close()
self._logger.info('Socket closed')
if __name__ == '__main__':
"""
Main server entry point
"""
try:
# Create and start server
server = HTTPServer(MAX_WORKERS, MAX_PAYLOAD_SIZE)
server.listen(IP_ADDRESS, PORT)
except KeyboardInterrupt:
logging.info('Server stopped by user (Ctrl+C)')
except Exception as e:
logging.error(f'Server error: {e}')
finally:
logging.info('Server shutdown complete')