Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions api/share/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,31 @@ def is_qa_resource(resource):
return has_qa_tags or has_qa_title


def update_share(resource):
def update_share(resource, target_queue=None):
"""
By default, tasks are routed to queue based on module routing in CeleryRouter,
:param resource: osf resource that is needed to be reindexed
:param target_queue: should be task queue attribute of CeleryConfig f.e 'task_low_queue' for bulk background
passing 'target_queue' allows low-level queue task run (reindexing files after a user merge) even though
related module path may be marked to work with task_high_queue.
"""
if not settings.SHARE_ENABLED:
return
if not hasattr(resource, 'guids'):
logger.error(f'update_share called on non-guid resource: {resource}')
return
_enqueue_update_share(resource)
if target_queue is not None:
_enqueue_update_share(resource, target_queue)
else:
_enqueue_update_share(resource)


def _enqueue_update_share(osfresource):
def _enqueue_update_share(osfresource, target_queue=None):
_osfguid_value = osfresource.guids.values_list('_id', flat=True).first()
if not _osfguid_value:
logger.warning(f'update_share skipping resource that has no guids: {osfresource}')
return
enqueue_task(task__update_share.s(_osfguid_value))
enqueue_task(task__update_share.s(_osfguid_value, target_queue=target_queue))


@celery_app.task(
Expand All @@ -73,7 +83,7 @@ def _enqueue_update_share(osfresource):
max_retries=4,
retry_backoff=True,
)
def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN'):
def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN', target_queue=None):
"""
Send SHARE/trove current metadata record(s) for the osf-guid-identified object
"""
Expand Down
2 changes: 2 additions & 0 deletions framework/celery_tasks/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def route_for_task(self, task, args=None, kwargs=None):
:param str task: Of the form 'full.module.path.to.class.function'
:returns dict: Tells celery into which queue to route this task.
"""
if kwargs and (target_queue := kwargs.get('target_queue')):
return {'queue': target_queue}
return {
'queue': match_by_module(task)
}
27 changes: 27 additions & 0 deletions osf/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager
from django.contrib.auth.hashers import check_password
from django.contrib.auth.models import PermissionsMixin
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldDoesNotExist
from django.dispatch import receiver
from django.db import models
Expand Down Expand Up @@ -67,6 +68,7 @@
from website import filters
from website.project import new_bookmark_collection
from website.util.metrics import OsfSourceTags, unregistered_created_source_tag
from website.settings import CeleryConfig
from importlib import import_module
from osf.models.notification_type import NotificationTypeEnum
from osf.utils.requests import string_type_request_headers
Expand Down Expand Up @@ -732,7 +734,9 @@ def merge_user(self, user):
# Capture content to SHARE reindex BEFORE merge transfers contributors
# After merge, user.contributed and user.preprints will be empty
nodes_to_reindex = list(user.contributed)
node_ids_to_reindex = [node.id for node in nodes_to_reindex]
preprints_to_reindex = list(user.preprints.all())
preprint_ids_to_reindex = [preprint.id for preprint in preprints_to_reindex]

# Move over the other user's attributes
# TODO: confirm
Expand Down Expand Up @@ -877,6 +881,29 @@ def merge_user(self, user):
except Exception as e:
logger.exception(f'Failed to SHARE reindex preprint {preprint._id} during user merge: {e}')

from osf.models import AbstractNode, Preprint
from addons.osfstorage.models import OsfStorageFile
Comment thread
cslzchen marked this conversation as resolved.
node_ctype = ContentType.objects.get_for_model(AbstractNode)
preprint_ctype = ContentType.objects.get_for_model(Preprint)
nodes_files_to_reindex = OsfStorageFile.objects.filter(
target_object_id__in=node_ids_to_reindex, target_content_type=node_ctype,
guids__isnull=False
)
preprints_files_to_reindex = OsfStorageFile.objects.filter(
target_object_id__in=preprint_ids_to_reindex, target_content_type=preprint_ctype,
guids__isnull=False
)
for file in nodes_files_to_reindex.iterator(chunk_size=100):
try:
update_share(file, target_queue=CeleryConfig.task_low_queue)
except Exception as e:
logger.exception(f'Failed to SHARE reindex file {file._id} during user merge: {e}')
for file in preprints_files_to_reindex.iterator(chunk_size=100):
try:
update_share(file, target_queue=CeleryConfig.task_low_queue)
except Exception as e:
logger.exception(f'Failed to SHARE reindex preprints file {file._id} during user merge: {e}')

def _merge_users_preprints(self, user):
"""
Preprints use guardian. The PreprintContributor table stores order and bibliographic information.
Expand Down
30 changes: 22 additions & 8 deletions osf_tests/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def test_merge_drafts(self, user):
@mock.patch('api.share.utils.update_share')
def test_merge_user_triggers_share_reindex(self, mock_update_share):
from osf.models import Preprint
from addons.osfstorage.models import OsfStorageFile

user = AuthUserFactory()
user2 = AuthUserFactory()
Expand All @@ -457,26 +458,39 @@ def test_merge_user_triggers_share_reindex(self, mock_update_share):
preprint_two = PreprintFactory(title='preprint_two')
preprint_two.add_contributor(user2)

node_file = OsfStorageFile.create(
target=node_one, path='/node_file.txt', name='node_file.txt', materialized_path='/node_file.txt'
)
node_file.save(skip_search=True)
node_file.get_guid(create=True)

preprint_file = OsfStorageFile.create(
target=preprint_one, path='/preprint_file.txt', name='preprint_file.txt',
materialized_path='/preprint_file.txt'
)
preprint_file.save(skip_search=True)
preprint_file.get_guid(create=True)

user.merge_user(user2)

all_reindexed = [call[0][0] for call in mock_update_share.call_args_list]
# Verify update_share was called for both nodes
nodes_reindexed = [
call[0][0] for call in mock_update_share.call_args_list
if isinstance(call[0][0], AbstractNode)
]
nodes_reindexed = [node_reindexed for node_reindexed in all_reindexed if isinstance(node_reindexed, AbstractNode)]
assert len(nodes_reindexed) == 2
assert node_one in nodes_reindexed
assert node_two in nodes_reindexed

# Verify update_share was called for both preprints
preprints_reindexed = [
call[0][0] for call in mock_update_share.call_args_list
if isinstance(call[0][0], Preprint)
]
preprints_reindexed = [preprint_reindexed for preprint_reindexed in all_reindexed if isinstance(preprint_reindexed, Preprint)]
assert len(preprints_reindexed) == 2
assert preprint_one in preprints_reindexed
assert preprint_two in preprints_reindexed

# Verify update_share was called for files belonging to user2's nodes and preprints
files_reindexed = [file_reindexed for file_reindexed in all_reindexed if isinstance(file_reindexed, OsfStorageFile)]
assert node_file in files_reindexed
assert preprint_file in files_reindexed

def test_cant_create_user_without_username(self):
u = OSFUser() # No username given
with pytest.raises(ValidationError):
Expand Down