-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathScraper.py
More file actions
311 lines (239 loc) · 11.1 KB
/
Scraper.py
File metadata and controls
311 lines (239 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
from concurrent.futures import ProcessPoolExecutor
import time
import re
import sys
import argparse
import os
import random
import signal
import re
import requests
from seleniumwire import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import pandas as pd
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from tqdm import tqdm
from urllib.parse import quote_plus
from fake_useragent import UserAgent
from databaseClasses import PostgressDBConnection, AWSS3Connection
from email.header import Header
from wsgiref import headers
from torpy.http.requests import TorRequests
import urllib.request
import random
load_dotenv()
class Scraper():
proxies_all = "Proxy Config here"
# Split the proxies into a list, and split each proxy into the relevant fields (username, password, endpoint, port)
def __init__(self, startingUrl, company, brand_base_url, residentialProxy = False, ignoreUpdates= True, product_database = "productdata", autocommit = False):
# Name of company that is being scraped.
self.company = company
# Starting url of the scraped company.
self.startingUrl = startingUrl
## Connect to pg database on aws
self.pg = PostgressDBConnection(table_name="productdata", autocommit = True)
self.aws = AWSS3Connection()
self.ignoreUpdates = ignoreUpdates
# Leave empty string, if urls scraped are absolute/have base url.
self.brand_base_url = brand_base_url
self.product_database = product_database
#Read from proxies.txt and headers.txt
with open("./cached_data/proxies.txt", "r") as file:
self.datacenter_proxies = file.readlines()
self.datacenter_proxies = [x.strip() for x in self.datacenter_proxies]
self.agent = UserAgent(browsers=['edge', 'chrome'])
self.use_residential_proxy = residentialProxy
self.residential_proxy = "customer-ardaakman-cc-us:Scripe123456@pr.oxylabs.io:7777"
#Default column values used in the database. Do not change if the database columns are the same.
self.columns = ['name', 'gender', 'color', 'description', 'compositions', 'price', 'sizes', 'images', 'url', 'company', 'timestamp']
#Column names for the urls table in database.
self.url_columns = ["url", "company", "gender", "timestamp"]
def get_proxy(self):
if self.use_residential_proxy:
return self.get_residential_proxy()
else:
return self.get_datacenter_proxy()
def get_datacenter_proxy(self) -> dict:
proxy = random.choice(self.datacenter_proxies)
provider = {
"http": f"http://{proxy}",
}
# Create the proxy dictionary, return it
return provider
def get_residential_proxy(self) -> dict:
# Construct the proxy string with authentication.
# Construct the proxies dictionary
proxies = {
"https": f"https://{self.residential_proxy}",
"http": f"http://{self.residential_proxy}"
}
return proxies
def get_browser_header(self) -> str:
# Return a browser agent, that will be used in the header for the HTTP(S) request.
agent = self.agent.random
header = {
"User-Agent": agent,
"Origin": self.brand_base_url,
"Referer": self.brand_base_url,
}
return header
def setup_session(self):
header = self.get_browser_header()
session = requests.Session()
session.headers.update(header)
session.proxies.update(self.get_proxy())
return session
def create_driver(self):
# Used to create a driver with the necessary headers and ipv6 address. (source ip)
header = random.choice(self.AGENT_LIST)
proxy_settings = self.firefox_proxy(header)
seleniumwire_options = {
'proxy': {
'http': proxy_settings["http"],
'no_proxy': 'localhost,127.0.0.1',
}
}
# If your proxy requires authentication, add the credentials
if 'username' in proxy_settings and 'password' in proxy_settings:
seleniumwire_options['proxy']['username'] = proxy_settings['username']
seleniumwire_options['proxy']['password'] = proxy_settings['password']
# Configure additional options for Chrome
chrome_options = webdriver.ChromeOptions()
# Initialize the WebDriver with the specified options
driver = webdriver.Chrome(
seleniumwire_options=seleniumwire_options,
chrome_options=chrome_options
)
return driver
def scrape_products(self, fn):
"""Function that scrapes the actual urls from a website and returns this."""
if not(self.ignoreUpdates) and self.pg.table_exists(f"productdata_{self.company}"):
old_products = self.pg.run_query(f"SELECT url FROM productdata_{self.company} WHERE company = '{self.company}'")
#set of old product urls
if old_products.size > 0:
old_set = set(old_products[:,0])
else:
old_set = set()
else:
old_set = set()
urls = self.pg.run_query(f"SELECT url FROM producturls_{self.company} WHERE company = '{self.company}' ORDER BY unique_ids DESC")
# Now grab previously failed urls. These are cached.
failed_urls = set()
if os.path.exists(f"./failed_urls_{self.company}"):
with open(f"./failed_urls_{self.company}") as f:
failed_urls = f.readlines()
failed_urls = set([x.strip() for x in failed_urls])
urls = urls[:,0]
urls = [url for url in urls if url not in old_set]
urls = [url for url in urls if url not in failed_urls]
prods = fn(urls)
self.save_product(prods)
def get_urls(self):
"""Function that returns the urls from the database This is only for the purpose of multiprocessing."""
query = self.pg.run_query(f"SELECT url FROM producturls_{self.company} WHERE company = '{self.company}' ORDER BY unique_ids DESC")
failed_urls = set()
if os.path.exists(f"./failed_urls_{self.company}"):
with open(f"./failed_urls_{self.company}") as f:
failed_urls = f.readlines()
failed_urls = set([x.strip() for x in failed_urls])
urls = query[:,0]
urls = [url for url in urls if url not in failed_urls]
return urls
def scrape_urls(self, fn):
connection_established = self.pg.test_connection()
if not connection_established:
print("Could not establish connection to database. Exiting...")
sys.exit(1)
table_exists = self.pg.table_exists(f"producturls_{self.company}")
if not(self.ignoreUpdates) and table_exists:
old_products = self.pg.run_query(f"SELECT url FROM producturls_{self.company} WHERE company = '{self.company}'")
old_set = set(old_products[:, 0])
else:
old_set = set()
vals = fn(self.startingUrl, self.brand_base_url)
result_urls = []
for val in vals:
if val[0] not in old_set:
result_urls.append((val[0], self.company, val[1]))
# Result urls contains the [url, company, geder, timestamp]!
self.save_urls(result_urls)
return result_urls
def fetchProductsFromDb(self):
query = f"SELECT * FROM products WHERE company = '{self.company}'"
return self.pg.run_query(query)
def process_urls_in_chunk(self, urls_chunk, mapping, i, lock):
sub_chunk_size = 5 # Number of URLs to process before switching session
# Break the urls_chunk into smaller sub-chunks
for sub_start in range(0, len(urls_chunk), sub_chunk_size):
sub_chunk = urls_chunk[sub_start:sub_start + sub_chunk_size]
subchunk_processed_count = 0
vals = []
with TorRequests() as tor_requests:
with tor_requests.get_session() as sess:
HEADERS = {"User-Agent": random.choice(self.AGENT_LIST)}
print(sess.get("http://httpbin.org/ip").json())
print(HEADERS["User-Agent"])
for url in tqdm(sub_chunk):
bs = BeautifulSoup(sess.get(url).text, 'html.parser')
val = self.scrapeSingleProduct(bs, url)
vals.append(val)
#Store vals in database.
self.pg.save_product_details(vals)
print(f"Subchunk {i} processed {subchunk_processed_count} urls")
def save_urls(self, urls):
"""Function that saves the urls to the database."""
self.pg.save_urls_db(urls, self.company)
def save_product(self, product):
""""Here are the columns:
'Unique ID': sha256_hash,
'Color': product_color,
'Name': product_name,
'Description': product_long_desc,
'Details': product_details,
'Material': material_type,
'Image_urls': secondary_image_urls,
'Product_url': color_url,
'Size': size,
'Size Availability': availability,
'Gender': gender,
'Price': product_price,
"""
self.pg.save_data_to_db(f"productdata_{self.company}", product, self.columns)
def scrape_single_product(self, driver, url , fn):
""""This function should get the details of the product. The requred fields are:
- product_name
- product_color
- product_description
- avaliable_sizes
- Material information
- image_urls
Function can change per website.
"""
return fn(driver, url)
def create_listener(self):
self.pg.conn.autocommit = True
curs = self.pg.conn.cursor()
curs.execute("LISTEN productdata_channel;")
print("Listening for notifications on channel 'productdata_channel'")
try:
while True:
# Check for connection status here
print("Waiting for notifications...")
self.pg.conn.poll()
while self.pg.conn.notifies:
notify = self.pg.conn.notifies.pop(0)
print(f"Received notification: {notify.payload}")
# Run the image update function here
self.aws.upload_images_to_s3(self)
# Sleep for a short period to prevent high CPU usage, adjust the time as needed
time.sleep(5)
except Exception as e:
print(f"An error occurred: {e}")
def kill_db_connection(self):
self.pg.disconnect()