diff --git a/api/share/utils.py b/api/share/utils.py index d843c7e146a..42ee4ea7ce4 100644 --- a/api/share/utils.py +++ b/api/share/utils.py @@ -50,21 +50,31 @@ def is_qa_resource(resource): return has_qa_tags or has_qa_title -def update_share(resource): +def update_share(resource, target_queue=None): + """ + By default, tasks are routed to a queue based on module routing in CeleryRouter. + :param resource: OSF resource that needs to be reindexed + :param target_queue: should be a task queue attribute of CeleryConfig, e.g. 'task_low_queue' for bulk background work. + Passing 'target_queue' allows the task to run on that queue (e.g. reindexing files after a user merge) even though + the related module path may be marked to work with task_high_queue. + """ if not settings.SHARE_ENABLED: return if not hasattr(resource, 'guids'): logger.error(f'update_share called on non-guid resource: {resource}') return - _enqueue_update_share(resource) + if target_queue is not None: + _enqueue_update_share(resource, target_queue) + else: + _enqueue_update_share(resource) -def _enqueue_update_share(osfresource): +def _enqueue_update_share(osfresource, target_queue=None): _osfguid_value = osfresource.guids.values_list('_id', flat=True).first() if not _osfguid_value: logger.warning(f'update_share skipping resource that has no guids: {osfresource}') return - enqueue_task(task__update_share.s(_osfguid_value)) + enqueue_task(task__update_share.s(_osfguid_value, target_queue=target_queue)) @celery_app.task( @@ -73,7 +83,7 @@ def _enqueue_update_share(osfresource): max_retries=4, retry_backoff=True, ) -def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN'): +def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN', target_queue=None): """ Send SHARE/trove current metadata record(s) for the osf-guid-identified object """ diff --git a/framework/celery_tasks/routers.py b/framework/celery_tasks/routers.py index d9d6e335286..4f08d8b256e 100644 --- 
a/framework/celery_tasks/routers.py +++ b/framework/celery_tasks/routers.py @@ -32,6 +32,8 @@ def route_for_task(self, task, args=None, kwargs=None): :param str task: Of the form 'full.module.path.to.class.function' :returns dict: Tells celery into which queue to route this task. """ + if kwargs and (target_queue := kwargs.get('target_queue')): + return {'queue': target_queue} return { 'queue': match_by_module(task) } diff --git a/osf/models/user.py b/osf/models/user.py index 04cc0df6662..fadf7dc14a6 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -18,6 +18,7 @@ from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager from django.contrib.auth.hashers import check_password from django.contrib.auth.models import PermissionsMixin +from django.contrib.contenttypes.models import ContentType from django.core.exceptions import FieldDoesNotExist from django.dispatch import receiver from django.db import models @@ -67,6 +68,7 @@ from website import filters from website.project import new_bookmark_collection from website.util.metrics import OsfSourceTags, unregistered_created_source_tag +from website.settings import CeleryConfig from importlib import import_module from osf.models.notification_type import NotificationTypeEnum from osf.utils.requests import string_type_request_headers @@ -732,7 +734,9 @@ def merge_user(self, user): # Capture content to SHARE reindex BEFORE merge transfers contributors # After merge, user.contributed and user.preprints will be empty nodes_to_reindex = list(user.contributed) + node_ids_to_reindex = [node.id for node in nodes_to_reindex] preprints_to_reindex = list(user.preprints.all()) + preprint_ids_to_reindex = [preprint.id for preprint in preprints_to_reindex] # Move over the other user's attributes # TODO: confirm @@ -877,6 +881,29 @@ def merge_user(self, user): except Exception as e: logger.exception(f'Failed to SHARE reindex preprint {preprint._id} during user merge: {e}') + from osf.models import 
AbstractNode, Preprint + from addons.osfstorage.models import OsfStorageFile + node_ctype = ContentType.objects.get_for_model(AbstractNode) + preprint_ctype = ContentType.objects.get_for_model(Preprint) + nodes_files_to_reindex = OsfStorageFile.objects.filter( + target_object_id__in=node_ids_to_reindex, target_content_type=node_ctype, + guids__isnull=False + ) + preprints_files_to_reindex = OsfStorageFile.objects.filter( + target_object_id__in=preprint_ids_to_reindex, target_content_type=preprint_ctype, + guids__isnull=False + ) + for file in nodes_files_to_reindex.iterator(chunk_size=100): + try: + update_share(file, target_queue=CeleryConfig.task_low_queue) + except Exception as e: + logger.exception(f'Failed to SHARE reindex file {file._id} during user merge: {e}') + for file in preprints_files_to_reindex.iterator(chunk_size=100): + try: + update_share(file, target_queue=CeleryConfig.task_low_queue) + except Exception as e: + logger.exception(f'Failed to SHARE reindex preprints file {file._id} during user merge: {e}') + def _merge_users_preprints(self, user): """ Preprints use guardian. The PreprintContributor table stores order and bibliographic information. 
diff --git a/osf_tests/test_user.py b/osf_tests/test_user.py index 2e5d0631cd4..b681b1966f0 100644 --- a/osf_tests/test_user.py +++ b/osf_tests/test_user.py @@ -445,6 +445,7 @@ def test_merge_drafts(self, user): @mock.patch('api.share.utils.update_share') def test_merge_user_triggers_share_reindex(self, mock_update_share): from osf.models import Preprint + from addons.osfstorage.models import OsfStorageFile user = AuthUserFactory() user2 = AuthUserFactory() @@ -457,26 +458,39 @@ def test_merge_user_triggers_share_reindex(self, mock_update_share): preprint_two = PreprintFactory(title='preprint_two') preprint_two.add_contributor(user2) + node_file = OsfStorageFile.create( + target=node_one, path='/node_file.txt', name='node_file.txt', materialized_path='/node_file.txt' + ) + node_file.save(skip_search=True) + node_file.get_guid(create=True) + + preprint_file = OsfStorageFile.create( + target=preprint_one, path='/preprint_file.txt', name='preprint_file.txt', + materialized_path='/preprint_file.txt' + ) + preprint_file.save(skip_search=True) + preprint_file.get_guid(create=True) + user.merge_user(user2) + all_reindexed = [call[0][0] for call in mock_update_share.call_args_list] # Verify update_share was called for both nodes - nodes_reindexed = [ - call[0][0] for call in mock_update_share.call_args_list - if isinstance(call[0][0], AbstractNode) - ] + nodes_reindexed = [node_reindexed for node_reindexed in all_reindexed if isinstance(node_reindexed, AbstractNode)] assert len(nodes_reindexed) == 2 assert node_one in nodes_reindexed assert node_two in nodes_reindexed # Verify update_share was called for both preprints - preprints_reindexed = [ - call[0][0] for call in mock_update_share.call_args_list - if isinstance(call[0][0], Preprint) - ] + preprints_reindexed = [preprint_reindexed for preprint_reindexed in all_reindexed if isinstance(preprint_reindexed, Preprint)] assert len(preprints_reindexed) == 2 assert preprint_one in preprints_reindexed assert preprint_two in 
preprints_reindexed + # Verify update_share was called for files belonging to user2's nodes and preprints + files_reindexed = [file_reindexed for file_reindexed in all_reindexed if isinstance(file_reindexed, OsfStorageFile)] + assert node_file in files_reindexed + assert preprint_file in files_reindexed + def test_cant_create_user_without_username(self): u = OSFUser() # No username given with pytest.raises(ValidationError):