Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions fxsharing/shares/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
@pytest.fixture(autouse=True)
def mock_celery_tasks():
with (
patch("fxsharing.shares.views.fetch_link_preview", autospec=True),
patch("fxsharing.shares.views.submit_link_to_cinder", autospec=True),
patch("fxsharing.shares.views.process_new_share", autospec=True),
patch("fxsharing.shares.views.submit_share_to_cinder", autospec=True),
patch("fxsharing.shares.views.purge_cdn_cache", autospec=True),
):
Expand Down
69 changes: 67 additions & 2 deletions fxsharing/shares/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import functools
import mimetypes
from urllib.parse import urljoin, urlparse

from django.conf import settings

import requests
from bs4 import BeautifulSoup
from celery import shared_task
from celery import group, shared_task
from celery.contrib.django.task import DjangoTask
from celery.utils.log import get_task_logger
from google.cloud import storage
Expand All @@ -29,6 +30,17 @@
}


@functools.cache
def _get_gcs_client():
"""Return a process-wide GCS client, created lazily on first use.

``storage.Client()`` performs credential discovery and a metadata-server
round trip, so we reuse one client per worker process rather than build a
new one for every favicon.
"""
return storage.Client()


def download_and_store_favicon(favicon_url, link_url, headers):
"""Download favicon and upload to GCS. Returns public URL or None on failure."""
bucket_name = settings.GCS_IMAGE_BUCKET
Expand Down Expand Up @@ -73,7 +85,7 @@ def download_and_store_favicon(favicon_url, link_url, headers):
hostname = urlparse(link_url).hostname
object_name = f"favicons/{hostname}{ext}"

client = storage.Client()
client = _get_gcs_client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(object_name)

Expand Down Expand Up @@ -409,3 +421,56 @@ def purge_cdn_cache(shortcodes):
)
response.raise_for_status()
logger.info("purged CDN cache for shortcode=%s", shortcode)


def _all_link_ids(share):
"""Yield link id strings for ``share`` and every nested share (depth-first)."""
for link_id in share.links.values_list("id", flat=True):
yield str(link_id)
for nested in share.nested_shares.all():
yield from _all_link_ids(nested)


def _cinder_signatures(link_ids):
"""Return Cinder submission signatures for ``link_ids``.

Returns an empty list (and logs) when Cinder is not configured, so callers
can unconditionally concatenate the result into a larger dispatch group.
"""
for name in ("CINDER_URL", "CINDER_API_TOKEN", "CINDER_API_ENDPOINT"):
if not getattr(settings, name):
logger.error("%s is not set!", name)
return []

return [submit_link_to_cinder.s(link_id) for link_id in link_ids]


@shared_task(base=BaseTaskWithRetry)
def process_new_share(share_id):
"""Fan out preview + safety processing for a newly created share.

Enqueued once per create-share request (after commit), so the web request
makes a single broker round trip regardless of how many links the
collection holds. The per-link ``fetch_link_preview`` tasks and Cinder
submissions are dispatched here, in the worker, as a single ``group`` so
the whole fan-out is one broker operation. Building one group rather than a
loop of ``.delay()`` calls also means a retry (the task auto-retries on
error) replays a single dispatch instead of re-enqueuing previews link by
link.
"""
from .models import Share

try:
share = Share.objects.get(id=share_id)
except Share.DoesNotExist:
logger.warning("process_new_share: share %s does not exist", share_id)
return

link_ids = list(_all_link_ids(share))
if not link_ids:
return

signatures = [fetch_link_preview.s(link_id) for link_id in link_ids]
signatures += _cinder_signatures(link_ids)

group(signatures).apply_async()
133 changes: 117 additions & 16 deletions fxsharing/shares/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,26 @@ def test_creates_links(self):
share = Share.objects.get()
assert share.links.count() == 2

def test_enqueues_single_dispatch_regardless_of_link_count(self):
from fxsharing.shares import views

