From 31c2c4b3eaeb9aff2f36ba8995b98711ea5f1445 Mon Sep 17 00:00:00 2001
From: Ben Gardiner
Date: Mon, 27 Apr 2026 08:11:18 -0400
Subject: [PATCH] can, isotp, soft: threading fixes and small buffer device
 support improvements

* try harder to drain before close to avoid warnings
* unit tests to trigger issues encountered using soft socket with slcan,
  candle, cantact on Windows
* threading robustness improvements
* robustness improvements when using can adapters with small buffers
* improvements to the heuristics for detecting can adapters with no
  hardware filtering
---
 .config/codespell_ignore.txt             |    1 +
 scapy/contrib/cansocket_python_can.py    |  198 +++-
 scapy/contrib/isotp/isotp_soft_socket.py |  119 +-
 test/contrib/cansocket_python_can.uts    |   45 +
 test/contrib/isotp_soft_socket.uts       | 1336 +++++++++++++++++++++-
 test/testsocket.py                       |  117 +-
 6 files changed, 1743 insertions(+), 73 deletions(-)

diff --git a/.config/codespell_ignore.txt b/.config/codespell_ignore.txt
index 7ed3f0caa78..5ed4ca8ee2d 100644
--- a/.config/codespell_ignore.txt
+++ b/.config/codespell_ignore.txt
@@ -7,6 +7,7 @@ ba
 browseable
 byteorder
 cace
+cantact
 cas
 ciph
 componet
diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py
index 340104a4abb..6f483610e4d 100644
--- a/scapy/contrib/cansocket_python_can.py
+++ b/scapy/contrib/cansocket_python_can.py
@@ -23,7 +23,7 @@
 from scapy.supersocket import SuperSocket
 from scapy.layers.can import CAN
 from scapy.packet import Packet
-from scapy.error import warning
+from scapy.error import warning, log_runtime
 from typing import (
     List,
     Type,
@@ -41,6 +41,36 @@
 __all__ = ["CANSocket", "PythonCANSocket"]
 
 
+# Interfaces with hardware or kernel-level CAN filtering.
+# These keep bus-level filters for efficiency since the device/driver
+# handles filtering before frames reach userspace.
+# All other interfaces (USB adapters like candle, gs_usb, cantact;
+# serial like slcan) only do software filtering in BusABC.recv() and
+# need filters cleared at the bus level to avoid starving matching
+# frames behind non-matching ones (echo frames, other bus traffic).
+_HW_FILTERED_INTERFACES = frozenset([
+    'socketcan', 'kvaser', 'vector', 'pcan', 'ixxat',
+    'nican', 'neovi', 'etas', 'systec', 'nixnet',
+])
+
+
+def _is_sw_filtered(interface_key):
+    # type: (str) -> bool
+    """Return True if the bus identified by interface_key only does
+    software filtering (inside BusABC.recv).
+
+    For such interfaces, bus-level filters must be cleared so that
+    bus.recv(timeout=0) returns ALL frames. Per-socket filtering
+    is then handled by distribute() via _matches_filters().
+
+    Without this, BusABC.recv(timeout=0) on software-filtered buses
+    (candle, gs_usb, cantact, slcan, etc.) can silently consume
+    one non-matching frame per call and return None, starving matching
+    frames that sit behind it in the USB/serial buffer.
+    """
+    iface = interface_key.split('_')[0].lower()
+    return iface not in _HW_FILTERED_INTERFACES
+
+
 class SocketMapper(object):
     """Internal Helper class to map a python-can bus object to
@@ -57,6 +87,14 @@ def __init__(self, bus, sockets):
         self.bus = bus
         self.sockets = sockets
         self.closing = False
+        # Per-bus lock serializing read_bus() and send_bus().
+        # On serial interfaces (slcan) the python-can Bus uses a single
+        # serial port for both recv() and send(). Without serialization,
+        # concurrent calls from different threads (TimeoutScheduler for
+        # recv, main thread for send) corrupt the serial byte stream,
+        # causing slcan parsing errors.
The lock is per-mapper (per-bus) + # so different CAN buses are not blocked by each other. + self.bus_lock = threading.Lock() # Maximum time (seconds) to spend reading frames in one read_bus() # call. On serial interfaces (slcan) the final bus.recv(timeout=0) @@ -76,28 +114,43 @@ def read_bus(self): slcan serial timeout). This method limits total time spent reading so the TimeoutScheduler thread stays responsive. - This method intentionally does NOT hold pool_mutex so that - concurrent send() calls are not blocked during the serial I/O. + This method does NOT hold pool_mutex but DOES hold bus_lock to + serialize with send_bus(). This prevents concurrent serial I/O + on slcan interfaces while still allowing sends to other buses. """ if self.closing: return [] msgs = [] deadline = time.monotonic() + self.READ_BUS_TIME_LIMIT - while True: - try: - msg = self.bus.recv(timeout=0) - if msg is None: + with self.bus_lock: + while True: + try: + msg = self.bus.recv(timeout=0) + if msg is None: + break + else: + msgs.append(msg) + if time.monotonic() >= deadline: + break + except Exception as e: + if not self.closing: + warning("[MUX] python-can exception caught: %s" % e) break - else: - msgs.append(msg) - if time.monotonic() >= deadline: - break - except Exception as e: - if not self.closing: - warning("[MUX] python-can exception caught: %s" % e) - break return msgs + def send_bus(self, msg): + # type: (can_Message) -> None + """Send a CAN message on the bus. + + Serialized with read_bus() via bus_lock to prevent concurrent + serial I/O on slcan interfaces. + + :param msg: python-can Message to send. + :raises can_CanError: If the underlying python-can Bus raises. + """ + with self.bus_lock: + self.bus.send(msg) + def distribute(self, msgs): # type: (List[can_Message]) -> None """Distribute received messages to all subscribed sockets.""" @@ -122,10 +175,10 @@ def internal_send(self, sender, msg): A given SocketWrapper wants to send a CAN message. The python-can Bus object is obtained from an internal pool of SocketMapper objects. - The given message is sent on the python-can Bus object and also - inserted into the message queues of all other SocketWrapper objects - which are connected to the same python-can bus object - by the SocketMapper. + The message is sent on the python-can Bus object via send_bus() + (serialized with read_bus() by bus_lock) and also inserted into + the message queues of all other SocketWrapper objects connected to + the same python-can bus object by the SocketMapper. :param sender: SocketWrapper which initiated a send of a CAN message :param msg: CAN message to be sent @@ -136,30 +189,43 @@ def internal_send(self, sender, msg): with self.pool_mutex: try: mapper = self.pool[sender.name] - mapper.bus.send(msg) - for sock in mapper.sockets: - if sock == sender: - continue - if not sock._matches_filters(msg): - continue - - with sock.lock: - sock.rx_queue.append(msg) except KeyError: warning("[SND] Socket %s not found in pool" % sender.name) - except can_CanError as e: - warning("[SND] python-can exception caught: %s" % e) + return + # Snapshot the peer sockets while under pool_mutex + peers = [s for s in mapper.sockets if s != sender] + + # Send on the bus outside pool_mutex but inside bus_lock. + # This serializes with read_bus() to prevent concurrent serial + # I/O on slcan interfaces, while still allowing multiplexing + # and sends on other buses to proceed concurrently. 
+ try: + mapper.send_bus(msg) + except can_CanError as e: + warning("[SND] python-can exception caught: %s" % e) + return + + # Distribute to peer sockets (no need for pool_mutex here, + # we already have a snapshot of the peers list). + for sock in peers: + if not sock._matches_filters(msg): + continue + with sock.lock: + sock.rx_queue.append(msg) def multiplex_rx_packets(self): # type: () -> None """This calls the mux() function of all SocketMapper objects in this SocketPool """ - if time.monotonic() - self.last_call < 0.001: + now = time.monotonic() + if now - self.last_call < 0.001: # Avoid starvation if multiple threads are doing selects, since # this object is singleton and all python-CAN sockets are using # the same instance and locking the same locks. return + self.last_call = now + # Snapshot pool entries under the lock, then read from each bus # WITHOUT holding pool_mutex. On slow serial interfaces (slcan) # bus.recv(timeout=0) can take ~2-3ms per frame; holding the @@ -171,7 +237,6 @@ def multiplex_rx_packets(self): msgs = mapper.read_bus() if msgs: mapper.distribute(msgs) - self.last_call = time.monotonic() def register(self, socket, *args, **kwargs): # type: (SocketWrapper, Tuple[Any, ...], Dict[str, Any]) -> None @@ -198,12 +263,13 @@ def register(self, socket, *args, **kwargs): t = self.pool[k] t.sockets.append(socket) # Update bus-level filters to the union of all sockets' - # filters. For non-slcan interfaces (socketcan, kvaser, - # vector), this enables efficient hardware/kernel - # filtering. For slcan, the bus filters were already - # cleared on creation, so this is a no-op (all sockets - # on slcan share the unfiltered bus). - if not k.lower().startswith('slcan'): + # filters. For hardware-filtered interfaces (socketcan, + # kvaser, vector, pcan, ixxat), this enables efficient + # kernel/device filtering. For software-filtered + # interfaces (slcan, candle, gs_usb, cantact), the bus + # filters were already cleared on creation, so this is + # a no-op (all sockets share the unfiltered bus). + if not _is_sw_filtered(k): filters = [s.filters for s in t.sockets if s.filters is not None] if filters: @@ -211,21 +277,23 @@ def register(self, socket, *args, **kwargs): socket.name = k else: bus = can_Bus(*args, **kwargs) - # Serial interfaces like slcan only do software - # filtering inside BusABC.recv(): the recv loop reads - # one frame, finds it doesn't match, and returns - # None -- silently consuming serial bandwidth without - # returning the frame to the mux. This starves the - # mux on busy buses. + # Software-filtered interfaces (slcan, candle, gs_usb, + # cantact, etc.) only filter inside BusABC.recv(): the + # recv loop reads one frame, finds it doesn't match, + # and returns None -- silently consuming serial/USB + # bandwidth without returning the frame to the mux. + # On USB adapters with timeout=0 mapped to ~1ms reads, + # this means only one non-matching frame is consumed + # per poll cycle, starving matching ECU responses that + # sit behind echo frames in the hardware buffer. # - # For slcan, clear the filters from the bus so that - # bus.recv() returns ALL frames. Per-socket filtering - # in distribute() via _matches_filters() handles - # delivery. Other interfaces (socketcan, kvaser, - # vector, candle) perform efficient hardware/kernel - # filtering and should keep their bus-level filters. - if kwargs.get('can_filters') and \ - k.lower().startswith('slcan'): + # For all software-filtered interfaces, clear the bus + # filters so bus.recv() returns ALL frames. 
Per-socket + # filtering in distribute() via _matches_filters() + # handles delivery. Hardware-filtered interfaces + # (socketcan, kvaser, vector, pcan, ixxat) keep their + # bus-level filters for efficiency. + if kwargs.get('can_filters') and _is_sw_filtered(k): bus.set_filters(None) socket.name = k self.pool[k] = SocketMapper(bus, [socket]) @@ -242,17 +310,25 @@ def unregister(self, socket): if socket.name is None: raise TypeError("SocketWrapper.name should never be None") + mapper_to_shutdown = None with self.pool_mutex: try: t = self.pool[socket.name] t.sockets.remove(socket) if not t.sockets: t.closing = True - t.bus.shutdown() del self.pool[socket.name] + mapper_to_shutdown = t except KeyError: warning("Socket %s already removed from pool" % socket.name) + # Shutdown the bus outside pool_mutex. Acquire bus_lock to + # wait for any in-progress read_bus() or send_bus() to finish + # before shutting down the underlying transport. + if mapper_to_shutdown is not None: + with mapper_to_shutdown.bus_lock: + mapper_to_shutdown.bus.shutdown() + SocketsPool = _SocketsPool() @@ -341,6 +417,9 @@ def recv_raw(self, x=0xffff): """Returns a tuple containing (cls, pkt_data, time)""" msg = self.can_iface.recv() + if msg is None: + return self.basecls, None, None + hdr = msg.is_extended_id << 31 | msg.is_remote_frame << 30 | \ msg.is_error_frame << 29 | msg.arbitration_id @@ -382,7 +461,13 @@ def select(sockets, remain=conf.recv_poll_rate): :returns: an array of sockets that were selected and the function to be called next to get the packets (i.g. recv) """ + # Move kernel-buffered CAN frames into the per-socket rx_queues + # BEFORE checking which sockets are ready. The previous order + # (check, then multiplex) returned a stale ready-list that did + # not include sockets whose data had just been multiplexed, + # causing a one-iteration delay. SocketsPool.multiplex_rx_packets() + ready_sockets = \ [s for s in sockets if isinstance(s, PythonCANSocket) and len(s.can_iface.rx_queue)] @@ -401,6 +486,13 @@ def close(self): """Closes this socket""" if self.closed: return + # Final poll to ensure all kernel-buffered frames are distributed + # to any shared socket instances before we unregister. + try: + SocketsPool.multiplex_rx_packets() + except Exception: + log_runtime.debug("Exception during SocketsPool multiplex in close", + exc_info=True) super(PythonCANSocket, self).close() self.can_iface.shutdown() diff --git a/scapy/contrib/isotp/isotp_soft_socket.py b/scapy/contrib/isotp/isotp_soft_socket.py index e6baa1dbbf7..a5818bc1b67 100644 --- a/scapy/contrib/isotp/isotp_soft_socket.py +++ b/scapy/contrib/isotp/isotp_soft_socket.py @@ -13,7 +13,7 @@ import time import traceback from bisect import bisect_left -from threading import Thread, Event, RLock +from threading import Thread, Event, RLock, current_thread # Typing imports from typing import ( Optional, @@ -253,8 +253,10 @@ def schedule(cls, timeout, callback): heapq.heappush(cls._handles, handle) must_interrupt = cls._handles[0] == handle - # Start the scheduling thread if it is not started already - if cls._thread is None: + # Start the scheduling thread if it is not started already. + # Also recover if the thread reference is stale (thread died + # without clearing _thread — e.g. from a BaseException). 
+ if cls._thread is None or not cls._thread.is_alive(): t = Thread(target=cls._task, name="TimeoutScheduler._task") t.daemon = True must_interrupt = False @@ -357,17 +359,45 @@ def _task(cls): time_empty = now # 100 ms of grace time before killing the thread if cls.GRACE < now - time_empty: - return + # Atomically check whether new handles arrived + # while we were deciding to die. schedule() + # holds _mutex when it checks _thread, so by + # holding _mutex here we ensure that either: + # (a) _handles is still empty → we set + # _thread = None and return, OR + # (b) a new handle was pushed → we stay alive. + # This closes the race window where schedule() + # saw _thread as not-None but the thread was + # about to die. + with cls._mutex: + if not cls._handles: + cls.logger.debug( + "Thread died @ %f", cls._time()) + cls._thread = None + return + # New handle(s) appeared — stay alive + time_empty = None + continue else: time_empty = None cls._wait(handle) cls._poll() - + except Exception: + cls.logger.exception( + "Thread died @ %f (exception)", cls._time()) finally: - # Worst case scenario: if this thread dies, the next scheduled - # timeout will start a new one - cls.logger.debug("Thread died @ %f", cls._time()) - cls._thread = None + # Clear _thread so the next schedule() call can start a + # fresh thread. Only clear if _thread still points to + # *this* thread; if schedule() already started a + # replacement thread between the normal-exit mutex release + # and this finally block, we must not overwrite the new + # reference. The normal-exit path (GRACE expiry) sets + # _thread = None inside the mutex before returning; this + # finally covers unexpected exits (exceptions, + # BaseException subclasses like SystemExit, etc.). + with cls._mutex: + if cls._thread is current_thread(): + cls._thread = None @classmethod def _poll(cls): @@ -551,10 +581,26 @@ def __init__(self, self.rx_tx_poll_rate = 0.005 self.tx_timeout_handle = None # type: Optional[TimeoutScheduler.Handle] # noqa: E501 self.rx_timeout_handle = None # type: Optional[TimeoutScheduler.Handle] # noqa: E501 + + # Drain frames that accumulated in the CAN adapter's hardware + # RX buffer while no ISOTP socket was active. USB adapters + # (candle, cantact) have small hardware buffers; if background + # CAN traffic fills the buffer before can_recv starts polling, + # the ECU's response frame may be silently dropped by the + # adapter. This drain frees buffer space *before* we send. + try: + self.can_socket.select([self.can_socket], 0) + except Exception: + log_isotp.debug("Exception during ISOTP socket drain select", + exc_info=True) + + # Schedule initial callbacks with timeout=0 so they fire on + # the very next TimeoutScheduler._poll() cycle, minimising + # the window during which the adapter buffer is unserviced. self.rx_handle = TimeoutScheduler.schedule( - self.rx_tx_poll_rate, self.can_recv) + 0, self.can_recv) self.tx_handle = TimeoutScheduler.schedule( - self.rx_tx_poll_rate, self._send) + 0, self._send) self.last_rx_call = 0.0 self.rx_start_time = 0.0 @@ -598,9 +644,22 @@ def _get_padding_size(pl_size): def can_recv(self): # type: () -> None + # Early exit for orphan callbacks: when close() races with + # can_recv on the TimeoutScheduler thread, the old handle may + # fire one last time after closed is set. Without this guard + # the orphan callback would consume CAN frames from the shared + # bus — frames that belong to the NEXT ISOTPSocket session. 
+ if self.closed: + return self.last_rx_call = TimeoutScheduler._time() try: while self.can_socket.select([self.can_socket], 0): + # Re-check closed inside the loop: if close() set the + # flag while we were processing the previous frame, + # stop immediately to avoid consuming frames that + # belong to the next session. + if self.closed: + break pkt = self.can_socket.recv() if pkt: self.on_can_recv(pkt) @@ -642,20 +701,32 @@ def on_can_recv(self, p): def close(self): # type: () -> None + if self.closed: + return + + # Set closed flag FIRST to prevent orphan callbacks from + # consuming CAN frames meant for the next ISOTP session. + # The can_recv() and _send() methods check self.closed at + # entry AND inside their loops, so any in-progress callback + # on the TimeoutScheduler thread will exit promptly. + self.closed = True + + # Brief barrier: yield to the TimeoutScheduler thread so any + # currently-executing callback sees self.closed and exits. + time.sleep(0.005) + + # Diagnostic warnings (non-blocking) try: if select_objects([self.tx_queue], 0): log_isotp.warning("TX queue not empty") - time.sleep(0.1) - except OSError: + except (OSError, Scapy_Exception): pass - try: if select_objects([self.rx_queue], 0): log_isotp.warning("RX queue not empty") - except OSError: + except (OSError, Scapy_Exception): pass - self.closed = True try: self.rx_handle.cancel() except Scapy_Exception: @@ -674,6 +745,19 @@ def close(self): self.tx_timeout_handle.cancel() except Scapy_Exception: pass + + # Final drain: move frames from the CAN adapter's hardware + # buffer into the SocketWrapper software queue. This frees + # adapter buffer space so the NEXT ISOTP session's ECU + # response is not dropped due to hardware overflow. The + # frames stay in the SocketWrapper rx_queue (not lost) and + # will be available to the next session's can_recv. + try: + self.can_socket.select([self.can_socket], 0) + except Exception: + log_isotp.debug("Exception during ISOTP socket drain select", + exc_info=True) + try: self.rx_queue.close() except (OSError, EOFError): @@ -1052,6 +1136,9 @@ def begin_send(self, x): def _send(self): # type: () -> None + # Early exit for orphan callbacks (same rationale as can_recv). 
+ if self.closed: + return try: if self.tx_state == ISOTP_IDLE: if select_objects([self.tx_queue], 0): diff --git a/test/contrib/cansocket_python_can.uts b/test/contrib/cansocket_python_can.uts index 78fa349d899..630c5d706d8 100644 --- a/test/contrib/cansocket_python_can.uts +++ b/test/contrib/cansocket_python_can.uts @@ -23,6 +23,9 @@ bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 typ = Load os ~ conf command +import sys +sys.path.append(".") +sys.path.append("test") import os import threading from subprocess import call @@ -618,3 +621,45 @@ if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]): if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]): raise Exception("vcan1 could not be deleted") + + ++ PythonCANSocket Extra Coverage (Platform Independent) + += _is_sw_filtered logic +from scapy.contrib.cansocket_python_can import _is_sw_filtered +assert _is_sw_filtered("slcan_0") +assert not _is_sw_filtered("socketcan_0") + += SocketsPool register / unregister / internal_send edge cases +import threading +from collections import deque +from scapy.contrib.cansocket_python_can import SocketsPool + +class DummyWrapper: + def __init__(self, name): + self.name = name + self.filters = None + self.lock = threading.Lock() + self.rx_queue = deque() + def _matches_filters(self, msg): return True + def shutdown(self): pass + +# internal_send not in pool +try: + SocketsPool.internal_send(DummyWrapper("none"), None) +except TypeError: + pass + +# register with interface (use virtual to avoid WinError on socketcan) +sock = DummyWrapper(None) +SocketsPool.register(sock, bustype="virtual", channel="vcan0") +assert sock.name == "virtual_vcan0" +SocketsPool.unregister(sock) + += PythonCANSocket.recv_raw with message +s = PythonCANSocket(bustype="virtual", channel="vcan0") +from can import Message as can_Message +msg = can_Message(arbitration_id=0x123, data=[1,2,3], is_extended_id=False) +s.can_iface.rx_queue.append(msg) +assert s.recv_raw()[1] is not None +s.close() diff --git a/test/contrib/isotp_soft_socket.uts b/test/contrib/isotp_soft_socket.uts index f112563d62a..684e30cc78e 100644 --- a/test/contrib/isotp_soft_socket.uts +++ b/test/contrib/isotp_soft_socket.uts @@ -10,7 +10,7 @@ from io import BytesIO from scapy.layers.can import * from scapy.contrib.isotp import * from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler -from test.testsocket import TestSocket, SlowTestSocket, cleanup_testsockets +from test.testsocket import TestSocket, SlowTestSocket, USBTestSocket, cleanup_testsockets with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: exec(f.read()) @@ -1658,7 +1658,1339 @@ assert result is not None, "MF response not received (slcan, can_filters, thread assert result.data == expected -+ Cleanup ++ ISOTP socket reuse tests (TimeoutScheduler race fix verification) +# These tests verify the fix for a race condition in TimeoutScheduler +# that caused sr1() timeouts when reusing CAN adapters across multiple +# ISOTP socket open/close cycles (Python 3.12+ on Windows with slcan/candle). +# +# The root cause was a race window in _task(): between _peek_next() +# returning None (GRACE expired) and the `return` statement, schedule() +# could see _thread as not-None and skip starting a new thread. The old +# thread then died, orphaning the newly pushed handles. +# +# The fix: _task() now holds _mutex when deciding to die, atomically +# checking _handles one more time. 
If schedule() pushed a new handle, +# _task() sees it and stays alive instead of dying. + += TimeoutScheduler race fix: schedule() during GRACE-expiry is handled + +# Verifies the fix for the race condition where schedule() could run +# while _task() was about to die after GRACE expiration. +# +# The fix: _task() now holds _mutex when deciding to die and checks +# _handles one more time. If schedule() pushed a new handle while +# the thread was in the GRACE countdown, _task() sees it and stays +# alive instead of dying. +# +# This test exercises the real (fixed) code path: +# 1. Schedule a handle, let it fire, clear all handles +# 2. Wait for GRACE to expire (thread enters shutdown path) +# 3. Schedule a new handle — with the fix, the thread stays alive +# or schedule() starts a fresh thread +# 4. Assert the callback fires + +import time as _time +from threading import Thread as _Thread, Event as _Event + +# Clean slate +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + +callback_fired = _Event() + +# Schedule a dummy handle to start the thread, let it fire +TimeoutScheduler.schedule(0.001, lambda: None) +_time.sleep(0.02) +# Clear all handles — thread enters GRACE countdown +TimeoutScheduler.clear() +# Wait past GRACE so thread is in the process of dying or already dead +_time.sleep(TimeoutScheduler.GRACE + 0.05) + +# Now schedule a real callback — with the fix, this must succeed +TimeoutScheduler.schedule(0.01, callback_fired.set) + +# The callback MUST fire — this is the fix verification +assert callback_fired.wait(timeout=5.0), \ + "TimeoutScheduler race fix failed: callback did not fire " \ + "after scheduling during GRACE expiry" + +# Cleanup +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += TimeoutScheduler race fix: rapid schedule after close/reopen cycle + +# Verifies that rapidly closing and reopening ISOTP sockets doesn't +# orphan TimeoutScheduler handles. Schedules handles, cancels them +# (simulating socket close), then immediately schedules new ones +# (simulating socket reopen). All new callbacks must fire. + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + +for trial in range(5): + fired = _Event() + # Simulate socket open: schedule handles + h1 = TimeoutScheduler.schedule(0.005, lambda: None) + h2 = TimeoutScheduler.schedule(0.005, lambda: None) + _time.sleep(0.02) # let them fire + # Simulate socket close: cancel remaining + try: + h1.cancel() + except Exception: + pass + try: + h2.cancel() + except Exception: + pass + # Simulate socket reopen: schedule new handle immediately + TimeoutScheduler.schedule(0.01, fired.set) + assert fired.wait(timeout=5.0), \ + "Trial %d: callback did not fire after cancel/reschedule" % trial + +# Cleanup +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += TimeoutScheduler race fix: ISOTP close/reopen with GRACE expiry + +# End-to-end test: open an ISOTPSoftSocket, close it, wait for GRACE +# to expire, then open a new one. With the fix, the second socket's +# callbacks (can_recv, _send) must be executed by the scheduler. 
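+#
+# For orientation, a miniature model of the die-or-stay-alive handshake
+# that the fixed _task()/schedule() pair performs (a sketch with
+# hypothetical names, illustrative only; just the locking pattern
+# mirrors the real TimeoutScheduler):
+import threading as _sk_threading
+
+class _MiniSched:
+    def __init__(self):
+        self.mutex = _sk_threading.Lock()
+        self.handles = []
+        self.thread_ref = object()  # stands in for the worker thread
+    def worker_exit(self):
+        # Fixed behavior: decide to die only under the mutex, after
+        # re-checking the handle queue one last time.
+        with self.mutex:
+            if not self.handles:
+                self.thread_ref = None      # (a) queue empty: really die
+                return True
+        return False                        # (b) new handle arrived: stay
+    def schedule(self, handle):
+        # schedule() reads thread_ref under the same mutex, so it sees
+        # either a live worker or None, never a half-dead one.
+        with self.mutex:
+            self.handles.append(handle)
+            return self.thread_ref is not None
+
+_m = _MiniSched()
+assert _m.schedule("h1")      # worker alive, handle queued
+_m.handles.clear()
+assert _m.worker_exit()       # queue empty: worker dies atomically
+assert not _m.schedule("h2")  # caller sees dead worker and must restart it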
+ +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + # Iteration 1: works fine + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + stop = _Event() + def ecu_r1(_stim=stim, _stop=stop): + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + _stim.send(CAN(identifier=0x7eb, + data=dhex("02 50 03"))) + t1 = _Thread(target=ecu_r1) + t1.start() + resp1 = isock.sr1(ISOTP(b'\x10\x03'), timeout=5, verbose=0) + stop.set() + t1.join(timeout=5) + assert resp1 is not None, "Iteration 1 should succeed" + # Wait past GRACE so thread fully enters shutdown path + _time.sleep(TimeoutScheduler.GRACE + 0.05) + # Iteration 2: open a new ISOTPSocket — scheduler must handle it + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock2: + stop2 = _Event() + def ecu_r2(_stim=stim, _stop=stop2): + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + _stim.send(CAN(identifier=0x7eb, + data=dhex("02 50 03"))) + t2 = _Thread(target=ecu_r2) + t2.start() + resp2 = isock2.sr1(ISOTP(b'\x10\x03'), timeout=5, verbose=0) + stop2.set() + t2.join(timeout=5) + assert resp2 is not None, \ + "Iteration 2 timed out — TimeoutScheduler race fix failed" + +# Cleanup +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += TimeoutScheduler race fix: stress test across GRACE boundaries + +# Stress test: rapidly schedule/wait/clear across many iterations to +# exercise the GRACE-expiry atomic check under real thread scheduling. +# Each iteration lets the thread die (or nearly die), then schedules +# a new callback. All callbacks must fire. + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + +_missed = [] +for trial in range(20): + fired = _Event() + # Schedule and immediately let it fire + TimeoutScheduler.schedule(0.001, lambda: None) + _time.sleep(0.005) + # Clear handles — thread begins GRACE countdown + TimeoutScheduler.clear() + # Wait approximately GRACE time (sometimes under, sometimes over) + # to exercise both the "thread still alive" and "thread dead" paths + if trial % 2 == 0: + _time.sleep(TimeoutScheduler.GRACE + 0.02) + else: + _time.sleep(TimeoutScheduler.GRACE * 0.8) + # Schedule a new callback — must fire regardless of thread state + TimeoutScheduler.schedule(0.01, fired.set) + if not fired.wait(timeout=5.0): + _missed.append(trial) + +assert not _missed, \ + "Callbacks failed to fire on trials: %s" % _missed + +# Cleanup +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += ISOTP socket reuse: both CANSocket and ISOTPSocket recreated each iteration + +# Reproduces the user's exact bug pattern: +# for m in messages: +# with CANSocket() as csock: +# with ISOTPSocket(csock) as isock: +# resp = isock.sr1(...) +# +# On Python 3.12+ on Windows, the 2nd/3rd iteration times out because +# the TimeoutScheduler thread dies during the close/reopen gap. +# On Linux, the race window is small so this test exercises the code +# path without deterministically triggering the race. 
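+#
+# For reference, the real-world shape of the failing loop (hypothetical
+# adapter and channel names; this sketch needs real hardware, so it is
+# defined here but deliberately never called):
+def _user_bug_pattern_sketch():
+    from scapy.contrib.cansocket_python_can import CANSocket
+    for payload in [b'\x10\x03', b'\x3e\x00']:
+        with CANSocket(bustype='slcan', channel='COM4') as csock:
+            with ISOTPSocket(csock, tx_id=0x7e3, rx_id=0x7eb) as isock:
+                isock.sr1(ISOTP(payload), timeout=2, verbose=0)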
+ +def run_isotp_reuse_both_sockets_recreated(num_iterations=3): + """Run sr1 in a loop, recreating both sockets each time.""" + results = [] + for i in range(num_iterations): + with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + stop = _Event() + def ecu_responder(_stim=stim, _stop=stop): + """Echo back a SF response for any received request.""" + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + _stim.send(CAN(identifier=0x7eb, + data=dhex("02 50 03"))) + ecu_thread = _Thread(target=ecu_responder) + ecu_thread.start() + try: + resp = isock.sr1(ISOTP(b'\x10\x03'), timeout=5, verbose=0) + results.append(resp) + finally: + stop.set() + ecu_thread.join(timeout=5) + # Brief pause between iterations — on 3.11.9 this works fine, + # on 3.12+ the TimeoutScheduler thread may die here + _time.sleep(0.05) + return results + +results = run_isotp_reuse_both_sockets_recreated(3) +for i, r in enumerate(results): + assert r is not None, \ + "Iteration %d timed out (both sockets recreated)" % (i + 1) + +# Cleanup scheduler +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += ISOTP socket reuse: CANSocket kept open, only ISOTPSocket recreated + +# Same bug but with the CANSocket kept across iterations. +# This proves the bug is in the ISOTP/TimeoutScheduler layer, +# not in the SocketsPool/CANSocket layer. + +def run_isotp_reuse_can_socket_kept(num_iterations=3): + """Run sr1 in a loop, keeping the CANSocket and recreating ISOTPSocket.""" + results = [] + with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + for i in range(num_iterations): + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + stop = _Event() + def ecu_responder(_stim=stim, _stop=stop): + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + _stim.send(CAN(identifier=0x7eb, + data=dhex("02 50 03"))) + ecu_thread = _Thread(target=ecu_responder) + ecu_thread.start() + try: + resp = isock.sr1(ISOTP(b'\x10\x03'), timeout=5, verbose=0) + results.append(resp) + finally: + stop.set() + ecu_thread.join(timeout=5) + # Brief pause — TimeoutScheduler thread may die here + _time.sleep(0.05) + return results + +results = run_isotp_reuse_can_socket_kept(3) +for i, r in enumerate(results): + assert r is not None, \ + "Iteration %d timed out (CANSocket kept, ISOTPSocket recreated)" % (i + 1) + +# Cleanup scheduler +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += ISOTP socket reuse: rapid open/close/reopen without GRACE delay + +# Variant where we do NOT wait between iterations. This tests the case +# where the TimeoutScheduler thread is still in the GRACE wait when new +# handles are scheduled. 
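+#
+# Executable timing note: scheduling while the worker idles in its
+# GRACE wait must leave a usable scheduler behind, either the
+# still-live thread or a freshly started replacement.
+TimeoutScheduler.schedule(0.001, lambda: None)
+_time.sleep(0.01)                            # handle fired; thread idles
+TimeoutScheduler.schedule(0.001, lambda: None)
+assert TimeoutScheduler._thread is not None
+_time.sleep(TimeoutScheduler.GRACE + 0.05)   # let it wind down cleanly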
+ +def run_isotp_reuse_rapid(num_iterations=3): + """Run sr1 in a loop with no delay between close and reopen.""" + results = [] + with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + for i in range(num_iterations): + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + stop = _Event() + def ecu_responder(_stim=stim, _stop=stop): + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + _stim.send(CAN(identifier=0x7eb, + data=dhex("02 50 03"))) + ecu_thread = _Thread(target=ecu_responder) + ecu_thread.start() + try: + resp = isock.sr1(ISOTP(b'\x10\x03'), timeout=5, verbose=0) + results.append(resp) + finally: + stop.set() + ecu_thread.join(timeout=5) + # NO delay — immediate reopen + return results + +results = run_isotp_reuse_rapid(3) +for i, r in enumerate(results): + assert r is not None, \ + "Iteration %d timed out (rapid reopen, no delay)" % (i + 1) + +# Cleanup scheduler +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += ISOTP socket reuse: CANSocket recreated with delay past GRACE period + +# Variant where we wait longer than GRACE (100ms) between iterations, +# ensuring the TimeoutScheduler thread has fully died. schedule() must +# reliably start a new thread. + +def run_isotp_reuse_post_grace(num_iterations=3): + """Run sr1 in a loop, waiting past GRACE between iterations.""" + results = [] + for i in range(num_iterations): + with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + stop = _Event() + def ecu_responder(_stim=stim, _stop=stop): + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + _stim.send(CAN(identifier=0x7eb, + data=dhex("02 50 03"))) + ecu_thread = _Thread(target=ecu_responder) + ecu_thread.start() + try: + resp = isock.sr1(ISOTP(b'\x10\x03'), timeout=5, verbose=0) + results.append(resp) + finally: + stop.set() + ecu_thread.join(timeout=5) + # Wait past GRACE so thread is fully dead + _time.sleep(TimeoutScheduler.GRACE + 0.05) + return results + +results = run_isotp_reuse_post_grace(3) +for i, r in enumerate(results): + assert r is not None, \ + "Iteration %d timed out (post-GRACE delay)" % (i + 1) + +# Cleanup scheduler +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += ISOTP socket reuse: MF (multi-frame) response across iterations + +# The original bug report uses UDS 0x1003 (DiagnosticSessionControl) +# which returns a multi-frame response. This test verifies that MF +# responses work across multiple open/close cycles, using the same +# ECU simulation pattern as the cartesian product tests above. 
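+#
+# Worked check of the FF framing used by the simulator below: PCI bytes
+# "10 16" announce a 12-bit length of 0x016 = 22 bytes, which matches
+# the expected response payload.
+_ff_pci = dhex("1016620001666c61")
+assert (((_ff_pci[0] & 0x0F) << 8) | _ff_pci[1]) == 22
+assert len(dhex("620001666c61677b5544535f444154415f524541447d")) == 22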
+ +def run_isotp_mf_reuse(num_iterations=3, keep_can_socket=True): + """Run MF sr1 exchange in a loop.""" + response_data = dhex("620001666c61677b5544535f444154415f524541447d") + results = [] + cans_ctx = None + stim_ctx = None + if keep_can_socket: + cans_ctx = TestSocket(CAN) + stim_ctx = TestSocket(CAN) + cans_ctx.pair(stim_ctx) + for i in range(num_iterations): + if not keep_can_socket: + cans_ctx = TestSocket(CAN) + stim_ctx = TestSocket(CAN) + cans_ctx.pair(stim_ctx) + cans = cans_ctx + stim = stim_ctx + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + fc_received = _Event() + stop = _Event() + def ecu_mf_responder(_stim=stim, _fc=fc_received, _stop=stop): + """Send a multi-frame response after receiving FC.""" + _time.sleep(0.05) + _stim.send(CAN(identifier=0x7eb, + data=dhex("1016620001666c61"))) + _fc.wait(timeout=10.0) + if not _fc.is_set(): + return + _time.sleep(0.008) + _stim.send(CAN(identifier=0x7eb, + data=dhex("21677b5544535f44"))) + _time.sleep(0.010) + _stim.send(CAN(identifier=0x7eb, + data=dhex("224154415f524541"))) + _time.sleep(0.010) + _stim.send(CAN(identifier=0x7eb, + data=dhex("23447d"))) + with TestSocket(CAN) as ecu_mon: + cans.pair(ecu_mon) + def fc_watcher(_mon=ecu_mon, _fc=fc_received, _stop=stop): + while not _stop.is_set(): + if TestSocket.select([_mon], 0.1): + pkt = _mon.recv() + if pkt is not None and \ + pkt.identifier == 0x7e3 and \ + len(pkt.data) >= 1 and \ + bytes(pkt.data)[0] == 0x30: + _fc.set() + return + ecu_thread = _Thread(target=ecu_mf_responder) + fc_thread = _Thread(target=fc_watcher) + ecu_thread.start() + fc_thread.start() + try: + resp = isock.sr1(ISOTP(data=dhex("220001")), + retry=0, timeout=10.0, verbose=0) + results.append(resp) + finally: + stop.set() + fc_received.set() + ecu_thread.join(timeout=5) + fc_thread.join(timeout=5) + # Unpair ecu_mon before its context manager closes it; + # otherwise stim would try to send to a closed pipe + # on the next iteration since pair() is bidirectional. + try: + cans.paired_sockets.remove(ecu_mon) + except ValueError: + pass + # Cleanup scheduler between iterations to match real-world + # pattern where user code doesn't explicitly manage the scheduler + _time.sleep(0.05) + if not keep_can_socket: + cans_ctx.close() + stim_ctx.close() + if keep_can_socket: + cans_ctx.close() + stim_ctx.close() + return results, response_data + +results, expected = run_isotp_mf_reuse(3, keep_can_socket=True) +for i, r in enumerate(results): + assert r is not None, \ + "MF iteration %d timed out (CANSocket kept)" % (i + 1) + assert r.data == expected, \ + "MF iteration %d data mismatch (CANSocket kept)" % (i + 1) + +# Cleanup scheduler +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += ISOTP socket reuse: MF response, both sockets recreated each iteration + +results, expected = run_isotp_mf_reuse(3, keep_can_socket=False) +for i, r in enumerate(results): + assert r is not None, \ + "MF iteration %d timed out (both recreated)" % (i + 1) + assert r.data == expected, \ + "MF iteration %d data mismatch (both recreated)" % (i + 1) + +# Cleanup scheduler +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + + += Orphan can_recv guard: closed ISOTP socket must not consume bus frames + +# Verifies the fix for the orphan-callback bug: +# When close() races with can_recv() on the TimeoutScheduler thread, +# the old handle can fire one last time after self.closed is set. 
+# Without the guard at the top of can_recv(), the orphan callback +# would call select() → multiplex_rx_packets() → recv() and +# consume response frames from the shared CAN bus that belong to +# the NEXT ISOTPSocket session. This causes the next sr1() to +# time out. +# +# This test deterministically reproduces the race: +# 1. Create an ISOTP socket, get its ISOTPSocketImplementation +# 2. Close the ISOTP socket (sets impl.closed = True) +# 3. Inject a response frame on the shared CAN bus +# 4. Call impl.can_recv() as if it were an orphan callback +# 5. Assert the CAN bus frame was NOT consumed + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + isock = ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) + impl = isock.impl + # Stop the background callbacks so we control timing + _ts = TimeoutScheduler._thread + TimeoutScheduler.clear() + if _ts is not None: + _ts.join(timeout=5) + # Close the ISOTP socket (simulates normal close) + isock.close() + assert impl.closed is True + # Now inject a response frame on the CAN bus + stim.send(CAN(identifier=0x7eb, data=dhex("02 50 03"))) + # Simulate the orphan callback firing after close + impl.can_recv() + # The CAN frame must still be on the bus (not consumed by orphan) + assert TestSocket.select([cans], 0.1), \ + "Frame was consumed by orphan can_recv — not available on bus" + pkt = cans.recv() + assert pkt is not None, "Frame was consumed by orphan can_recv" + assert pkt.identifier == 0x7eb + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += Adapter buffer overflow: shared CANSocket, USB adapter FIFO fills between sessions + +# Reproduces the USB adapter hardware buffer overflow bug: +# +# Pattern: with CANSocket(): for msg: with ISOTPSocket(): +# +# Between close() of one ISOTPSocket and __init__() of the next, +# nobody calls select() on the CANSocket. On USB adapters (candle, +# cantact) the hardware endpoint FIFO is small (32-128 frames). +# Background CAN traffic fills it while no ISOTP socket is active. +# When the next ISOTPSocket sends a request, the ECU's response +# frame arrives but the FIFO is already full → silently dropped. +# slcan doesn't have this issue because the OS serial buffer is +# much larger (4096+ bytes). +# +# The fix: __init__() drains the adapter buffer via select() before +# scheduling callbacks, and close() drains again after setting +# self.closed. Both drains call multiplex_rx_packets() which +# moves frames from hardware FIFO to the software rx_queue, +# freeing space for the ECU's response. +# +# This test uses USBTestSocket with a small hw_fifo_size to make +# the overflow deterministic even without real hardware. + +def run_usb_buffer_overflow_test(num_iterations=5, hw_fifo_size=8, + bg_frames_per_gap=12, + disable_drain=False): + """Run sr1 in a loop with a shared USB-like CANSocket. + Between iterations, inject bg_frames_per_gap background frames + to fill/overflow the USB adapter's hardware FIFO. + If disable_drain is True, re-fill the FIFO after __init__ drains + it, reproducing the pre-fix behavior where no drain occurred. 
+ """ + import time as _time + from threading import Thread as _Thread, Event as _Event + from scapy.layers.can import CAN as _CAN + from scapy.contrib.isotp import ISOTP as _ISOTP + from scapy.contrib.isotp.isotp_soft_socket import ISOTPSoftSocket as _ISOTPSoftSocket + from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler as _TS + from test.testsocket import TestSocket as _TestSocket, USBTestSocket as _USBTestSocket + _dhex = bytes.fromhex + results = [] + bg_ids = [0x062, 0x024, 0x039, 0x077, 0x098, 0x150] + with _USBTestSocket(_CAN, hw_fifo_size=hw_fifo_size) as cans, \ + _TestSocket(_CAN) as stim: + cans.pair(stim) + for iteration in range(num_iterations): + # Inject background traffic between sessions. + # On real hardware this comes from other ECUs on the bus. + # The frames go into the USB adapter's hardware FIFO. + # If nobody drains it, the FIFO overflows. + if iteration > 0: + for j in range(bg_frames_per_gap): + bid = bg_ids[j % len(bg_ids)] + stim.send(_CAN(identifier=bid, data=bytes(8))) + with _ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + if disable_drain and iteration > 0: + # Undo the drain that __init__ performed by + # re-injecting the same amount of background + # frames back into the FIFO. This simulates + # the old code that didn't drain at all. + for j in range(bg_frames_per_gap): + bid = bg_ids[j % len(bg_ids)] + stim.send(_CAN(identifier=bid, data=bytes(8))) + stop = _Event() + def ecu_responder(_stim=stim, _stop=stop, _it=iteration): + while not _stop.is_set(): + if _TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + _stim.send(_CAN(identifier=0x7eb, + data=_dhex("02 50 03"))) + ecu_thread = _Thread(target=ecu_responder) + ecu_thread.start() + try: + resp = isock.sr1(_ISOTP(b'\x10\x03'), timeout=2, verbose=0) + results.append(resp) + finally: + stop.set() + ecu_thread.join(timeout=5) + _time.sleep(0.02) + dropped = cans.dropped_count + return results, dropped + +# Test 1: With the fix (current code), all iterations should succeed. +# The drains in __init__/close() keep the FIFO from overflowing. +results, dropped = run_usb_buffer_overflow_test( + num_iterations=5, hw_fifo_size=8, bg_frames_per_gap=12, + disable_drain=False) +for i, r in enumerate(results): + assert r is not None, \ + "Iteration %d timed out (USB buffer overflow, drain enabled)" % (i + 1) + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += Adapter buffer overflow: without drain, USB FIFO overflows and responses are lost + +# Test 2: Simulate the pre-fix behavior by re-filling the FIFO after +# __init__ drains it. This proves that without the drain, the USB +# adapter FIFO overflows and ECU responses are dropped. +results, dropped = run_usb_buffer_overflow_test( + num_iterations=5, hw_fifo_size=8, bg_frames_per_gap=12, + disable_drain=True) +# With the drain disabled (simulated), at least one iteration should +# fail due to FIFO overflow. On real hardware this is the 50-50 +# failure pattern seen with candle/cantact adapters. 
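+#
+# FIFO arithmetic behind this expectation (values from the call above):
+# each inter-session gap feeds bg_frames_per_gap=12 frames into an
+# hw_fifo_size=8 slot FIFO, so at least 4 frames drop per gap unless
+# something drains the FIFO in between.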
+failures = sum(1 for r in results if r is None)
+assert failures > 0 or dropped > 0, \
+    "Expected at least one timeout or FIFO drop when drain is disabled, " \
+    "got %d failures, %d drops" % (failures, dropped)
+
+_ts = TimeoutScheduler._thread
+TimeoutScheduler.clear()
+if _ts is not None:
+    _ts.join(timeout=5)
+
+
+= Stress test: hundreds of SF exchanges on persistent ISOTPSoftSocket
+
+# Exercises a single ISOTPSoftSocket for many request-response cycles
+# without closing/reopening. The socket must reliably dispatch all
+# exchanges via the same TimeoutScheduler callbacks.
+# Uses short timeout (2s) per exchange to surface any timing bugs.
+
+import time as _time
+from threading import Thread as _Thread, Event as _Event
+
+NUM_SF_EXCHANGES = 200
+
+with TestSocket(CAN) as cans, TestSocket(CAN) as stim:
+    cans.pair(stim)
+    stop = _Event()
+    exchange_count = [0]
+    def sf_ecu(_stim=stim, _stop=stop, _count=exchange_count):
+        """Auto-respond to any SF request with a SF response."""
+        while not _stop.is_set():
+            if TestSocket.select([_stim], 0.05):
+                try:
+                    pkt = _stim.recv()
+                except Exception:
+                    break
+                if pkt is not None and hasattr(pkt, 'identifier'):
+                    if pkt.identifier == 0x7e3:
+                        raw = bytes(pkt.data)
+                        # Echo back the service ID + 0x40
+                        if len(raw) >= 2:
+                            sid = raw[1]
+                            _stim.send(CAN(identifier=0x7eb,
+                                           data=bytes([0x02, sid + 0x40, 0x03])))
+                            _count[0] += 1
+    ecu_thread = _Thread(target=sf_ecu)
+    ecu_thread.start()
+    with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock:
+        failures = []
+        for i in range(NUM_SF_EXCHANGES):
+            resp = isock.sr1(ISOTP(b'\x10\x03'), timeout=2, verbose=0)
+            if resp is None:
+                failures.append(i)
+    stop.set()
+    ecu_thread.join(timeout=10)
+
+assert len(failures) == 0, \
+    "SF stress test: %d/%d timed out, first failures: %s" % (
+        len(failures), NUM_SF_EXCHANGES, failures[:10])
+
+_ts = TimeoutScheduler._thread
+TimeoutScheduler.clear()
+if _ts is not None:
+    _ts.join(timeout=5)
+
+= Stress test: hundreds of MF exchanges on persistent ISOTPSoftSocket
+
+# Same pattern but with multi-frame responses (FF + 3 CF). This
+# exercises the full ISOTP state machine (FF → FC → CF×3) across many
+# cycles on the same socket instance.
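+#
+# Per-exchange frame choreography driven by mf_ecu below (byte values
+# taken from the simulator; "tester" is the ISOTPSoftSocket under test):
+#   tester -> 0x7e3: 03 22 00 01 ..               SF request
+#   ecu    -> 0x7eb: 10 16 62 00 01 66 6c 61      FF, 22-byte payload
+#   tester -> 0x7e3: 30 ..                        FC, ContinueToSend
+#   ecu    -> 0x7eb: 21 .. / 22 .. / 23 ..        CF x 3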
+ +NUM_MF_EXCHANGES = 100 + +def run_mf_stress(num_exchanges): + """Run num_exchanges MF request/response cycles on one ISOTPSoftSocket.""" + expected_data = dhex("620001666c61677b5544535f444154415f524541447d") + with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + stop = _Event() + with TestSocket(CAN) as ecu_mon: + cans.pair(ecu_mon) + def mf_ecu(_stim=stim, _mon=ecu_mon, _stop=stop): + """For each request, send an MF response (FF + 3 CF).""" + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e3: + # Send FF + _stim.send(CAN(identifier=0x7eb, + data=dhex("1016620001666c61"))) + # Wait for FC from tester + fc_seen = False + deadline = _time.monotonic() + 5.0 + while not _stop.is_set() and _time.monotonic() < deadline: + if TestSocket.select([_mon], 0.05): + fp = _mon.recv() + if fp is not None and \ + fp.identifier == 0x7e3 and \ + len(fp.data) >= 1 and \ + bytes(fp.data)[0] == 0x30: + fc_seen = True + break + if not fc_seen: + continue + _time.sleep(0.002) + _stim.send(CAN(identifier=0x7eb, + data=dhex("21677b5544535f44"))) + _time.sleep(0.002) + _stim.send(CAN(identifier=0x7eb, + data=dhex("224154415f524541"))) + _time.sleep(0.002) + _stim.send(CAN(identifier=0x7eb, + data=dhex("23447d"))) + ecu_thread = _Thread(target=mf_ecu) + ecu_thread.start() + with ISOTPSoftSocket(cans, tx_id=0x7e3, rx_id=0x7eb) as isock: + failures = [] + mismatches = [] + for i in range(num_exchanges): + resp = isock.sr1(ISOTP(data=dhex("220001")), + retry=0, timeout=5, verbose=0) + if resp is None: + failures.append(i) + elif resp.data != expected_data: + mismatches.append(i) + stop.set() + ecu_thread.join(timeout=10) + # Unpair ecu_mon to avoid stale references + try: + cans.paired_sockets.remove(ecu_mon) + except ValueError: + pass + return failures, mismatches + +failures, mismatches = run_mf_stress(NUM_MF_EXCHANGES) +assert len(failures) == 0, \ + "MF stress test: %d/%d timed out, first failures: %s" % ( + len(failures), NUM_MF_EXCHANGES, failures[:10]) +assert len(mismatches) == 0, \ + "MF stress test: %d/%d data mismatch, first: %s" % ( + len(mismatches), NUM_MF_EXCHANGES, mismatches[:10]) + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += Stress test: UDS-scanner-style varying service IDs on persistent socket (SF) + +# Mimics UDS_ServiceEnumerator: iterates through different service IDs +# on the SAME ISOTPSoftSocket without closing/reopening. Each sr1() +# sends a different UDS service request and the ECU simulator replies +# with the matching positive response. This exercises the rx_queue +# across varying hashret/answers pairs — a late response from service +# N could confuse service N+1 if state isn't handled properly. 
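+#
+# Quick reference for the PCI nibble checks used by the simulators in
+# this file (ISO 15765-2 PCI types: 0 = Single Frame, 1 = First Frame,
+# 2 = Consecutive Frame, 3 = Flow Control):
+def _pci_type(first_byte):
+    return (first_byte >> 4) & 0x0F
+
+assert _pci_type(0x02) == 0  # SF, payload length 2
+assert _pci_type(0x10) == 1  # FF
+assert _pci_type(0x21) == 2  # CF, sequence number 1
+assert _pci_type(0x30) == 3  # FC, ContinueToSend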
+ +import time as _time +from threading import Thread as _Thread, Event as _Event + +SCAN_RANGE = list(range(0x10, 0x3F)) # 47 services +NUM_SCAN_CYCLES = 5 # Repeat the scan range 5 times = 235 total exchanges + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + stop = _Event() + def uds_ecu_sf(_stim=stim, _stop=stop): + """ECU that responds to each UDS service with positive response.""" + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e0: + raw = bytes(pkt.data) + if len(raw) >= 2: + pci_len = raw[0] + sid = raw[1] + # Positive response: SID + 0x40 + resp_sid = (sid + 0x40) & 0xFF + _stim.send(CAN(identifier=0x7e8, + data=bytes([0x02, resp_sid, 0x00]))) + ecu_thread = _Thread(target=uds_ecu_sf) + ecu_thread.start() + with ISOTPSoftSocket(cans, tx_id=0x7e0, rx_id=0x7e8) as isock: + failures = [] + wrong_resp = [] + total = 0 + for cycle in range(NUM_SCAN_CYCLES): + for sid in SCAN_RANGE: + req = ISOTP(data=bytes([sid, 0x00])) + resp = isock.sr1(req, timeout=2, verbose=0) + if resp is None: + failures.append((cycle, sid, total)) + elif len(resp.data) < 1 or resp.data[0] != ((sid + 0x40) & 0xFF): + wrong_resp.append((cycle, sid, total, bytes(resp.data))) + total += 1 + stop.set() + ecu_thread.join(timeout=10) + +assert len(failures) == 0, \ + "UDS SF scan stress: %d/%d timed out, first: %s" % ( + len(failures), total, failures[:5]) +assert len(wrong_resp) == 0, \ + "UDS SF scan stress: %d/%d wrong response, first: %s" % ( + len(wrong_resp), total, wrong_resp[:5]) + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += Stress test: UDS-scanner-style varying service IDs on persistent socket (MF) + +# Same pattern but some services return multi-frame responses. +# Services 0x22 (ReadDataByIdentifier) and 0x19 (ReadDTCInformation) +# return long MF responses; all others return SF. This tests the +# ISOTP state machine transitioning between SF and MF responses across +# many consecutive sr1() calls on the same socket. + +MF_SIDS = {0x22, 0x19} # Services that return multi-frame responses +NUM_MF_SCAN_CYCLES = 3 +MF_SCAN_RANGE = [0x10, 0x11, 0x19, 0x22, 0x27, 0x2E, 0x31, 0x3E] + +def run_uds_mf_scan_stress(num_cycles, scan_range, mf_sids): + """Scan with mixed SF/MF responses on a persistent socket.""" + mf_payload = dhex("0001AABBCCDD112233445566778899") # 15 bytes + with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + stop = _Event() + with TestSocket(CAN) as ecu_mon: + cans.pair(ecu_mon) + def uds_ecu_mf(_stim=stim, _mon=ecu_mon, _stop=stop): + """ECU with mixed SF/MF responses per service ID.""" + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is None or not hasattr(pkt, 'identifier'): + continue + if pkt.identifier != 0x7e0: + continue + raw = bytes(pkt.data) + if len(raw) < 2: + continue + # Only process Single Frame requests (PCI type 0). + # Ignore FC frames (PCI type 3 = 0x3X) that the + # tester sends back during MF exchanges. 
+ pci_type = (raw[0] >> 4) & 0x0F + if pci_type != 0: + continue + sid = raw[1] + resp_sid = (sid + 0x40) & 0xFF + if sid in mf_sids: + # Multi-frame response: FF + CFs + # Build 16-byte payload: resp_sid + 15 bytes + resp_data = bytes([resp_sid]) + mf_payload + ff_data = bytes([0x10, len(resp_data)]) + resp_data[:6] + _stim.send(CAN(identifier=0x7e8, data=ff_data)) + # Wait for FC + fc_seen = False + deadline = _time.monotonic() + 5.0 + while not _stop.is_set() and _time.monotonic() < deadline: + if TestSocket.select([_mon], 0.05): + fp = _mon.recv() + if fp is not None and \ + fp.identifier == 0x7e0 and \ + len(fp.data) >= 1 and \ + (bytes(fp.data)[0] >> 4) == 3: + fc_seen = True + break + if not fc_seen: + continue + _time.sleep(0.002) + cf1 = bytes([0x21]) + resp_data[6:13] + _stim.send(CAN(identifier=0x7e8, data=cf1)) + _time.sleep(0.002) + cf2 = bytes([0x22]) + resp_data[13:16] + _stim.send(CAN(identifier=0x7e8, data=cf2)) + else: + # Single-frame response + _stim.send(CAN(identifier=0x7e8, + data=bytes([0x02, resp_sid, 0x00]))) + ecu_thread = _Thread(target=uds_ecu_mf) + ecu_thread.start() + with ISOTPSoftSocket(cans, tx_id=0x7e0, rx_id=0x7e8) as isock: + failures = [] + wrong_resp = [] + total = 0 + for cycle in range(num_cycles): + for sid in scan_range: + req = ISOTP(data=bytes([sid, 0x00])) + resp = isock.sr1(req, timeout=5, verbose=0) + expected_sid = (sid + 0x40) & 0xFF + if resp is None: + failures.append((cycle, sid, total)) + elif len(resp.data) < 1 or resp.data[0] != expected_sid: + wrong_resp.append((cycle, sid, total, bytes(resp.data))) + total += 1 + stop.set() + ecu_thread.join(timeout=10) + try: + cans.paired_sockets.remove(ecu_mon) + except ValueError: + pass + return failures, wrong_resp, total + +failures, wrong_resp, total = run_uds_mf_scan_stress( + NUM_MF_SCAN_CYCLES, MF_SCAN_RANGE, MF_SIDS) +assert len(failures) == 0, \ + "UDS MF scan stress: %d/%d timed out, first: %s" % ( + len(failures), total, failures[:5]) +assert len(wrong_resp) == 0, \ + "UDS MF scan stress: %d/%d wrong response, first: %s" % ( + len(wrong_resp), total, wrong_resp[:5]) + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += Stress test: close/reopen cycles between service groups (scanner reconnect pattern) + +# Mimics UDS_Scanner reconnect_handler: after scanning one ECU state, +# close the ISOTPSoftSocket and create a new one (same CANSocket). +# This is the pattern that triggers the TimeoutScheduler race and +# orphan-callback bugs at scale. Each group does 10 service probes, +# then close/reopen. 
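+#
+# The per-group shape in miniature (an illustrative sketch, not the
+# UDS_Scanner API; the real test below adds an ECU simulator and
+# failure bookkeeping):
+def _scan_group_sketch(can_sock, probe_sids):
+    with ISOTPSoftSocket(can_sock, tx_id=0x7e0, rx_id=0x7e8) as isock:
+        return [isock.sr1(ISOTP(data=bytes([sid, 0x00])),
+                          timeout=2, verbose=0)
+                for sid in probe_sids]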
+ +NUM_GROUPS = 20 +PROBES_PER_GROUP = 10 +GROUP_SCAN_RANGE = list(range(0x10, 0x10 + PROBES_PER_GROUP)) + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + stop = _Event() + def group_ecu(_stim=stim, _stop=stop): + """ECU that responds to any service probe.""" + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e0: + raw = bytes(pkt.data) + if len(raw) >= 2: + sid = raw[1] + resp_sid = (sid + 0x40) & 0xFF + _stim.send(CAN(identifier=0x7e8, + data=bytes([0x02, resp_sid, 0x00]))) + ecu_thread = _Thread(target=group_ecu) + ecu_thread.start() + failures = [] + total = 0 + for group in range(NUM_GROUPS): + with ISOTPSoftSocket(cans, tx_id=0x7e0, rx_id=0x7e8) as isock: + for sid in GROUP_SCAN_RANGE: + req = ISOTP(data=bytes([sid, 0x00])) + resp = isock.sr1(req, timeout=2, verbose=0) + if resp is None: + failures.append((group, sid, total)) + total += 1 + # Brief pause between groups — mimics scanner state transition + _time.sleep(0.02) + stop.set() + ecu_thread.join(timeout=10) + +assert len(failures) == 0, \ + "Reconnect stress: %d/%d timed out, first: %s" % ( + len(failures), total, failures[:10]) + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + += Stress test: rapid close/reopen with no delay (worst-case reconnect) + +# Same as above but with ZERO delay between groups. This maximises +# the race window where the TimeoutScheduler thread may die during +# close and the next ISOTPSocket.__init__ needs it alive. + +NUM_RAPID_GROUPS = 30 +RAPID_PROBES = 5 + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + stop = _Event() + def rapid_ecu(_stim=stim, _stop=stop): + """ECU for rapid reconnect test.""" + while not _stop.is_set(): + if TestSocket.select([_stim], 0.05): + try: + pkt = _stim.recv() + except Exception: + break + if pkt is not None and hasattr(pkt, 'identifier'): + if pkt.identifier == 0x7e0: + raw = bytes(pkt.data) + if len(raw) >= 2: + sid = raw[1] + _stim.send(CAN(identifier=0x7e8, + data=bytes([0x02, (sid + 0x40) & 0xFF, 0x00]))) + ecu_thread = _Thread(target=rapid_ecu) + ecu_thread.start() + failures = [] + total = 0 + for group in range(NUM_RAPID_GROUPS): + with ISOTPSoftSocket(cans, tx_id=0x7e0, rx_id=0x7e8) as isock: + for probe in range(RAPID_PROBES): + sid = 0x10 + (probe % 0x30) + req = ISOTP(data=bytes([sid, 0x00])) + resp = isock.sr1(req, timeout=2, verbose=0) + if resp is None: + failures.append((group, sid, total)) + total += 1 + # NO delay — immediate close/reopen + stop.set() + ecu_thread.join(timeout=10) + +assert len(failures) == 0, \ + "Rapid reconnect stress: %d/%d timed out, first: %s" % ( + len(failures), total, failures[:10]) + +_ts = TimeoutScheduler._thread +TimeoutScheduler.clear() +if _ts is not None: + _ts.join(timeout=5) + + += TimeoutScheduler thread death recovery: stale _thread reference must not block new threads + +# Simulates the scenario where the TimeoutScheduler thread dies from a +# BaseException (or any unexpected exit) that leaves _thread as a stale +# reference. The schedule() fix detects the dead thread via is_alive() +# and starts a fresh one, so ISOTP callbacks resume normally. 
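+#
+# Sanity sketch of the primitive the fix relies on: a finished Thread
+# reports is_alive() False, so a stale reference is distinguishable
+# from a running scheduler thread.
+from threading import Thread as _SkThread
+_sk_t = _SkThread(target=lambda: None)
+_sk_t.start()
+_sk_t.join()
+assert not _sk_t.is_alive()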
+
+import time as _time
+from threading import Thread as _Thread, Event as _Event
+
+# Clean slate
+_ts = TimeoutScheduler._thread
+TimeoutScheduler.clear()
+_ts and _ts.join(timeout=5)
+
+# Schedule a simple callback to get the thread running
+_fired = _Event()
+def _dummy_cb(_e=_fired):
+    _e.set()
+
+TimeoutScheduler.schedule(0, _dummy_cb)
+_fired.wait(timeout=2)
+assert _fired.is_set(), "Initial callback did not fire"
+
+# Wait for the thread to go idle and die (GRACE = 0.1s)
+_time.sleep(0.3)
+
+# Now simulate a stale _thread: set _thread to a dead Thread object.
+# This mimics what would happen if _task() exited without clearing
+# _thread (the bug that the finally block fixes).
+with TimeoutScheduler._mutex:
+    _dead = _Thread(target=lambda: None)
+    _dead.start()
+    _dead.join()
+    assert not _dead.is_alive()
+    TimeoutScheduler._thread = _dead
+
+# schedule() should detect the dead thread and start a new one
+_fired2 = _Event()
+def _recovery_cb(_e=_fired2):
+    _e.set()
+
+TimeoutScheduler.schedule(0, _recovery_cb)
+_fired2.wait(timeout=2)
+assert _fired2.is_set(), \
+    "Recovery callback did not fire — schedule() failed to detect dead thread"
+
+# Verify ISOTP sockets still work after thread recovery
+def run_isotp_after_recovery():
+    with TestSocket(CAN) as cans, TestSocket(CAN) as stim:
+        cans.pair(stim)
+        stop = _Event()
+        def recovery_ecu(_stim=stim, _stop=stop):
+            while not _stop.is_set():
+                if TestSocket.select([_stim], 0.05):
+                    try:
+                        pkt = _stim.recv()
+                    except Exception:
+                        break
+                    if pkt is not None and hasattr(pkt, 'identifier'):
+                        if pkt.identifier == 0x7e0:
+                            raw = bytes(pkt.data)
+                            if len(raw) >= 2:
+                                sid = raw[1]
+                                _stim.send(CAN(identifier=0x7e8,
+                                               data=bytes([0x02, (sid + 0x40) & 0xFF, 0x00])))
+        ecu_thread = _Thread(target=recovery_ecu)
+        ecu_thread.start()
+        with ISOTPSoftSocket(cans, tx_id=0x7e0, rx_id=0x7e8) as isock:
+            for sid in [0x10, 0x11, 0x22]:
+                req = ISOTP(data=bytes([sid, 0x00]))
+                resp = isock.sr1(req, timeout=2, verbose=0)
+                assert resp is not None, \
+                    "ISOTP sr1 failed for SID 0x%02x after thread recovery" % sid
+        stop.set()
+        ecu_thread.join(timeout=10)
+
+run_isotp_after_recovery()
+
+_ts = TimeoutScheduler._thread
+TimeoutScheduler.clear()
+_ts and _ts.join(timeout=5)
+
+= UDS_Scanner reset_handler leaked socket causes CAN frame theft
+
+# When reset_handler is a socket factory, the returned socket must be
+# closed by reset_target(). If it is not, its can_recv callback
+# steals CAN frames from the active session, causing sr1 timeouts.
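+#
+# Sketch of the expected reset_target() behaviour (hypothetical
+# pseudo-code; the real fix lives in the scanner implementation):
+#
+#     sock = self.reset_handler()
+#     if sock is not None and hasattr(sock, "close"):
+#         sock.close()  # otherwise its can_recv callback keeps running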
+
+import time as _time
+from threading import Thread as _Thread, Event as _Event
+from scapy.contrib.automotive.uds import UDS
+
+COMMON_SIDs = [0x10, 0x11, 0x14, 0x19, 0x22, 0x27, 0x28, 0x3E, 0x85]
+
+leaked = []
+cans_isotp = TestSocket(CAN)
+stim_isotp = TestSocket(CAN)
+cans_isotp.pair(stim_isotp)
+stop = _Event()
+
+def uds_ecu_nr_100():
+    while not stop.is_set():
+        if TestSocket.select([stim_isotp], 0.05):
+            try:
+                pkt = stim_isotp.recv()
+            except Exception:
+                break
+            if pkt is None or not hasattr(pkt, 'identifier'):
+                continue
+            if pkt.identifier != 0x7e0:
+                continue
+            raw_d = bytes(pkt.data)
+            if len(raw_d) < 2:
+                continue
+            if (raw_d[0] >> 4) != 0:
+                continue
+            sid = raw_d[1]
+            # Negative response, NRC 0x13 (incorrectMessageLengthOrInvalidFormat)
+            stim_isotp.send(CAN(identifier=0x7e8,
+                                data=bytes([0x03, 0x7f, sid, 0x13,
+                                            0x00, 0x00, 0x00, 0x00])))
+
+def socket_factory_100():
+    s = ISOTPSoftSocket(cans_isotp, tx_id=0x7e0, rx_id=0x7e8, basecls=UDS)
+    leaked.append(s)
+    return s
+
+ecu_thread = _Thread(target=uds_ecu_nr_100)
+ecu_thread.start()
+from scapy.contrib.automotive.uds_scan import UDS_ServiceEnumerator, UDS_Scanner
+scanner = UDS_Scanner(
+    socket_factory_100(),
+    reconnect_handler=socket_factory_100,
+    reset_handler=socket_factory_100,
+    test_cases=[UDS_ServiceEnumerator],
+    UDS_ServiceEnumerator_kwargs={"scan_range": COMMON_SIDs, "timeout": 2})
+scanner.scan()
+stop.set()
+ecu_thread.join(timeout=10)
+enum = scanner.configuration.test_cases[0]
+failures = [r for r in enum.results if r[2] is None]
+
+def cleanup_leaked(socks):
+    for s in socks:
+        try:
+            if not s.closed:
+                s.close()
+        except Exception:
+            pass
+
+cleanup_leaked(leaked)
+cans_isotp.close()
+stim_isotp.close()
+assert len(failures) == 0, \
+    "UDS scanner had %d/%d timeouts (leaked reset_handler socket): " \
+    "first=%s" % (len(failures), len(enum.results),
+                  [(hex(r[1].service),) for r in failures[:5]])
+
+
++ ISOTPSoftSocket Extra Coverage
+
+= ISOTPSocketImplementation.can_send FD padding
+with ISOTPSoftSocket(TestSocket(CAN), fd=True, padding=True) as s:
+    s.impl.can_send(b"\x00" * 10)  # Should pad to 12, the next valid CAN FD length
+
+= ISOTPSocketImplementation.on_can_recv identifier mismatch
+with ISOTPSoftSocket(TestSocket(CAN), rx_id=0x123) as s:
+    s.impl.on_can_recv(CAN(identifier=0x124, data=b"\x00"))
+    assert s.impl.filter_warning_emitted
+
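+# The raw numeric states poked below follow the module-level constants
+# in isotp_soft_socket.py: ISOTP_IDLE = 0, ISOTP_WAIT_FIRST_FC = 1,
+# ISOTP_WAIT_FC = 2, ISOTP_WAIT_DATA = 3, ISOTP_SENDING = 4.
+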
+= _rx_timer_handler timeout reset
+with ISOTPSoftSocket(TestSocket(CAN), rx_id=0x123) as s:
+    s.impl.rx_state = 3  # ISOTP_WAIT_DATA
+    s.impl.rx_start_time = TimeoutScheduler._time()
+    s.impl._rx_timer_handler()
+    assert s.impl.rx_state == 3  # Still waiting due to extension
+
+= _tx_timer_handler edge cases
+with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x123) as s:
+    s.impl.tx_state = 4  # ISOTP_SENDING
+    s.impl.tx_buf = None
+    s.impl._tx_timer_handler()
+    assert s.impl.tx_state == 0  # ISOTP_IDLE
+
+= _recv_fc / _recv_sf / _recv_ff / _recv_cf edge cases
+with ISOTPSoftSocket(TestSocket(CAN), rx_id=0x123) as s:
+    # _recv_fc short
+    s.impl.tx_state = 2  # ISOTP_WAIT_FC
+    s.impl._recv_fc(b"\x30\x00")
+    assert s.impl.tx_state == 0
+    # _recv_fc unknown
+    s.impl.tx_state = 2
+    s.impl._recv_fc(b"\x3F\x00\x00")
+    assert s.impl.tx_state == 0
+    # _recv_sf FD
+    s.impl.fd = True
+    s.impl._recv_sf(b"\x00\x01\xAA", 1.23)
+    # _recv_ff short
+    s.impl._recv_ff(b"\x10\x00\x00\x00\x00\x00", 1.23)
+    assert s.impl.rx_state == 0
+    # _recv_ff 32-bit length
+    s.impl._recv_ff(b"\x10\x00\x00\x00\x00\x0A\xAA\xBB\xCC\xDD", 1.23)
+    assert s.impl.rx_len == 10
+    # _recv_cf various
+    s.impl.rx_state = 3  # ISOTP_WAIT_DATA
+    s.impl.rx_ll_dl = 8
+    s.impl._recv_cf(b"\x21\xAA\xBB\xCC\xDD\xEE\xFF\x00\x11")  # Too long
+    assert s.impl.rx_state == 3
+    s.impl.rx_sn = 2
+    s.impl._recv_cf(b"\x21\xAA")  # Wrong SN
+    assert s.impl.rx_state == 0
+
+= begin_send busy / too much data
+with ISOTPSoftSocket(TestSocket(CAN), tx_id=0x123) as s:
+    s.impl.tx_state = 1  # any non-idle state counts as busy
+    s.impl.begin_send(b"data")  # Busy
+    s.impl.tx_state = 0
+    s.impl.begin_send(b"A" * 5000)  # Too much data
+
+
+= Delete testsockets
+
+_ts = TimeoutScheduler._thread
+TimeoutScheduler.clear()
+_ts and _ts.join(timeout=5)
 = Delete testsockets
diff --git a/test/testsocket.py b/test/testsocket.py
index 1ecd79f4fec..e6577b15fb0 100644
--- a/test/testsocket.py
+++ b/test/testsocket.py
@@ -183,7 +183,8 @@ class SlowTestSocket(TestSocket):
     PythonCANSocket on a slow serial interface (like slcan).
 
     Frames sent to this socket go into an intermediate serial buffer.
-    They only become visible to recv()/select() after mux() moves
+    They only become visible to recv()/select() after _mux() moves
+    them to the rx ObjectPipe.
 
     Key parameters model the real slcan timing bottleneck:
 
@@ -221,6 +222,7 @@ def __init__(self, basecls=None, frame_delay=0.0002,
         self.interface_name = interface_name
         from collections import deque
         self._serial_buffer = deque()  # type: deque[bytes]
+        self._serial_lock = Lock()
         self._last_mux = 0.0
         self._frame_delay = frame_delay
 
@@ -258,6 +260,7 @@ def _mux(self):
             return
 
         # Phase 1: read_bus — read frames from serial buffer
+
         msgs = []
         deadline = time.monotonic() + self._read_time_limit \
             if self._read_time_limit > 0 else None
@@ -281,6 +284,7 @@ def _mux(self):
                 break
 
         # Phase 2: distribute — apply per-socket filtering
+
         for frame in msgs:
             if self._can_filters is not None:
                 can_id = self._extract_can_id(frame)
@@ -292,7 +296,8 @@ def _mux(self):
 
     def recv_raw(self, x=MTU):
         # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]]  # noqa: E501
-        """Read from the rx ObjectPipe (populated by mux via select)."""
+        """Read from the rx ObjectPipe (populated by _mux via select)."""
+
         return self.basecls, self._real_ins.recv(0), time.time()
 
     def send(self, x):
@@ -344,6 +349,114 @@ def closed(self):
         return bool(self._owner._real_ins.closed)  # type: ignore[attr-defined]
 
 
+class USBTestSocket(TestSocket):
+    """A TestSocket that simulates the hardware RX FIFO of USB CAN
+    adapters like candle (gs_usb) and cantact.
+
+    USB adapters have a small hardware endpoint buffer (typically
+    32-128 frames). Frames that arrive when the buffer is full
+    are silently dropped by the adapter firmware.
+
+    Frames sent to this socket go into a capacity-limited deque
+    (the "hardware FIFO"). They only become visible to recv()/select()
+    after _mux() moves them to the rx ObjectPipe — which happens when
+    ISOTPSocketImplementation calls can_socket.select() in its
+    can_recv callback, or in the drain calls in __init__/close.
+
+    When nobody calls select/recv (e.g., between ISOTP socket close
+    and reopen), frames accumulate in the FIFO. Once the FIFO is
+    full, new frames are silently dropped, simulating hardware
+    overflow.
+    """
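+
+    # Usage sketch (illustrative only, assuming a paired TestSocket peer):
+    #
+    #     usb = USBTestSocket(CAN, hw_fifo_size=4)
+    #     peer = TestSocket(CAN)
+    #     usb.pair(peer)
+    #     for i in range(6):
+    #         peer.send(CAN(identifier=0x100 + i, data=b"\x00"))
+    #     # nobody called select() yet, so the FIFO overflowed:
+    #     assert usb.dropped_count == 2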
+
+    def __init__(self, basecls=None, hw_fifo_size=32):
+        # type: (Optional[Type[Packet]], int) -> None
+        """
+        :param hw_fifo_size: Maximum number of frames in the simulated
+            hardware RX FIFO. Default 32 models a typical USB endpoint
+            buffer. Frames beyond this limit are silently dropped.
+        """
+        super(USBTestSocket, self).__init__(basecls)
+        from collections import deque
+        self._hw_fifo = deque(maxlen=hw_fifo_size)  # type: deque[bytes]
+        self._hw_lock = Lock()
+        self._real_ins = self.ins
+        self.ins = _USBPipeWrapper(self)  # type: ignore[assignment]
+        self.dropped_count = 0
+
+    def _mux(self):
+        # type: () -> None
+        """Move frames from hardware FIFO to the rx ObjectPipe.
+
+        This models the read path of PythonCANSocket.select() →
+        multiplex_rx_packets() → read_bus(). On real hardware this
+        is the USB bulk transfer that moves frames from the adapter
+        endpoint buffer into the host-side python-can rx_queue.
+        """
+        with self._hw_lock:
+            while self._hw_fifo:
+                frame = self._hw_fifo.popleft()
+                self._real_ins.send(frame)
+
+    def recv_raw(self, x=MTU):
+        # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]]  # noqa: E501
+        return self.basecls, self._real_ins.recv(0), time.time()
+
+    @staticmethod
+    def select(sockets, remain=conf.recv_poll_rate):
+        # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket]
+        for s in sockets:
+            if isinstance(s, USBTestSocket):
+                s._mux()
+        return select_objects(sockets, remain)
+
+    def close(self):
+        # type: () -> None
+        self.ins = self._real_ins
+        super(USBTestSocket, self).close()
+
+
+class _USBPipeWrapper:
+    """Wrapper that routes incoming frames into the hardware FIFO.
+
+    A deque with maxlen would silently drop the *oldest* frame when
+    full, but real USB adapters drop the *newest* frame. send()
+    therefore checks capacity itself and drops the incoming frame,
+    tracking drops via owner.dropped_count so tests can verify that
+    overflow occurred.
+    """
+    def __init__(self, owner):
+        # type: (USBTestSocket) -> None
+        self._owner = owner
+
+    def send(self, data):
+        # type: (bytes) -> None
+        with self._owner._hw_lock:
+            was_full = len(self._owner._hw_fifo) >= \
+                (self._owner._hw_fifo.maxlen or 0)
+            if was_full:
+                # Drop the incoming frame (newest) like real hardware
+                self._owner.dropped_count += 1
+                return
+            self._owner._hw_fifo.append(data)
+
+    def recv(self, timeout=0):
+        # type: (float) -> Optional[bytes]
+        return self._owner._real_ins.recv(timeout)
+
+    def fileno(self):
+        # type: () -> int
+        return self._owner._real_ins.fileno()
+
+    def close(self):
+        # type: () -> None
+        self._owner._real_ins.close()
+
+    @property
+    def closed(self):
+        # type: () -> bool
+        return bool(self._owner._real_ins.closed)  # type: ignore[attr-defined]
+
+
 def cleanup_testsockets():
     # type: () -> None
     """