From e7b8605555f12a6c9d2b90ad71555c6782191c96 Mon Sep 17 00:00:00 2001 From: Andriy Sheredko Date: Thu, 18 Jun 2026 13:18:03 +0300 Subject: [PATCH] fix(osf): ENG-11068 bound external HTTP in celery tasks to stop worker stalls --- api/share/utils.py | 4 +++ api_tests/share/test_request_timeout.py | 36 +++++++++++++++++++++ framework/sessions/__init__.py | 15 +-------- osf/external/askismet/client.py | 9 ++++-- osf/external/askismet/tasks.py | 4 +-- osf/external/cedar/client.py | 4 +-- osf/external/gravy_valet/request_helpers.py | 2 +- osf/external/oopspam/client.py | 3 +- osf/external/spam/tasks.py | 28 ++++++++-------- osf/management/commands/force_archive.py | 4 +-- osf/metrics/reporters/preprint_count.py | 6 +++- website/archiver/tasks.py | 2 +- website/identifiers/clients/crossref.py | 2 ++ website/settings/defaults.py | 2 ++ 14 files changed, 79 insertions(+), 42 deletions(-) create mode 100644 api_tests/share/test_request_timeout.py diff --git a/api/share/utils.py b/api/share/utils.py index d843c7e146a..b6547ce1d60 100644 --- a/api/share/utils.py +++ b/api/share/utils.py @@ -72,6 +72,8 @@ def _enqueue_update_share(osfresource): acks_late=True, max_retries=4, retry_backoff=True, + soft_time_limit=120, + time_limit=180, ) def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN'): """ @@ -195,6 +197,7 @@ def pls_send_trove_record(osf_item, *, is_backfill: bool, osfmap_partition: Osfm **_shtrove_auth_headers(osf_item), }, data=ensure_bytes(_serialized_record), + timeout=settings.EXTERNAL_REQUEST_TIMEOUT, ) @@ -205,6 +208,7 @@ def pls_delete_trove_record(osf_item, osfmap_partition: OsfmapPartition): 'record_identifier': _shtrove_record_identifier(osf_item, osfmap_partition), }, headers=_shtrove_auth_headers(osf_item), + timeout=settings.EXTERNAL_REQUEST_TIMEOUT, ) diff --git a/api_tests/share/test_request_timeout.py b/api_tests/share/test_request_timeout.py new file mode 100644 index 00000000000..ff7f2670482 --- /dev/null +++ b/api_tests/share/test_request_timeout.py @@ -0,0 +1,36 @@ +from unittest import mock + +import pytest + +from api.share import utils as share_utils +from osf.metadata.osf_gathering import OsfmapPartition +from osf_tests.factories import ProjectFactory +from website import settings + + +@pytest.mark.django_db +class TestShareRequestTimeout: + + @pytest.fixture() + def public_node(self): + return ProjectFactory(is_public=True) + + def test_delete_trove_record_passes_timeout(self, public_node): + with mock.patch.object(share_utils.requests, 'delete') as mock_delete: + share_utils.pls_delete_trove_record(public_node, osfmap_partition=OsfmapPartition.MAIN) + assert mock_delete.call_args.kwargs['timeout'] == settings.EXTERNAL_REQUEST_TIMEOUT + + def test_send_trove_record_passes_timeout(self, public_node): + fake_serializer = mock.Mock(mediatype='text/turtle') + fake_serializer.serialize.return_value = b'' + with ( + mock.patch.object(share_utils, 'pls_get_magic_metadata_basket'), + mock.patch.object(share_utils, 'get_metadata_serializer', return_value=fake_serializer), + mock.patch.object(share_utils.requests, 'post') as mock_post, + ): + share_utils.pls_send_trove_record( + public_node, + is_backfill=False, + osfmap_partition=OsfmapPartition.MAIN, + ) + assert mock_post.call_args.kwargs['timeout'] == settings.EXTERNAL_REQUEST_TIMEOUT diff --git a/framework/sessions/__init__.py b/framework/sessions/__init__.py index 72939c1ec5d..ee52e2a4543 100644 --- a/framework/sessions/__init__.py +++ b/framework/sessions/__init__.py @@ -4,13 +4,11 @@ from urllib.parse import urlparse, parse_qs, urlunparse, urlencode from django.apps import apps -from django.utils import timezone from django.conf import settings as django_conf_settings import itsdangerous from flask import request, g from furl import furl -from framework.celery_tasks.handlers import enqueue_task from framework.flask import redirect from osf.utils.fields import ensure_str from osf.exceptions import InvalidCookieOrSessionError @@ -213,7 +211,7 @@ def before_request(): cookie = request.cookies.get(settings.COOKIE_NAME) if cookie: try: - user_session = flask_get_session_from_cookie(cookie) + flask_get_session_from_cookie(cookie) except InvalidCookieOrSessionError: # If invalid session/cookie happens, only remove the invalid cookie and redirect to the same request. # This ensures users landing on the page/link they previously clicked. @@ -226,17 +224,6 @@ def before_request(): response = redirect(redirect_url) response.delete_cookie(settings.COOKIE_NAME, domain=settings.OSF_COOKIE_DOMAIN) return response - # Case 1: anonymous session that is used for first time external (e.g. ORCiD) login only - if user_session.get('auth_user_external_first_login', False) is True: - return - # Case 2: session without authenticated user - user_id = user_session.get('auth_user_id', None) - if not user_id: - return - # Case 3: authenticated session with user - # Update date last login when making non-api requests - from framework.auth.tasks import update_user_from_activity - enqueue_task(update_user_from_activity.s(user_id, timezone.now().timestamp(), cas_login=False)) def after_request(response): diff --git a/osf/external/askismet/client.py b/osf/external/askismet/client.py index db57b1d3cfa..eaf726dd740 100644 --- a/osf/external/askismet/client.py +++ b/osf/external/askismet/client.py @@ -38,7 +38,8 @@ def _is_apikey_valid(self): 'key': self.apikey, 'blog': self.website }, - headers=self._default_headers + headers=self._default_headers, + timeout=settings.EXTERNAL_REQUEST_TIMEOUT ) self._apikey_is_valid = res.text == 'valid' return self._is_apikey_valid() @@ -108,7 +109,8 @@ def submit_spam(self, user_ip, user_agent, **kwargs): res = requests.post( f'{self.API_PROTOCOL}{self.apikey}.{self.API_HOST}/1.1/submit-spam', data=data, - headers=self._default_headers + headers=self._default_headers, + timeout=settings.EXTERNAL_REQUEST_TIMEOUT ) if res.status_code != requests.codes.ok: raise AkismetClientError(reason=res.text) @@ -129,7 +131,8 @@ def submit_ham(self, user_ip, user_agent, **kwargs): res = requests.post( f'{self.API_PROTOCOL}{self.apikey}.{self.API_HOST}/1.1/submit-ham', data=data, - headers=self._default_headers + headers=self._default_headers, + timeout=settings.EXTERNAL_REQUEST_TIMEOUT ) if res.status_code != requests.codes.ok: raise AkismetClientError(reason=res.text) diff --git a/osf/external/askismet/tasks.py b/osf/external/askismet/tasks.py index f77516204b1..1a1b9886965 100644 --- a/osf/external/askismet/tasks.py +++ b/osf/external/askismet/tasks.py @@ -3,7 +3,7 @@ from osf.external.askismet.client import AkismetClient -@celery_app.task() +@celery_app.task(soft_time_limit=60, time_limit=90) def submit_spam(guid): from osf.models import Guid resource = Guid.load(guid).referent @@ -20,7 +20,7 @@ def submit_spam(guid): ) -@celery_app.task() +@celery_app.task(soft_time_limit=60, time_limit=90) def submit_ham(guid): from osf.models import Guid resource = Guid.load(guid).referent diff --git a/osf/external/cedar/client.py b/osf/external/cedar/client.py index e748d7ccc9c..130be44b171 100644 --- a/osf/external/cedar/client.py +++ b/osf/external/cedar/client.py @@ -19,7 +19,7 @@ class CedarClient: def retrieve_all_template_ids(self): url = f'{self.host}folders/{self.home_folder_id}/contents/?resource_types=template' try: - r = requests.get(url, headers=self.headers) + r = requests.get(url, headers=self.headers, timeout=settings.EXTERNAL_REQUEST_TIMEOUT) r.raise_for_status() except RequestException: raise CedarClientRequestError( @@ -36,7 +36,7 @@ def retrieve_all_template_ids(self): def retrieve_template_by_id(self, template_id): url = f'{self.host}templates/{quote_plus(template_id)}' try: - r = requests.get(url, headers=self.headers) + r = requests.get(url, headers=self.headers, timeout=settings.EXTERNAL_REQUEST_TIMEOUT) r.raise_for_status() except RequestException: raise CedarClientRequestError(reason=f'Fail to complete Cedar API request: template_id={template_id}') diff --git a/osf/external/gravy_valet/request_helpers.py b/osf/external/gravy_valet/request_helpers.py index a4f7d37694b..48096ef8429 100644 --- a/osf/external/gravy_valet/request_helpers.py +++ b/osf/external/gravy_valet/request_helpers.py @@ -283,7 +283,7 @@ def _make_gv_request( ) assert not (request_method == 'GET' and json_data is not None) try: - response = requests.request(url=endpoint_url, headers=auth_headers, params=params, method=request_method, json=json_data) + response = requests.request(url=endpoint_url, headers=auth_headers, params=params, method=request_method, json=json_data, timeout=settings.EXTERNAL_REQUEST_TIMEOUT) except RequestException as e: logger.error(f"Cannot reach GravyValet: {e}") return None diff --git a/osf/external/oopspam/client.py b/osf/external/oopspam/client.py index 0abdfdd021f..151a102ec37 100644 --- a/osf/external/oopspam/client.py +++ b/osf/external/oopspam/client.py @@ -34,7 +34,8 @@ def check_content(self, user_ip, content, **kwargs): 'content-type': 'application/json', 'x-rapidapi-key': self.apikey, 'x-rapidapi-host': 'oopspam.p.rapidapi.com' - } + }, + timeout=settings.EXTERNAL_REQUEST_TIMEOUT ) if response.status_code != requests.codes.ok: diff --git a/osf/external/spam/tasks.py b/osf/external/spam/tasks.py index 4cb4c7784b5..c64fed209c8 100644 --- a/osf/external/spam/tasks.py +++ b/osf/external/spam/tasks.py @@ -5,7 +5,6 @@ from framework.celery_tasks import app as celery_app from framework.postcommit_tasks.handlers import run_postcommit from django.contrib.contenttypes.models import ContentType -from django.db import transaction from osf.external.askismet.client import AkismetClient from osf.external.oopspam.client import OOPSpamClient from osf.utils.fields import ensure_str @@ -24,20 +23,19 @@ def reclassify_domain_references(notable_domain_id, current_note, previous_note) from osf.models.notable_domain import DomainReference, NotableDomain domain = NotableDomain.load(notable_domain_id) references = DomainReference.objects.filter(domain=domain) - with transaction.atomic(): - for item in references: - item.is_triaged = current_note != NotableDomain.Note.UNKNOWN - if current_note == NotableDomain.Note.EXCLUDE_FROM_ACCOUNT_CREATION_AND_CONTENT: - item.referrer.confirm_spam(save=False, domains=[domain.domain]) - elif previous_note == NotableDomain.Note.EXCLUDE_FROM_ACCOUNT_CREATION_AND_CONTENT: - try: - item.referrer.spam_data['domains'].remove(domain.domain) - except (KeyError, AttributeError, ValueError) as error: - logger.info(error) - if not item.referrer.spam_data.get('domains') and not item.referrer.spam_data.get('who_flagged'): - item.referrer.unspam(save=False) - item.save() - item.referrer.save() + for item in references: + item.is_triaged = current_note != NotableDomain.Note.UNKNOWN + if current_note == NotableDomain.Note.EXCLUDE_FROM_ACCOUNT_CREATION_AND_CONTENT: + item.referrer.confirm_spam(save=False, domains=[domain.domain]) + elif previous_note == NotableDomain.Note.EXCLUDE_FROM_ACCOUNT_CREATION_AND_CONTENT: + try: + item.referrer.spam_data['domains'].remove(domain.domain) + except (KeyError, AttributeError, ValueError) as error: + logger.info(error) + if not item.referrer.spam_data.get('domains') and not item.referrer.spam_data.get('who_flagged'): + item.referrer.unspam(save=False) + item.save() + item.referrer.save() def _check_resource_for_domains(resource, content): diff --git a/osf/management/commands/force_archive.py b/osf/management/commands/force_archive.py index 7b6d18b6c65..4a8d04feb28 100644 --- a/osf/management/commands/force_archive.py +++ b/osf/management/commands/force_archive.py @@ -45,7 +45,7 @@ from api.waffle.utils import flag_is_active from scripts import utils as script_utils from website.archiver import ARCHIVER_SUCCESS -from website.settings import ARCHIVE_TIMEOUT_TIMEDELTA, ARCHIVE_PROVIDER, COOKIE_NAME +from website.settings import ARCHIVE_TIMEOUT_TIMEDELTA, ARCHIVE_PROVIDER, COOKIE_NAME, EXTERNAL_REQUEST_TIMEOUT from website.files.utils import attach_versions logger = logging.getLogger(__name__) @@ -174,7 +174,7 @@ def perform_wb_copy(reg, node_settings, delete_collisions=False, skip_collisions 'provider': ARCHIVE_PROVIDER, } url = waterbutler_api_url_for(src._id, node_settings.short_name, _internal=True, base_url=src.osfstorage_region.waterbutler_url, **params) - res = requests.post(url, data=json.dumps(data), cookies={COOKIE_NAME: cookie}) + res = requests.post(url, data=json.dumps(data), cookies={COOKIE_NAME: cookie}, timeout=EXTERNAL_REQUEST_TIMEOUT) if res.status_code not in (http_status.HTTP_200_OK, http_status.HTTP_201_CREATED, http_status.HTTP_202_ACCEPTED): http_exception = HTTPError(res.status_code) sentry.log_exception(http_exception) diff --git a/osf/metrics/reporters/preprint_count.py b/osf/metrics/reporters/preprint_count.py index 6cafa063c62..99dbede560f 100644 --- a/osf/metrics/reporters/preprint_count.py +++ b/osf/metrics/reporters/preprint_count.py @@ -47,7 +47,11 @@ def report(self, date): for preprint_provider in PreprintProvider.objects.all(): elastic_query = get_elastic_query(date, preprint_provider) - resp = requests.post(f'{settings.SHARE_URL}api/v2/search/creativeworks/_search', json=elastic_query).json() + resp = requests.post( + f'{settings.SHARE_URL}api/v2/search/creativeworks/_search', + json=elastic_query, + timeout=settings.EXTERNAL_REQUEST_TIMEOUT, + ).json() yield DailyPreprintSummaryReport( cycle_coverage=cycle_coverage_date(date), diff --git a/website/archiver/tasks.py b/website/archiver/tasks.py index 7608b5c02c6..a16055bafca 100644 --- a/website/archiver/tasks.py +++ b/website/archiver/tasks.py @@ -268,7 +268,7 @@ def make_copy_request(self, job_pk, url, data): src, dst, user = job.info() logger.info(f"Sending copy request for addon: {data['provider']} on node: {dst._id}") cookie = furl(url).query.params.get('cookie') - res = requests.post(url, data=json.dumps(data), cookies={settings.COOKIE_NAME: cookie}) + res = requests.post(url, data=json.dumps(data), cookies={settings.COOKIE_NAME: cookie}, timeout=settings.EXTERNAL_REQUEST_TIMEOUT) if res.status_code not in (http_status.HTTP_200_OK, http_status.HTTP_201_CREATED, http_status.HTTP_202_ACCEPTED): raise HTTPError(res.status_code) diff --git a/website/identifiers/clients/crossref.py b/website/identifiers/clients/crossref.py index 8f496ce363b..f7e20a618c2 100644 --- a/website/identifiers/clients/crossref.py +++ b/website/identifiers/clients/crossref.py @@ -263,6 +263,7 @@ def create_identifier(self, preprint, category, include_relation=True): fname=f'{preprint._id}.xml' ), files={'file': (f'{preprint._id}.xml', metadata)}, + timeout=settings.EXTERNAL_REQUEST_TIMEOUT, ) if response.status_code == 429: raise CrossRefRateLimitError(response.text) @@ -291,6 +292,7 @@ def bulk_create(self, metadata, filename): fname=f'{filename}.xml' ), files={'file': (f'{filename}.xml', metadata)}, + timeout=settings.EXTERNAL_REQUEST_TIMEOUT, ) logger.info('Sent a bulk update of metadata to CrossRef') diff --git a/website/settings/defaults.py b/website/settings/defaults.py index da607c2afb3..966c8f64fe2 100644 --- a/website/settings/defaults.py +++ b/website/settings/defaults.py @@ -375,6 +375,8 @@ def parent_dir(path): SHARE_URL = 'https://share.osf.io/' SHARE_API_TOKEN = None # Required to send project updates to SHARE +EXTERNAL_REQUEST_TIMEOUT = (10, 30) # (connect, read) timeout for outbound requests to external services + CAS_SERVER_URL = 'http://localhost:8080' MFR_SERVER_URL = 'http://localhost:7778'