
TaskSpider download middleware taking a long time causes redis distributed scheduling to assign duplicate urls #306

@mochazi

Description

Before reporting

Upgrade feapder and make sure you are on the latest version; if the bug still exists, describe the problem in detail.

pip install --upgrade feapder
Confirmed on the latest version: feapder[all]==1.9.3
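
To double-check which version is actually installed (standard library only, nothing feapder-specific assumed):

# quick version check
from importlib.metadata import version
print(version("feapder"))  # expect 1.9.3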

Problem

If the download middleware takes a long time, the task fetcher keeps handing me duplicate urls. I don't know how to work around this yet.
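
My guess at the mechanism (an assumption on my part, not confirmed feapder internals): a worker leases a task, but because download_midware blocks for 10-15 s, the task is not acknowledged before the scheduler's re-distribution window expires, so the same url is handed to another worker. A minimal, framework-free sketch of that kind of lease-timeout race:

# framework-free sketch of a lease-timeout race; the 5 s lease window
# is an assumed stand-in, not a real feapder constant
import threading
import time

queue = ["url-1", "url-2"]  # pending tasks
leased = {}                 # url -> time the task was handed out
LEASE_TIMEOUT = 5           # assumed re-distribution window
lock = threading.Lock()

def get_task():
    with lock:
        now = time.time()
        # re-queue tasks whose lease expired while a worker is still busy
        for url, ts in list(leased.items()):
            if now - ts > LEASE_TIMEOUT:
                del leased[url]
                queue.append(url)  # the duplicate url re-enters the queue
        if queue:
            url = queue.pop(0)
            leased[url] = time.time()
            return url
        return None

def worker(name):
    while (url := get_task()) is not None:
        print(f"{name} processing {url}")
        time.sleep(10)  # slow download middleware, longer than the lease
        with lock:
            leased.pop(url, None)  # the ack arrives far too late

threads = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# output shows two different workers processing url-2: the duplicate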

Steps to reproduce

  • Open 1 terminal and run start_monitor_task.py to publish tasks
  • Open 3 terminals and run start.py to consume tasks
  • After running for a while, the consumers start receiving duplicate urls (a one-shot launcher for these steps is sketched after this list)
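
For convenience, the whole repro can be launched from one script. This is a hypothetical helper; it only assumes start_monitor_task.py and start.py sit in the current working directory:

# run_repro.py - hypothetical launcher for the steps above
import subprocess
import sys

procs = [subprocess.Popen([sys.executable, "start_monitor_task.py"])]        # 1 publisher
procs += [subprocess.Popen([sys.executable, "start.py"]) for _ in range(3)]  # 3 consumers

for p in procs:
    p.wait()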

Screenshot

[image: worker logs showing the same urls being fetched repeatedly]

Code

# setting.py
# -*- coding: utf-8 -*-
import os, sys

# Switch the working directory to the project root
PROJECT_PATH = os.path.abspath(os.path.dirname(__file__))
os.chdir(PROJECT_PATH)  # switch working directory
sys.path.insert(0, PROJECT_PATH)  # put the project root on the import path
if not os.path.exists('logs'):
    os.makedirs('logs')

LOG_NAME = "init_spider"
LOG_PATH = "logs/%s.log" % LOG_NAME  # log file path
LOG_LEVEL = "DEBUG"
# LOG_LEVEL = "INFO"
LOG_COLOR = True  # colorize log output
LOG_IS_WRITE_TO_CONSOLE = True  # print to the console
LOG_IS_WRITE_TO_FILE = True  # write to a file
LOG_MODE = "w"  # file write mode
LOG_MAX_BYTES = 10 * 1024 * 1024  # max bytes per log file
LOG_BACKUP_COUNT = 20  # number of log files to keep
LOG_ENCODING = "utf8"  # log file encoding
OTHERS_LOG_LEVAL = "ERROR"  # log level for third-party libraries

SPIDER_SLEEP_TIME = [5, 8]  # request delay
SPIDER_MAX_RETRY_TIMES = 1  # max retry count

REDISDB_IP_PORTS = "localhost:6379"
REDISDB_USER_PASS = ""
REDISDB_DB = 0

ITEM_FILTER_SETTING = dict(
    filter_type=1  # 1 = permanent dedup (BloomFilter), 2 = in-memory dedup (MemoryFilter), 3 = temporary dedup (ExpireFilter), 4 = lightweight dedup (LiteFilter)
)
REQUEST_FILTER_SETTING = dict(
    filter_type=3,  # 1 = permanent dedup (BloomFilter), 2 = in-memory dedup (MemoryFilter), 3 = temporary dedup (ExpireFilter), 4 = lightweight dedup (LiteFilter)
    expire_time=2592000,  # expire after one month
)
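
For context, filter_type=3 selects time-windowed request dedup. The concept, sketched here with redis-py against a plain zset (my illustration of the idea, not feapder's ExpireFilter code), is to record each fingerprint with its insert time as the score and treat anything younger than expire_time as already seen:

# conceptual sketch of expire-based dedup, NOT feapder's implementation
import time
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
EXPIRE_TIME = 2592000  # one month, matching expire_time above

def is_duplicate(fingerprint, key="test:dedup"):
    now = time.time()
    r.zremrangebyscore(key, 0, now - EXPIRE_TIME)     # drop expired fingerprints
    added = r.zadd(key, {fingerprint: now}, nx=True)  # 1 if new, 0 if already seen
    return added == 0

Even if request dedup behaves like this, it only helps where the fingerprint is actually consulted; if the scheduler re-distributes a task before its request is fingerprinted, two workers can still fetch the same url, which would match what I observe.
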
# spider.py
# -*- coding: utf-8 -*-
import os
import random
import time

import requests

import feapder
from feapder.utils.log import log

log.info(f"[current working directory] {os.getcwd()}")

class SpiderTest(feapder.TaskSpider):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def download_midware(self, request):
        # Simulate a middleware that takes a long time to run
        time.sleep(random.randint(10, 15))
        response = feapder.Response(requests.get(request.url))
        return request, response

    def add_task(self):
        # Seed the redis task table with 10000 urls
        for index in range(10000):
            self._redisdb.zadd(self._task_table, {"url": f"https://httpbin.org/get?test={index}"})

    def start_requests(self, task):
        yield feapder.Request(url=task.url)

    def parse(self, request, response):
        log.info(f"response.json['args'] = {response.json['args']}")
# start_monitor_task.py
# -*- coding: utf-8 -*-
from spider import *

if __name__ == "__main__":

    spider = SpiderTest(
        task_table="spider_task", # 任务表名
        task_table_type="redis",  # 任务表类型为redis
        redis_key="test:task_spider", # redis里做任务队列的key
        keep_alive=True, # 是否常驻
        delete_keys="*",
        use_mysql=False
    )
    spider.start_monitor_task()

    # Check 3 times
    for _ in range(3):
        while not spider.all_thread_is_done():
            time.sleep(1)
        time.sleep(1)
# start.py
# -*- coding: utf-8 -*-
from spider import *

if __name__ == "__main__":

    spider = SpiderTest(
        task_table="spider_task", # 任务表名
        task_table_type="redis",  # 任务表类型为redis
        redis_key="test:task_spider", # redis里做任务队列的key
        keep_alive=True, # 是否常驻
        use_mysql=False
    )
    spider.start()

    # Check 3 times
    for _ in range(3):
        while not spider.all_thread_is_done():
            time.sleep(1)
        time.sleep(1)