payload = {
"type": "tabs",
"title": "My Links",
"links": [
{"url": f"https://example.com/{i}", "title": f"Link {i}"}
for i in range(10)
],
}
response = self.client.post(
reverse("create_share"),
data=json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 201
share = Share.objects.get()
views.process_new_share.delay_on_commit.assert_called_once_with(str(share.id))

def test_duplicate_request_creates_distinct_share(self):
payload = {
"type": "tabs",
Expand Down Expand Up @@ -1567,38 +1587,119 @@ def test_unknown_share_id_is_noop(self):
assert mock_post.call_count == 0


class TestProcessNewShare(TestCase):
@classmethod
def setUpTestData(cls):
cls.user = User.objects.create_user(fxa_id="a1b2c3d4e5f6dispatch")

def test_dispatches_single_group_covering_every_link_including_nested(self):
from fxsharing.shares import tasks

share = Share.objects.create(title="top", user=self.user, type="tabs")
Link.objects.create(share=share, url="https://a.example")
nested = Share.objects.create(
title="nested", user=self.user, type="bookmarks", parent_share=share
)
Link.objects.create(share=nested, url="https://b.example")

with (
patch.object(tasks, "fetch_link_preview", autospec=True),
patch.object(tasks, "group", autospec=True) as mock_group,
patch.object(tasks, "_cinder_signatures", autospec=True, return_value=[]),
):
tasks.process_new_share(str(share.id))

enqueued = {
call.args[0] for call in tasks.fetch_link_preview.s.call_args_list
}
mock_group.assert_called_once()
mock_group.return_value.apply_async.assert_called_once_with()

expected = {str(link.id) for link in Link.objects.all()}
assert enqueued == expected

def test_includes_cinder_signatures_in_the_group(self):
from fxsharing.shares import tasks

share = Share.objects.create(title="top", user=self.user, type="tabs")
Link.objects.create(share=share, url="https://a.example")

with (
patch.object(tasks, "fetch_link_preview", autospec=True),
patch.object(tasks, "group", autospec=True) as mock_group,
patch.object(
tasks, "_cinder_signatures", autospec=True, return_value=["cinder-sig"]
) as mock_cinder,
):
tasks.process_new_share(str(share.id))

link_ids = [str(link.id) for link in share.links.all()]
mock_cinder.assert_called_once_with(link_ids)
(signatures,) = mock_group.call_args.args
assert "cinder-sig" in signatures

def test_no_links_dispatches_nothing(self):
from fxsharing.shares import tasks

share = Share.objects.create(title="empty", user=self.user, type="tabs")

with (
patch.object(tasks, "fetch_link_preview", autospec=True),
patch.object(tasks, "group", autospec=True) as mock_group,
patch.object(tasks, "_cinder_signatures", autospec=True) as mock_cinder,
):
tasks.process_new_share(str(share.id))
assert mock_group.call_count == 0
assert mock_cinder.call_count == 0

def test_missing_share_is_a_noop(self):
from fxsharing.shares import tasks

with (
patch.object(tasks, "fetch_link_preview", autospec=True),
patch.object(tasks, "group", autospec=True) as mock_group,
):
tasks.process_new_share("00000000-0000-0000-0000-000000000000")
assert tasks.fetch_link_preview.s.call_count == 0
assert mock_group.call_count == 0


@override_settings(
CINDER_URL="https://cinder.example.test",
CINDER_API_TOKEN="t", # noqa: S106
CINDER_API_ENDPOINT="https://cinder.example.test/api/v2/workflows/event/",
)
class TestCheckLinkSharingQuality(TestCase):
class TestCinderSignatures(TestCase):
@classmethod
def setUpTestData(cls):
cls.user = User.objects.create_user(fxa_id="a1b2c3d4e5f6group")

def test_enqueues_one_signature_per_link(self):
from fxsharing.shares import views
def test_one_signature_per_link(self):
from fxsharing.shares import tasks

share = Share.objects.create(title="t", user=self.user, type="tabs")
Link.objects.create(share=share, url="https://a.example")
Link.objects.create(share=share, url="https://b.example")
views.check_link_sharing_quality(share)
link_ids = [str(link.id) for link in share.links.all()]

link_ids = {str(link.id) for link in share.links.all()}
called_with = {
call.args[0] for call in views.submit_link_to_cinder.s.call_args_list
}
assert called_with == link_ids
with patch.object(tasks, "submit_link_to_cinder", autospec=True):
tasks._cinder_signatures(link_ids)
called_with = {
call.args[0] for call in tasks.submit_link_to_cinder.s.call_args_list
}
assert called_with == set(link_ids)

def test_no_enqueue_when_cinder_url_unset(self):
from fxsharing.shares import views
def test_empty_when_cinder_url_unset(self):
from fxsharing.shares import tasks

with override_settings(CINDER_URL=""):
share = Share.objects.create(title="t", user=self.user, type="tabs")
Link.objects.create(share=share, url="https://a.example")
views.check_link_sharing_quality(share)
assert views.submit_link_to_cinder.s.call_count == 0
with (
override_settings(CINDER_URL=""),
patch.object(tasks, "submit_link_to_cinder", autospec=True),
):
assert (
tasks._cinder_signatures(["00000000-0000-0000-0000-000000000000"]) == []
)
assert tasks.submit_link_to_cinder.s.call_count == 0


def _fake_getaddrinfo(host_to_ip):
Expand Down
42 changes: 3 additions & 39 deletions fxsharing/shares/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST

from celery import group
from jsonschema import ValidationError, validate
from modern_csrf.decorators import csrf_protect

Expand All @@ -37,12 +36,7 @@
from .cinder_schema import decision_created_schema
from .models import Link, Share, ShareStatus
from .share_schema import share_schema
from .tasks import (
fetch_link_preview,
purge_cdn_cache,
submit_link_to_cinder,
submit_share_to_cinder,
)
from .tasks import process_new_share, purge_cdn_cache, submit_share_to_cinder

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -148,41 +142,11 @@ def create_share_from_data(data, user, parent_share=None):
elif obj.get("links"):
create_share_from_data(obj, user=user, parent_share=share)

created_links = Link.objects.bulk_create(links)
for link in created_links:
fetch_link_preview.delay_on_commit(str(link.id))
Link.objects.bulk_create(links)

return share


def _all_link_ids(share):
"""Yield link id strings for ``share`` and every nested share (depth-first)."""
for link_id in share.links.values_list("id", flat=True):
yield str(link_id)
for nested in share.nested_shares.all():
yield from _all_link_ids(nested)


def check_link_sharing_quality(share):
# TODO: is this unnecessary? can we verify the env at the k8s level?
if not settings.CINDER_URL:
log.error("CINDER_URL is not set!")
return
if not settings.CINDER_API_TOKEN:
log.error("CINDER_API_TOKEN is not set!")
return
if not settings.CINDER_API_ENDPOINT:
log.error("CINDER_API_ENDPOINT is not set!")
return

link_ids = list(_all_link_ids(share))
if not link_ids:
return

signatures = [submit_link_to_cinder.s(link_id) for link_id in link_ids]
transaction.on_commit(lambda: group(signatures).apply_async())


@require_POST
@csrf_protect
def create_share(request):
Expand Down Expand Up @@ -220,7 +184,7 @@ def create_share(request):
# Always create a fresh share page so a user can generate a new link
# from the same tab group each time they share.
share = create_share_from_data(data=data, user=request.user)
check_link_sharing_quality(share)
process_new_share.delay_on_commit(str(share.id))

metrics.share_created.add(1, {"outcome": "created"})
url = request.build_absolute_uri(f"/{share.shortcode}")
Expand Down
Loading