-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest.py
More file actions
163 lines (138 loc) · 5.01 KB
/
Copy pathrequest.py
File metadata and controls
163 lines (138 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import json
import logging
class Request:
"""
HTTP Request parser that handles GET and POST requests with proper
header and body parsing
"""
def __init__(self, data: str):
self.data = data
self.path = '/'
self.method = 'GET'
self.version = '1.1'
self.protocol = 'HTTP'
self.headers = {}
self.query_params = {}
self.body = ''
self.is_valid = True
self.error_message = ''
self._logger = logging.getLogger(Request.__name__)
self._parse_data()
def _parse_data(self):
"""
Parse HTTP request data into components
"""
try:
if not self.data or not self.data.strip():
self.is_valid = False
self.error_message = 'Empty request'
return
lines = self.data.split('\r\n')
if len(lines) < 1:
lines = self.data.split('\n') # Fallback for different line endings
if len(lines) < 1:
self.is_valid = False
self.error_message = 'Invalid request format'
return
# Parse request line
request_line = lines[0].strip()
parts = request_line.split()
if len(parts) != 3:
self.is_valid = False
self.error_message = 'Invalid request line format'
return
self.method = parts[0].upper()
self.path = parts[1]
protocol_version = parts[2]
# Parse protocol and version
if '/' in protocol_version:
self.protocol = protocol_version.split('/')[0]
self.version = protocol_version.split('/')[1]
else:
self.is_valid = False
self.error_message = 'Invalid HTTP version format'
return
# Parse query parameters from path
if '?' in self.path:
path_parts = self.path.split('?', 1)
self.path = path_parts[0]
self._parse_query_params(path_parts[1])
# Parse headers
header_start = 1
header_end = len(lines)
# Find where headers end (empty line or \r\n\r\n)
for i in range(1, len(lines)):
if lines[i].strip() == '':
header_end = i
break
# Parse header lines
for i in range(header_start, header_end):
line = lines[i].strip()
if ':' in line:
key, value = line.split(':', 1)
self.headers[key.strip().lower()] = value.strip()
# Parse body (everything after the empty line)
if header_end < len(lines) - 1:
self.body = '\r\n'.join(lines[header_end + 1:])
elif '\r\n\r\n' in self.data:
self.body = self.data.split('\r\n\r\n', 1)[1]
except Exception as e:
self._logger.error(f'Error parsing request: {e}')
self.is_valid = False
self.error_message = f'Request parsing error: {str(e)}'
def _parse_query_params(self, query_string: str):
"""
Parse query parameters from URL
"""
if not query_string:
return
for param in query_string.split('&'):
if '=' in param:
key, value = param.split('=', 1)
self.query_params[key] = value
else:
self.query_params[param] = ''
def get_json_body(self):
"""
Parse request body as JSON
Returns parsed JSON or raises exception if invalid
"""
if not self.body:
return {}
try:
return json.loads(self.body)
except json.JSONDecodeError as e:
raise ValueError(f'Invalid JSON in request body: {str(e)}')
def get_header(self, key: str, default: str = None):
"""
Get header value by key (case-insensitive)
"""
return self.headers.get(key.lower(), default)
def has_header(self, key: str) -> bool:
"""
Check if header exists (case-insensitive)
"""
return key.lower() in self.headers
def get_content_type(self) -> str:
"""
Get Content-Type header value
"""
return self.get_header('content-type', '')
def get_content_length(self) -> int:
"""
Get Content-Length header value as integer
"""
try:
return int(self.get_header('content-length', '0'))
except ValueError:
return 0
def validate_host_header(self, expected_host: str) -> bool:
"""
Validate Host header matches expected server host
"""
host = self.get_header('host')
if not host:
return False
return host == expected_host
def __str__(self):
return f"{self.method} {self.path} HTTP/{self.version}"