Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/codespell_ignore.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ba
browseable
byteorder
cace
cantact
cas
ciph
componet
Expand Down
198 changes: 145 additions & 53 deletions scapy/contrib/cansocket_python_can.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from scapy.supersocket import SuperSocket
from scapy.layers.can import CAN
from scapy.packet import Packet
from scapy.error import warning
from scapy.error import warning, log_runtime
from typing import (
List,
Type,
Expand All @@ -41,6 +41,36 @@

__all__ = ["CANSocket", "PythonCANSocket"]

# Interfaces with hardware or kernel-level CAN filtering.
# These keep bus-level filters for efficiency since the device/driver
# handles filtering before frames reach userspace.
# All other interfaces (USB adapters like candle, gs_usb, cantact;
# serial like slcan) only do software filtering in BusABC.recv() and
# need filters cleared at the bus level to avoid starving matching
# frames behind non-matching ones (echo frames, other bus traffic).
_HW_FILTERED_INTERFACES = frozenset([
'socketcan', 'kvaser', 'vector', 'pcan', 'ixxat',
'nican', 'neovi', 'etas', 'systec', 'nixnet',
])


def _is_sw_filtered(interface_key):
# type: (str) -> bool
"""Return True if the bus identified by interface_key only does
software filtering (inside BusABC.recv).

For such interfaces, bus-level filters must be cleared so that
bus.recv(timeout=0) returns ALL frames. Per-socket filtering
is then handled by distribute() via _matches_filters().

Without this, BusABC.recv(timeout=0) on software-filtered buses
(candle, gs_usb, cantact, slcan, etc.) can silently consume
one non-matching frame per call and return None, starving matching
frames that sit behind it in the USB/serial buffer.
"""
iface = interface_key.split('_')[0].lower()
return iface not in _HW_FILTERED_INTERFACES


class SocketMapper(object):
"""Internal Helper class to map a python-can bus object to
Expand All @@ -57,6 +87,14 @@ def __init__(self, bus, sockets):
self.bus = bus
self.sockets = sockets
self.closing = False
# Per-bus lock serializing read_bus() and send_bus().
# On serial interfaces (slcan) the python-can Bus uses a single
# serial port for both recv() and send(). Without serialization,
# concurrent calls from different threads (TimeoutScheduler for
# recv, main thread for send) corrupt the serial byte stream,
# causing slcan parsing errors. The lock is per-mapper (per-bus)
# so different CAN buses are not blocked by each other.
self.bus_lock = threading.Lock()

# Maximum time (seconds) to spend reading frames in one read_bus()
# call. On serial interfaces (slcan) the final bus.recv(timeout=0)
Expand All @@ -76,28 +114,43 @@ def read_bus(self):
slcan serial timeout). This method limits total time spent
reading so the TimeoutScheduler thread stays responsive.

This method intentionally does NOT hold pool_mutex so that
concurrent send() calls are not blocked during the serial I/O.
This method does NOT hold pool_mutex but DOES hold bus_lock to
serialize with send_bus(). This prevents concurrent serial I/O
on slcan interfaces while still allowing sends to other buses.
"""
if self.closing:
return []
msgs = []
deadline = time.monotonic() + self.READ_BUS_TIME_LIMIT
while True:
try:
msg = self.bus.recv(timeout=0)
if msg is None:
with self.bus_lock:
while True:
try:
msg = self.bus.recv(timeout=0)
if msg is None:
break
else:
msgs.append(msg)
if time.monotonic() >= deadline:
break
except Exception as e:
if not self.closing:
warning("[MUX] python-can exception caught: %s" % e)
break
else:
msgs.append(msg)
if time.monotonic() >= deadline:
break
except Exception as e:
if not self.closing:
warning("[MUX] python-can exception caught: %s" % e)
break
return msgs

def send_bus(self, msg):
# type: (can_Message) -> None
"""Send a CAN message on the bus.

Serialized with read_bus() via bus_lock to prevent concurrent
serial I/O on slcan interfaces.

:param msg: python-can Message to send.
:raises can_CanError: If the underlying python-can Bus raises.
"""
with self.bus_lock:
self.bus.send(msg)

def distribute(self, msgs):
# type: (List[can_Message]) -> None
"""Distribute received messages to all subscribed sockets."""
Expand All @@ -122,10 +175,10 @@ def internal_send(self, sender, msg):

A given SocketWrapper wants to send a CAN message. The python-can
Bus object is obtained from an internal pool of SocketMapper objects.
The given message is sent on the python-can Bus object and also
inserted into the message queues of all other SocketWrapper objects
which are connected to the same python-can bus object
by the SocketMapper.
The message is sent on the python-can Bus object via send_bus()
(serialized with read_bus() by bus_lock) and also inserted into
the message queues of all other SocketWrapper objects connected to
the same python-can bus object by the SocketMapper.

:param sender: SocketWrapper which initiated a send of a CAN message
:param msg: CAN message to be sent
Expand All @@ -136,30 +189,43 @@ def internal_send(self, sender, msg):
with self.pool_mutex:
try:
mapper = self.pool[sender.name]
mapper.bus.send(msg)
for sock in mapper.sockets:
if sock == sender:
continue
if not sock._matches_filters(msg):
continue

with sock.lock:
sock.rx_queue.append(msg)
except KeyError:
warning("[SND] Socket %s not found in pool" % sender.name)
except can_CanError as e:
warning("[SND] python-can exception caught: %s" % e)
return
# Snapshot the peer sockets while under pool_mutex
peers = [s for s in mapper.sockets if s != sender]

# Send on the bus outside pool_mutex but inside bus_lock.
# This serializes with read_bus() to prevent concurrent serial
# I/O on slcan interfaces, while still allowing multiplexing
# and sends on other buses to proceed concurrently.
try:
mapper.send_bus(msg)
except can_CanError as e:
warning("[SND] python-can exception caught: %s" % e)
return

# Distribute to peer sockets (no need for pool_mutex here,
# we already have a snapshot of the peers list).
for sock in peers:
if not sock._matches_filters(msg):
continue
with sock.lock:
sock.rx_queue.append(msg)

def multiplex_rx_packets(self):
# type: () -> None
"""This calls the mux() function of all SocketMapper
objects in this SocketPool
"""
if time.monotonic() - self.last_call < 0.001:
now = time.monotonic()
if now - self.last_call < 0.001:
# Avoid starvation if multiple threads are doing selects, since
# this object is singleton and all python-CAN sockets are using
# the same instance and locking the same locks.
return
self.last_call = now

# Snapshot pool entries under the lock, then read from each bus
# WITHOUT holding pool_mutex. On slow serial interfaces (slcan)
# bus.recv(timeout=0) can take ~2-3ms per frame; holding the
Expand All @@ -171,7 +237,6 @@ def multiplex_rx_packets(self):
msgs = mapper.read_bus()
if msgs:
mapper.distribute(msgs)
self.last_call = time.monotonic()

def register(self, socket, *args, **kwargs):
# type: (SocketWrapper, Tuple[Any, ...], Dict[str, Any]) -> None
Expand All @@ -198,34 +263,37 @@ def register(self, socket, *args, **kwargs):
t = self.pool[k]
t.sockets.append(socket)
# Update bus-level filters to the union of all sockets'
# filters. For non-slcan interfaces (socketcan, kvaser,
# vector), this enables efficient hardware/kernel
# filtering. For slcan, the bus filters were already
# cleared on creation, so this is a no-op (all sockets
# on slcan share the unfiltered bus).
if not k.lower().startswith('slcan'):
# filters. For hardware-filtered interfaces (socketcan,
# kvaser, vector, pcan, ixxat), this enables efficient
# kernel/device filtering. For software-filtered
# interfaces (slcan, candle, gs_usb, cantact), the bus
# filters were already cleared on creation, so this is
# a no-op (all sockets share the unfiltered bus).
if not _is_sw_filtered(k):
filters = [s.filters for s in t.sockets
if s.filters is not None]
if filters:
t.bus.set_filters(reduce(add, filters))
socket.name = k
else:
bus = can_Bus(*args, **kwargs)
# Serial interfaces like slcan only do software
# filtering inside BusABC.recv(): the recv loop reads
# one frame, finds it doesn't match, and returns
# None -- silently consuming serial bandwidth without
# returning the frame to the mux. This starves the
# mux on busy buses.
# Software-filtered interfaces (slcan, candle, gs_usb,
# cantact, etc.) only filter inside BusABC.recv(): the
# recv loop reads one frame, finds it doesn't match,
# and returns None -- silently consuming serial/USB
# bandwidth without returning the frame to the mux.
# On USB adapters with timeout=0 mapped to ~1ms reads,
# this means only one non-matching frame is consumed
# per poll cycle, starving matching ECU responses that
# sit behind echo frames in the hardware buffer.
#
# For slcan, clear the filters from the bus so that
# bus.recv() returns ALL frames. Per-socket filtering
# in distribute() via _matches_filters() handles
# delivery. Other interfaces (socketcan, kvaser,
# vector, candle) perform efficient hardware/kernel
# filtering and should keep their bus-level filters.
if kwargs.get('can_filters') and \
k.lower().startswith('slcan'):
# For all software-filtered interfaces, clear the bus
# filters so bus.recv() returns ALL frames. Per-socket
# filtering in distribute() via _matches_filters()
# handles delivery. Hardware-filtered interfaces
# (socketcan, kvaser, vector, pcan, ixxat) keep their
# bus-level filters for efficiency.
if kwargs.get('can_filters') and _is_sw_filtered(k):
bus.set_filters(None)
socket.name = k
self.pool[k] = SocketMapper(bus, [socket])
Expand All @@ -242,17 +310,25 @@ def unregister(self, socket):
if socket.name is None:
raise TypeError("SocketWrapper.name should never be None")

mapper_to_shutdown = None
with self.pool_mutex:
try:
t = self.pool[socket.name]
t.sockets.remove(socket)
if not t.sockets:
t.closing = True
t.bus.shutdown()
del self.pool[socket.name]
mapper_to_shutdown = t
except KeyError:
warning("Socket %s already removed from pool" % socket.name)

# Shutdown the bus outside pool_mutex. Acquire bus_lock to
# wait for any in-progress read_bus() or send_bus() to finish
# before shutting down the underlying transport.
if mapper_to_shutdown is not None:
with mapper_to_shutdown.bus_lock:
mapper_to_shutdown.bus.shutdown()


SocketsPool = _SocketsPool()

Expand Down Expand Up @@ -341,6 +417,9 @@ def recv_raw(self, x=0xffff):
"""Returns a tuple containing (cls, pkt_data, time)"""
msg = self.can_iface.recv()

if msg is None:
return self.basecls, None, None

hdr = msg.is_extended_id << 31 | msg.is_remote_frame << 30 | \
msg.is_error_frame << 29 | msg.arbitration_id

Expand Down Expand Up @@ -382,7 +461,13 @@ def select(sockets, remain=conf.recv_poll_rate):
:returns: an array of sockets that were selected and
the function to be called next to get the packets (i.g. recv)
"""
# Move kernel-buffered CAN frames into the per-socket rx_queues
# BEFORE checking which sockets are ready. The previous order
# (check, then multiplex) returned a stale ready-list that did
# not include sockets whose data had just been multiplexed,
# causing a one-iteration delay.
SocketsPool.multiplex_rx_packets()

ready_sockets = \
[s for s in sockets if isinstance(s, PythonCANSocket) and
len(s.can_iface.rx_queue)]
Expand All @@ -401,6 +486,13 @@ def close(self):
"""Closes this socket"""
if self.closed:
return
# Final poll to ensure all kernel-buffered frames are distributed
# to any shared socket instances before we unregister.
try:
SocketsPool.multiplex_rx_packets()
except Exception:
log_runtime.debug("Exception during SocketsPool multiplex in close",
exc_info=True)
super(PythonCANSocket, self).close()
self.can_iface.shutdown()

Expand Down
Loading
Loading