diff --git a/fxsharing/shares/conftest.py b/fxsharing/shares/conftest.py index 706034e..58434c8 100644 --- a/fxsharing/shares/conftest.py +++ b/fxsharing/shares/conftest.py @@ -6,8 +6,7 @@ @pytest.fixture(autouse=True) def mock_celery_tasks(): with ( - patch("fxsharing.shares.views.fetch_link_preview", autospec=True), - patch("fxsharing.shares.views.submit_link_to_cinder", autospec=True), + patch("fxsharing.shares.views.process_new_share", autospec=True), patch("fxsharing.shares.views.submit_share_to_cinder", autospec=True), patch("fxsharing.shares.views.purge_cdn_cache", autospec=True), ): diff --git a/fxsharing/shares/tasks.py b/fxsharing/shares/tasks.py index a0e1d48..6954ff6 100644 --- a/fxsharing/shares/tasks.py +++ b/fxsharing/shares/tasks.py @@ -1,3 +1,4 @@ +import functools import mimetypes from urllib.parse import urljoin, urlparse @@ -5,7 +6,7 @@ import requests from bs4 import BeautifulSoup -from celery import shared_task +from celery import group, shared_task from celery.contrib.django.task import DjangoTask from celery.utils.log import get_task_logger from google.cloud import storage @@ -29,6 +30,17 @@ } +@functools.cache +def _get_gcs_client(): + """Return a process-wide GCS client, created lazily on first use. + + ``storage.Client()`` performs credential discovery and a metadata-server + round trip, so we reuse one client per worker process rather than build a + new one for every favicon. + """ + return storage.Client() + + def download_and_store_favicon(favicon_url, link_url, headers): """Download favicon and upload to GCS. Returns public URL or None on failure.""" bucket_name = settings.GCS_IMAGE_BUCKET @@ -73,7 +85,7 @@ def download_and_store_favicon(favicon_url, link_url, headers): hostname = urlparse(link_url).hostname object_name = f"favicons/{hostname}{ext}" - client = storage.Client() + client = _get_gcs_client() bucket = client.bucket(bucket_name) blob = bucket.blob(object_name) @@ -409,3 +421,56 @@ def purge_cdn_cache(shortcodes): ) response.raise_for_status() logger.info("purged CDN cache for shortcode=%s", shortcode) + + +def _all_link_ids(share): + """Yield link id strings for ``share`` and every nested share (depth-first).""" + for link_id in share.links.values_list("id", flat=True): + yield str(link_id) + for nested in share.nested_shares.all(): + yield from _all_link_ids(nested) + + +def _cinder_signatures(link_ids): + """Return Cinder submission signatures for ``link_ids``. + + Returns an empty list (and logs) when Cinder is not configured, so callers + can unconditionally concatenate the result into a larger dispatch group. + """ + for name in ("CINDER_URL", "CINDER_API_TOKEN", "CINDER_API_ENDPOINT"): + if not getattr(settings, name): + logger.error("%s is not set!", name) + return [] + + return [submit_link_to_cinder.s(link_id) for link_id in link_ids] + + +@shared_task(base=BaseTaskWithRetry) +def process_new_share(share_id): + """Fan out preview + safety processing for a newly created share. + + Enqueued once per create-share request (after commit), so the web request + makes a single broker round trip regardless of how many links the + collection holds. The per-link ``fetch_link_preview`` tasks and Cinder + submissions are dispatched here, in the worker, as a single ``group`` so + the whole fan-out is one broker operation. Building one group rather than a + loop of ``.delay()`` calls also means a retry (the task auto-retries on + error) replays a single dispatch instead of re-enqueuing previews link by + link. + """ + from .models import Share + + try: + share = Share.objects.get(id=share_id) + except Share.DoesNotExist: + logger.warning("process_new_share: share %s does not exist", share_id) + return + + link_ids = list(_all_link_ids(share)) + if not link_ids: + return + + signatures = [fetch_link_preview.s(link_id) for link_id in link_ids] + signatures += _cinder_signatures(link_ids) + + group(signatures).apply_async() diff --git a/fxsharing/shares/tests.py b/fxsharing/shares/tests.py index 2acb51d..b13ba7d 100644 --- a/fxsharing/shares/tests.py +++ b/fxsharing/shares/tests.py @@ -216,6 +216,26 @@ def test_creates_links(self): share = Share.objects.get() assert share.links.count() == 2 + def test_enqueues_single_dispatch_regardless_of_link_count(self): + from fxsharing.shares import views + + payload = { + "type": "tabs", + "title": "My Links", + "links": [ + {"url": f"https://example.com/{i}", "title": f"Link {i}"} + for i in range(10) + ], + } + response = self.client.post( + reverse("create_share"), + data=json.dumps(payload), + content_type="application/json", + ) + assert response.status_code == 201 + share = Share.objects.get() + views.process_new_share.delay_on_commit.assert_called_once_with(str(share.id)) + def test_duplicate_request_creates_distinct_share(self): payload = { "type": "tabs", @@ -1567,38 +1587,119 @@ def test_unknown_share_id_is_noop(self): assert mock_post.call_count == 0 +class TestProcessNewShare(TestCase): + @classmethod + def setUpTestData(cls): + cls.user = User.objects.create_user(fxa_id="a1b2c3d4e5f6dispatch") + + def test_dispatches_single_group_covering_every_link_including_nested(self): + from fxsharing.shares import tasks + + share = Share.objects.create(title="top", user=self.user, type="tabs") + Link.objects.create(share=share, url="https://a.example") + nested = Share.objects.create( + title="nested", user=self.user, type="bookmarks", parent_share=share + ) + Link.objects.create(share=nested, url="https://b.example") + + with ( + patch.object(tasks, "fetch_link_preview", autospec=True), + patch.object(tasks, "group", autospec=True) as mock_group, + patch.object(tasks, "_cinder_signatures", autospec=True, return_value=[]), + ): + tasks.process_new_share(str(share.id)) + + enqueued = { + call.args[0] for call in tasks.fetch_link_preview.s.call_args_list + } + mock_group.assert_called_once() + mock_group.return_value.apply_async.assert_called_once_with() + + expected = {str(link.id) for link in Link.objects.all()} + assert enqueued == expected + + def test_includes_cinder_signatures_in_the_group(self): + from fxsharing.shares import tasks + + share = Share.objects.create(title="top", user=self.user, type="tabs") + Link.objects.create(share=share, url="https://a.example") + + with ( + patch.object(tasks, "fetch_link_preview", autospec=True), + patch.object(tasks, "group", autospec=True) as mock_group, + patch.object( + tasks, "_cinder_signatures", autospec=True, return_value=["cinder-sig"] + ) as mock_cinder, + ): + tasks.process_new_share(str(share.id)) + + link_ids = [str(link.id) for link in share.links.all()] + mock_cinder.assert_called_once_with(link_ids) + (signatures,) = mock_group.call_args.args + assert "cinder-sig" in signatures + + def test_no_links_dispatches_nothing(self): + from fxsharing.shares import tasks + + share = Share.objects.create(title="empty", user=self.user, type="tabs") + + with ( + patch.object(tasks, "fetch_link_preview", autospec=True), + patch.object(tasks, "group", autospec=True) as mock_group, + patch.object(tasks, "_cinder_signatures", autospec=True) as mock_cinder, + ): + tasks.process_new_share(str(share.id)) + assert mock_group.call_count == 0 + assert mock_cinder.call_count == 0 + + def test_missing_share_is_a_noop(self): + from fxsharing.shares import tasks + + with ( + patch.object(tasks, "fetch_link_preview", autospec=True), + patch.object(tasks, "group", autospec=True) as mock_group, + ): + tasks.process_new_share("00000000-0000-0000-0000-000000000000") + assert tasks.fetch_link_preview.s.call_count == 0 + assert mock_group.call_count == 0 + + @override_settings( CINDER_URL="https://cinder.example.test", CINDER_API_TOKEN="t", # noqa: S106 CINDER_API_ENDPOINT="https://cinder.example.test/api/v2/workflows/event/", ) -class TestCheckLinkSharingQuality(TestCase): +class TestCinderSignatures(TestCase): @classmethod def setUpTestData(cls): cls.user = User.objects.create_user(fxa_id="a1b2c3d4e5f6group") - def test_enqueues_one_signature_per_link(self): - from fxsharing.shares import views + def test_one_signature_per_link(self): + from fxsharing.shares import tasks share = Share.objects.create(title="t", user=self.user, type="tabs") Link.objects.create(share=share, url="https://a.example") Link.objects.create(share=share, url="https://b.example") - views.check_link_sharing_quality(share) + link_ids = [str(link.id) for link in share.links.all()] - link_ids = {str(link.id) for link in share.links.all()} - called_with = { - call.args[0] for call in views.submit_link_to_cinder.s.call_args_list - } - assert called_with == link_ids + with patch.object(tasks, "submit_link_to_cinder", autospec=True): + tasks._cinder_signatures(link_ids) + called_with = { + call.args[0] for call in tasks.submit_link_to_cinder.s.call_args_list + } + assert called_with == set(link_ids) - def test_no_enqueue_when_cinder_url_unset(self): - from fxsharing.shares import views + def test_empty_when_cinder_url_unset(self): + from fxsharing.shares import tasks - with override_settings(CINDER_URL=""): - share = Share.objects.create(title="t", user=self.user, type="tabs") - Link.objects.create(share=share, url="https://a.example") - views.check_link_sharing_quality(share) - assert views.submit_link_to_cinder.s.call_count == 0 + with ( + override_settings(CINDER_URL=""), + patch.object(tasks, "submit_link_to_cinder", autospec=True), + ): + assert ( + tasks._cinder_signatures(["00000000-0000-0000-0000-000000000000"]) == [] + ) + assert tasks.submit_link_to_cinder.s.call_count == 0 def _fake_getaddrinfo(host_to_ip): diff --git a/fxsharing/shares/views.py b/fxsharing/shares/views.py index 87ebf23..584ddf0 100644 --- a/fxsharing/shares/views.py +++ b/fxsharing/shares/views.py @@ -22,7 +22,6 @@ from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_POST -from celery import group from jsonschema import ValidationError, validate from modern_csrf.decorators import csrf_protect @@ -37,12 +36,7 @@ from .cinder_schema import decision_created_schema from .models import Link, Share, ShareStatus from .share_schema import share_schema -from .tasks import ( - fetch_link_preview, - purge_cdn_cache, - submit_link_to_cinder, - submit_share_to_cinder, -) +from .tasks import process_new_share, purge_cdn_cache, submit_share_to_cinder log = logging.getLogger(__name__) @@ -148,41 +142,11 @@ def create_share_from_data(data, user, parent_share=None): elif obj.get("links"): create_share_from_data(obj, user=user, parent_share=share) - created_links = Link.objects.bulk_create(links) - for link in created_links: - fetch_link_preview.delay_on_commit(str(link.id)) + Link.objects.bulk_create(links) return share -def _all_link_ids(share): - """Yield link id strings for ``share`` and every nested share (depth-first).""" - for link_id in share.links.values_list("id", flat=True): - yield str(link_id) - for nested in share.nested_shares.all(): - yield from _all_link_ids(nested) - - -def check_link_sharing_quality(share): - # TODO: is this unnecessary? can we verify the env at the k8s level? - if not settings.CINDER_URL: - log.error("CINDER_URL is not set!") - return - if not settings.CINDER_API_TOKEN: - log.error("CINDER_API_TOKEN is not set!") - return - if not settings.CINDER_API_ENDPOINT: - log.error("CINDER_API_ENDPOINT is not set!") - return - - link_ids = list(_all_link_ids(share)) - if not link_ids: - return - - signatures = [submit_link_to_cinder.s(link_id) for link_id in link_ids] - transaction.on_commit(lambda: group(signatures).apply_async()) - - @require_POST @csrf_protect def create_share(request): @@ -220,7 +184,7 @@ def create_share(request): # Always create a fresh share page so a user can generate a new link # from the same tab group each time they share. share = create_share_from_data(data=data, user=request.user) - check_link_sharing_quality(share) + process_new_share.delay_on_commit(str(share.id)) metrics.share_created.add(1, {"outcome": "created"}) url = request.build_absolute_uri(f"/{share.shortcode}")