From 83caf6a47c0769e8d19125e4c73f21f40f5f326e Mon Sep 17 00:00:00 2001 From: sammiee5311 Date: Wed, 29 Apr 2026 00:30:32 +0900 Subject: [PATCH] Add access checks to issue_redirector and coverage_report handlers issue_redirector previously fetched testcases by ID without any access check, exposing bug-tracker URLs (including for security-sensitive testcases) to anyone able to iterate testcase IDs. Switch to access.check_access_and_get_testcase, matching the rest of the testcase handlers. coverage_report's @handler.oauth decorator only populates an email; it does not authorize. Add access.has_access(job_type=...) before resolving the report URL so non-authorized users cannot pull coverage maps for projects they do not own. --- src/appengine/handlers/coverage_report.py | 19 +++++- src/appengine/handlers/issue_redirector.py | 3 +- .../handlers/coverage_report_test.py | 66 +++++++++++++++++++ .../handlers/issue_redirector_test.py | 38 ++++++++--- 4 files changed, 113 insertions(+), 13 deletions(-) diff --git a/src/appengine/handlers/coverage_report.py b/src/appengine/handlers/coverage_report.py index 7c67cf55935..5f8e0d1f6c1 100644 --- a/src/appengine/handlers/coverage_report.py +++ b/src/appengine/handlers/coverage_report.py @@ -20,6 +20,7 @@ from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.metrics import fuzzer_stats from handlers import base_handler +from libs import access from libs import handler from libs import helpers @@ -47,8 +48,8 @@ def _get_project_report_url(job, date): return info.html_report_url -def get_report_url(report_type, argument, date): - """Get report url for a redirect from the coverage report handler.""" +def _validate_args(report_type, argument, date): + """Validate request arguments and return the job name.""" # It's very easy to add support for per fuzzer reports, but we don't need it. if report_type != 'job': raise helpers.EarlyExitError('Invalid report type.', 400) @@ -63,6 +64,12 @@ def get_report_url(report_type, argument, date): if not date or not VALID_DATE_REGEX.match(date): raise helpers.EarlyExitError('Invalid date.', 400) + return job + + +def get_report_url(report_type, argument, date): + """Get report url for a redirect from the coverage report handler.""" + job = _validate_args(report_type, argument, date) return _get_project_report_url(job, date) @@ -74,7 +81,13 @@ class Handler(base_handler.Handler): @handler.oauth def get(self, report_type=None, argument=None, date=None, extra=None): """Handle a get request.""" - report_url = get_report_url(report_type, argument, date) + job = _validate_args(report_type, argument, date) + + if not access.has_access(job_type=job): + raise helpers.AccessDeniedError( + "You don't have access to this coverage report.") + + report_url = _get_project_report_url(job, date) if report_url: return self.redirect(report_url) raise helpers.EarlyExitError('Failed to get coverage report.', 400) diff --git a/src/appengine/handlers/issue_redirector.py b/src/appengine/handlers/issue_redirector.py index f66adc80fcb..a65ab3e6e56 100644 --- a/src/appengine/handlers/issue_redirector.py +++ b/src/appengine/handlers/issue_redirector.py @@ -16,6 +16,7 @@ from clusterfuzz._internal.issue_management import issue_tracker_utils from handlers import base_handler +from libs import access from libs import helpers @@ -24,7 +25,7 @@ class Handler(base_handler.Handler): def get(self, testcase_id=None): """Redirect user to the correct URL.""" - testcase = helpers.get_testcase(testcase_id) + testcase = access.check_access_and_get_testcase(testcase_id) issue_url = helpers.get_or_exit( lambda: issue_tracker_utils.get_issue_url(testcase), 'Issue tracker for testcase (id=%s) is not found.' % testcase_id, diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/coverage_report_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/coverage_report_test.py index df6414d3631..a8cae530c86 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/coverage_report_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/coverage_report_test.py @@ -15,6 +15,9 @@ import datetime import unittest +import flask +import webtest + from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.tests.test_libs import helpers from clusterfuzz._internal.tests.test_libs import test_utils @@ -63,3 +66,66 @@ def test_get_none(self): report_url = coverage_report.get_report_url('job', 'fake_job', 'latest') expected_url = None self.assertEqual(expected_url, report_url) + + +class HandlerAccessTest(unittest.TestCase): + """Ensure the Handler enforces an access check before resolving the URL.""" + + def setUp(self): + helpers.patch_environ(self) + helpers.patch(self, [ + 'libs.access.has_access', + 'handlers.coverage_report._get_project_report_url', + ]) + + self.flaskapp = flask.Flask('testflask') + self.flaskapp.add_url_rule( + '/coverage-report///', + view_func=coverage_report.Handler.as_view('/coverage-report/')) + self.app = webtest.TestApp(self.flaskapp) + + def test_access_check_runs_before_resolving_url(self): + """Access denied returns 403, the URL is never resolved.""" + self.mock.has_access.return_value = False + + response = self.app.get( + '/coverage-report/job/job1/latest', + headers={'Accept': 'application/json'}, + expect_errors=True) + + self.assertEqual(403, response.status_int) + self.mock.has_access.assert_called_once_with(job_type='job1') + self.mock._get_project_report_url.assert_not_called() + + def test_access_granted_resolves_url(self): + """Access granted forwards to the URL resolver.""" + self.mock.has_access.return_value = True + self.mock._get_project_report_url.return_value = ( + 'https://report.example/index.html') + + response = self.app.get('/coverage-report/job/job1/latest') + + self.assertEqual(302, response.status_int) + self.assertEqual('https://report.example/index.html', + response.headers['Location']) + self.mock.has_access.assert_called_once_with(job_type='job1') + + def test_invalid_job_name_rejected_before_access_check(self): + """Invalid job names return 400 without invoking the access check.""" + response = self.app.get( + '/coverage-report/job/bad name!/latest', + headers={'Accept': 'application/json'}, + expect_errors=True) + + self.assertEqual(400, response.status_int) + self.mock.has_access.assert_not_called() + + def test_invalid_report_type_rejected(self): + """Non-job report types return 400.""" + response = self.app.get( + '/coverage-report/fuzzer/job1/latest', + headers={'Accept': 'application/json'}, + expect_errors=True) + + self.assertEqual(400, response.status_int) + self.mock.has_access.assert_not_called() diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py index 318074a89a1..911e15546ab 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py @@ -16,10 +16,12 @@ import unittest from unittest import mock +import flask import webtest from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.tests.test_libs import helpers as test_helpers +from handlers import issue_redirector class HandlerTest(unittest.TestCase): @@ -28,19 +30,20 @@ class HandlerTest(unittest.TestCase): def setUp(self): test_helpers.patch(self, [ 'clusterfuzz._internal.issue_management.issue_tracker_utils.get_issue_url', - 'libs.helpers.get_testcase', - 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', + 'libs.access.check_access_and_get_testcase', ]) - self.mock._is_running_on_app_engine.return_value = True # pylint: disable=protected-access - import server - self.app = webtest.TestApp(server.app) + self.flaskapp = flask.Flask('testflask') + self.flaskapp.add_url_rule( + '/issue/', + view_func=issue_redirector.Handler.as_view('/issue/')) + self.app = webtest.TestApp(self.flaskapp) def test_succeed(self): """Test redirection succeeds.""" testcase = data_types.Testcase() testcase.bug_information = '456789' - self.mock.get_testcase.return_value = testcase + self.mock.check_access_and_get_testcase.return_value = testcase self.mock.get_issue_url.return_value = 'http://google.com/456789' response = self.app.get('/issue/12345') @@ -48,13 +51,30 @@ def test_succeed(self): self.assertEqual(302, response.status_int) self.assertEqual('http://google.com/456789', response.headers['Location']) - self.mock.get_testcase.assert_has_calls([mock.call('12345')]) + self.mock.check_access_and_get_testcase.assert_has_calls( + [mock.call('12345')]) self.mock.get_issue_url.assert_has_calls([mock.call(testcase)]) def test_no_issue_url(self): """Test no issue url.""" - self.mock.get_testcase.return_value = data_types.Testcase() + self.mock.check_access_and_get_testcase.return_value = data_types.Testcase() self.mock.get_issue_url.return_value = '' - response = self.app.get('/issue/12345', expect_errors=True) + response = self.app.get( + '/issue/12345', + headers={'Accept': 'application/json'}, + expect_errors=True) self.assertEqual(404, response.status_int) + + def test_access_denied(self): + """Access denied returns 403 instead of leaking the issue URL.""" + from libs import helpers as libs_helpers + self.mock.check_access_and_get_testcase.side_effect = ( + libs_helpers.AccessDeniedError('Access denied')) + + response = self.app.get( + '/issue/12345', + headers={'Accept': 'application/json'}, + expect_errors=True) + self.assertEqual(403, response.status_int) + self.mock.get_issue_url.assert_not_called()