-
-
Notifications
You must be signed in to change notification settings - Fork 0
Add :: Lock based thread-safe adapter
#20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
AHReccese
wants to merge
7
commits into
dev
Choose a base branch
from
add/threadsafe_adapter
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
b9ada30
Add thread-safe (`Lock` based) HTTP adapters for IPv4 and IPv6 support
AHReccese 8945942
add Lock adapters
AHReccese 5e76569
add Lock adapters
AHReccese f4c27e4
add comprehensive test suite for Lock-based adapter
AHReccese 76042f5
update test command
AHReccese 67e35ed
`CHANGELOG.md` updated
AHReccese 164c479
`README.md` updated
AHReccese File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,241 @@ | ||
| """Unit and concurrency tests for Lock-based thread-safe adapters.""" | ||
| import contextlib | ||
| import socket | ||
| import threading | ||
| import unittest | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
| from unittest.mock import patch, MagicMock | ||
|
|
||
| from requests.adapters import HTTPAdapter | ||
|
|
||
| from ipforce.adapters import IPv4LockAdapter, IPv6LockAdapter, _adapter_lock | ||
|
|
||
| MIXED_ADDR_RESULTS = [ | ||
| (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80)), | ||
| (socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::1', 80)), | ||
| (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('10.0.0.1', 80)), | ||
| (socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('2001:db8::1', 80)), | ||
| ] | ||
|
|
||
| NUM_THREADS = 8 | ||
| SENDS_PER_THREAD = 20 | ||
|
|
||
|
|
||
| # ============================================================================ | ||
| # Unit tests | ||
| # ============================================================================ | ||
|
|
||
|
|
||
| class TestIPv4LockAdapter(unittest.TestCase): | ||
| """Test cases for IPv4LockAdapter.""" | ||
|
|
||
| def setUp(self): | ||
| """Set up test fixtures.""" | ||
| self.adapter = IPv4LockAdapter() | ||
|
|
||
| def test_ipv4_filtering_during_send(self): | ||
| """Test that IPv4LockAdapter filters only IPv4 addresses during send.""" | ||
| captured = [] | ||
|
|
||
| def mock_super_send(*args, **kwargs): | ||
| captured.extend(socket.getaddrinfo('example.com', 80)) | ||
| return MagicMock() | ||
|
|
||
| with patch('socket.getaddrinfo', return_value=MIXED_ADDR_RESULTS): | ||
| with patch.object(HTTPAdapter, 'send', mock_super_send): | ||
| self.adapter.send(MagicMock()) | ||
|
|
||
| self.assertEqual(len(captured), 2) | ||
| for result in captured: | ||
| self.assertEqual(result[0], socket.AF_INET) | ||
|
|
||
| def test_cleanup_after_send(self): | ||
| """Test that getaddrinfo is restored after send.""" | ||
| original = socket.getaddrinfo | ||
|
|
||
| with patch.object(HTTPAdapter, 'send', return_value=MagicMock()): | ||
| self.adapter.send(MagicMock()) | ||
|
|
||
| self.assertEqual(socket.getaddrinfo, original) | ||
|
|
||
| def test_cleanup_on_exception(self): | ||
| """Test that getaddrinfo is restored even when send raises.""" | ||
| original = socket.getaddrinfo | ||
|
|
||
| with patch.object(HTTPAdapter, 'send', side_effect=Exception("error")): | ||
| with self.assertRaises(Exception): | ||
| self.adapter.send(MagicMock()) | ||
|
|
||
| self.assertEqual(socket.getaddrinfo, original) | ||
|
|
||
| def test_lock_is_acquired_during_send(self): | ||
| """Test that the adapter lock is held during the send call.""" | ||
| lock_was_held = [] | ||
|
|
||
| def mock_super_send(*args, **kwargs): | ||
| lock_was_held.append(_adapter_lock.locked()) | ||
| return MagicMock() | ||
|
|
||
| with patch.object(HTTPAdapter, 'send', mock_super_send): | ||
| self.adapter.send(MagicMock()) | ||
|
|
||
| self.assertTrue(lock_was_held[0]) | ||
|
|
||
|
|
||
| class TestIPv6LockAdapter(unittest.TestCase): | ||
| """Test cases for IPv6LockAdapter.""" | ||
|
|
||
| def setUp(self): | ||
| """Set up test fixtures.""" | ||
| self.adapter = IPv6LockAdapter() | ||
|
|
||
| def test_ipv6_filtering_during_send(self): | ||
| """Test that IPv6LockAdapter filters only IPv6 addresses during send.""" | ||
| captured = [] | ||
|
|
||
| def mock_super_send(*args, **kwargs): | ||
| captured.extend(socket.getaddrinfo('example.com', 80)) | ||
| return MagicMock() | ||
|
|
||
| with patch('socket.getaddrinfo', return_value=MIXED_ADDR_RESULTS): | ||
| with patch.object(HTTPAdapter, 'send', mock_super_send): | ||
| self.adapter.send(MagicMock()) | ||
|
|
||
| self.assertEqual(len(captured), 2) | ||
| for result in captured: | ||
| self.assertEqual(result[0], socket.AF_INET6) | ||
|
|
||
| def test_cleanup_after_send(self): | ||
| """Test that getaddrinfo is restored after send.""" | ||
| original = socket.getaddrinfo | ||
|
|
||
| with patch.object(HTTPAdapter, 'send', return_value=MagicMock()): | ||
| self.adapter.send(MagicMock()) | ||
|
|
||
| self.assertEqual(socket.getaddrinfo, original) | ||
|
|
||
|
|
||
| # ============================================================================ | ||
| # Concurrency tests | ||
| # ============================================================================ | ||
|
|
||
|
|
||
| def _run_concurrent_lock_test(adapter, expected_family): | ||
| """Run a barrier-synchronised concurrency test for a lock adapter.""" | ||
| barrier = threading.Barrier(NUM_THREADS) | ||
| lock = threading.Lock() | ||
| results = [] | ||
| errors = [] | ||
|
|
||
| mock_gai = MagicMock(return_value=MIXED_ADDR_RESULTS) | ||
|
|
||
| def mock_super_send(*args, **kwargs): | ||
| captured = list(socket.getaddrinfo('example.com', 80)) | ||
| for r in captured: | ||
| if r[0] != expected_family: | ||
| with lock: | ||
| errors.append( | ||
| "Expected family {exp}, got {got}".format(exp=expected_family, got=r[0]), | ||
| ) | ||
| with lock: | ||
| results.append(len(captured)) | ||
| return MagicMock() | ||
|
|
||
| def worker(_idx): | ||
| barrier.wait() | ||
| for _ in range(SENDS_PER_THREAD): | ||
| adapter.send(MagicMock()) | ||
|
|
||
| with patch('socket.getaddrinfo', mock_gai): | ||
| with patch.object(HTTPAdapter, 'send', mock_super_send): | ||
| with ThreadPoolExecutor(max_workers=NUM_THREADS) as pool: | ||
| futures = [pool.submit(worker, i) for i in range(NUM_THREADS)] | ||
| for f in as_completed(futures): | ||
| f.result() | ||
|
|
||
| return results, errors | ||
|
|
||
|
|
||
| class TestLockAdapterConcurrency(unittest.TestCase): | ||
| """Verify IPv4LockAdapter / IPv6LockAdapter under thread contention.""" | ||
|
|
||
| def test_concurrent_ipv4_sends(self): | ||
| """Multiple threads using IPv4LockAdapter simultaneously.""" | ||
| results, errors = _run_concurrent_lock_test( | ||
| IPv4LockAdapter(), socket.AF_INET, | ||
| ) | ||
| self.assertEqual(errors, []) | ||
| self.assertEqual(len(results), NUM_THREADS * SENDS_PER_THREAD) | ||
|
|
||
| def test_concurrent_ipv6_sends(self): | ||
| """Multiple threads using IPv6LockAdapter simultaneously.""" | ||
| results, errors = _run_concurrent_lock_test( | ||
| IPv6LockAdapter(), socket.AF_INET6, | ||
| ) | ||
| self.assertEqual(errors, []) | ||
|
|
||
| def test_getaddrinfo_restored_after_concurrent_sends(self): | ||
| """Verify socket.getaddrinfo is pristine after concurrent lock-adapter sends.""" | ||
| original = socket.getaddrinfo | ||
| adapter = IPv4LockAdapter() | ||
| barrier = threading.Barrier(NUM_THREADS) | ||
|
|
||
| def worker(_idx): | ||
| barrier.wait() | ||
| for _ in range(SENDS_PER_THREAD): | ||
| adapter.send(MagicMock()) | ||
|
|
||
| with patch.object(HTTPAdapter, 'send', return_value=MagicMock()): | ||
| with ThreadPoolExecutor(max_workers=NUM_THREADS) as pool: | ||
| futures = [pool.submit(worker, i) for i in range(NUM_THREADS)] | ||
| for f in as_completed(futures): | ||
| f.result() | ||
|
|
||
| self.assertIs(socket.getaddrinfo, original) | ||
|
|
||
| def test_mixed_ipv4_ipv6_lock_adapters(self): | ||
| """IPv4 and IPv6 lock adapters running concurrently filter correctly.""" | ||
| lock4 = IPv4LockAdapter() | ||
| lock6 = IPv6LockAdapter() | ||
| barrier = threading.Barrier(NUM_THREADS) | ||
| data_lock = threading.Lock() | ||
| errors = [] | ||
| completed = [] | ||
|
|
||
| mock_gai = MagicMock(return_value=MIXED_ADDR_RESULTS) | ||
|
|
||
| def mock_super_send(*args, **kwargs): | ||
| results = socket.getaddrinfo('example.com', 80) | ||
| families = set(r[0] for r in results) | ||
| if len(families) > 1: | ||
| with data_lock: | ||
| errors.append("Mixed families in single send: {f}".format(f=families)) | ||
| return MagicMock() | ||
|
|
||
| def v4_worker(): | ||
| barrier.wait() | ||
| for _ in range(SENDS_PER_THREAD): | ||
| lock4.send(MagicMock()) | ||
| with data_lock: | ||
| completed.append('v4') | ||
|
|
||
| def v6_worker(): | ||
| barrier.wait() | ||
| for _ in range(SENDS_PER_THREAD): | ||
| lock6.send(MagicMock()) | ||
| with data_lock: | ||
| completed.append('v6') | ||
|
|
||
| with patch('socket.getaddrinfo', mock_gai): | ||
| with patch.object(HTTPAdapter, 'send', mock_super_send): | ||
| with ThreadPoolExecutor(max_workers=NUM_THREADS) as pool: | ||
| half = NUM_THREADS // 2 | ||
| futures = ( | ||
| [pool.submit(v4_worker) for _ in range(half)] + | ||
| [pool.submit(v6_worker) for _ in range(NUM_THREADS - half)] | ||
| ) | ||
| for f in as_completed(futures): | ||
| f.result() | ||
|
|
||
| self.assertEqual(errors, []) | ||
| self.assertEqual(len(completed), NUM_THREADS) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:param gargs: additional list arguments for the original_getaddrinfo function
:param gkwargs: additional keyword arguments for the original_getaddrinfo function