Implement capture/tracing for redundant transport (#157)
diff --git a/.idea/dictionaries/pavel.xml b/.idea/dictionaries/pavel.xml
index 04efaae..20584d3 100644
--- a/.idea/dictionaries/pavel.xml
+++ b/.idea/dictionaries/pavel.xml
@@ -50,11 +50,14 @@
<w>datagrams</w>
<w>dcabf</w>
<w>ddddddd</w>
+ <w>ddlm</w>
<w>deadbeefdeadbeefdeadbeefdeadbeef</w>
<w>deallocated</w>
<w>debian</w>
<w>decaxta</w>
+ <w>deduplicates</w>
<w>deduplicator</w>
+ <w>deduplicators</w>
<w>demultiplexer</w>
<w>demultiplexing</w>
<w>demux</w>
diff --git a/noxfile.py b/noxfile.py
index 75dd3b5..2c66a16 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -153,7 +153,7 @@
return 0
session.install("-e", f".[{','.join(EXTRAS_REQUIRE.keys())}]")
- session.install("git+https://github.com/UAVCAN/yakut@orc") # TODO: use stable version from PyPI when deployed.
+ session.install("git+https://github.com/UAVCAN/yakut@af4a4fc0ca6fa0b62b3ec266d3f8cddeceb27446") # TODO: use PyPI
demo_dir = ROOT_DIR / "demo"
tmp_dir = Path(session.create_tmp()).resolve()
diff --git a/pyuavcan/VERSION b/pyuavcan/VERSION
index ff006e9..6942b49 100644
--- a/pyuavcan/VERSION
+++ b/pyuavcan/VERSION
@@ -1 +1 @@
-1.2.0.b4
+1.2.0.b5
diff --git a/pyuavcan/application/_port_list_publisher.py b/pyuavcan/application/_port_list_publisher.py
index bd2c724..a96ed97 100644
--- a/pyuavcan/application/_port_list_publisher.py
+++ b/pyuavcan/application/_port_list_publisher.py
@@ -81,12 +81,13 @@
if publisher is None:
return
- input_ds = [x.specifier.data_specifier for x in self.node.presentation.transport.input_sessions]
+ trans = self.node.presentation.transport
+ input_ds = [x.specifier.data_specifier for x in trans.input_sessions]
srv_in_ds = [x for x in input_ds if isinstance(x, ServiceDataSpecifier)]
state = _State(
pub={
x.specifier.data_specifier.subject_id
- for x in self.node.presentation.transport.output_sessions
+ for x in trans.output_sessions
if isinstance(x.specifier.data_specifier, MessageDataSpecifier)
},
sub={x.subject_id for x in input_ds if isinstance(x, MessageDataSpecifier)},
@@ -99,8 +100,8 @@
if state_changed or time_expired:
_logger.debug("%r: Publishing: state_changed=%r, state=%r", self, state_changed, state)
self._state = state
- self._updates_since_pub = 0
- publisher.publish_soon(_make_port_list(self._state)) # Should we handle ResourceClosedError here?
+ self._updates_since_pub = 0 # Should we handle ResourceClosedError from publish_soon() below?
+ publisher.publish_soon(_make_port_list(self._state, trans.capture_active))
def __repr__(self) -> str:
return pyuavcan.util.repr_attributes(self, self.node)
@@ -109,10 +110,12 @@
_logger = logging.getLogger(__name__)
-def _make_port_list(state: _State) -> List:
+def _make_port_list(state: _State, packet_capture_mode: bool) -> List:
+ from uavcan.primitive import Empty_1_0 as Empty
+
return List(
publishers=_make_subject_id_list(state.pub),
- subscribers=_make_subject_id_list(state.sub),
+ subscribers=_make_subject_id_list(state.sub) if not packet_capture_mode else SubjectIDList(total=Empty()),
clients=_make_service_id_list(state.cln),
servers=_make_service_id_list(state.srv),
)
@@ -150,7 +153,7 @@
srv=set(range(512)),
)
- msg = _make_port_list(state)
+ msg = _make_port_list(state, False)
assert msg.publishers.sparse_list is not None
pubs = [x.value for x in msg.publishers.sparse_list]
diff --git a/pyuavcan/transport/_tracer.py b/pyuavcan/transport/_tracer.py
index 1e075cf..531ca7e 100644
--- a/pyuavcan/transport/_tracer.py
+++ b/pyuavcan/transport/_tracer.py
@@ -21,6 +21,15 @@
timestamp: pyuavcan.transport.Timestamp
+ @staticmethod
+ def get_transport_type() -> typing.Type[pyuavcan.transport.Transport]:
+ """
+ Static reference to the type of transport that can emit captures of this type.
+ For example, for UAVCAN/serial it would be :class:`pyuavcan.transport.serial.SerialTransport`.
+ Although the method is static, it shall be overridden by all inheritors.
+ """
+ raise NotImplementedError
+
CaptureCallback = typing.Callable[[Capture], None]
@@ -179,7 +188,5 @@
Reconstructed multi-frame transfers are reported as a single event when the last frame is received.
Capture instances that are not supported by the current transport are silently ignored and None is returned.
- This is to simplify tracing over heterogeneous transports where there are several tracer instances used
- concurrently, one per transport type.
"""
raise NotImplementedError
diff --git a/pyuavcan/transport/_transport.py b/pyuavcan/transport/_transport.py
index 12393d5..c9b6909 100644
--- a/pyuavcan/transport/_transport.py
+++ b/pyuavcan/transport/_transport.py
@@ -215,6 +215,14 @@
"""
raise NotImplementedError
+ @property
+ @abc.abstractmethod
+ def capture_active(self) -> bool:
+ """
+ Whether :meth:`begin_capture` was invoked and packet capture is being performed on this transport.
+ """
+ raise NotImplementedError
+
@staticmethod
@abc.abstractmethod
def make_tracer() -> Tracer:
diff --git a/pyuavcan/transport/can/_can.py b/pyuavcan/transport/can/_can.py
index 959653d..ec7a2cd 100644
--- a/pyuavcan/transport/can/_can.py
+++ b/pyuavcan/transport/can/_can.py
@@ -70,6 +70,8 @@
Please read the module documentation for details.
"""
+ TRANSFER_ID_MODULO = TRANSFER_ID_MODULO
+
def __init__(
self,
media: Media,
@@ -239,6 +241,10 @@
self._capture_handlers.append(handler)
self._reconfigure_acceptance_filters()
+ @property
+ def capture_active(self) -> bool:
+ return len(self._capture_handlers) > 0
+
@staticmethod
def make_tracer() -> CANTracer:
"""
diff --git a/pyuavcan/transport/can/_tracer.py b/pyuavcan/transport/can/_tracer.py
index ed6ca2c..3722bde 100644
--- a/pyuavcan/transport/can/_tracer.py
+++ b/pyuavcan/transport/can/_tracer.py
@@ -6,6 +6,7 @@
import typing
import dataclasses
import pyuavcan
+import pyuavcan.transport.can
from pyuavcan.transport import Trace, TransferTrace, AlienSessionSpecifier, AlienTransferMetadata, Capture
from pyuavcan.transport import AlienTransfer, TransferFrom, Timestamp, Priority
from ._session import TransferReassemblyErrorID, TransferReassembler
@@ -46,6 +47,10 @@
direction = "tx" if self.own else "rx"
return pyuavcan.util.repr_attributes(self, self.timestamp, direction, self.frame)
+ @staticmethod
+ def get_transport_type() -> typing.Type[pyuavcan.transport.can.CANTransport]:
+ return pyuavcan.transport.can.CANTransport
+
@dataclasses.dataclass(frozen=True)
class CANErrorTrace(pyuavcan.transport.ErrorTrace):
diff --git a/pyuavcan/transport/loopback/_loopback.py b/pyuavcan/transport/loopback/_loopback.py
index 95a033d..bc81226 100644
--- a/pyuavcan/transport/loopback/_loopback.py
+++ b/pyuavcan/transport/loopback/_loopback.py
@@ -177,6 +177,10 @@
def begin_capture(self, handler: pyuavcan.transport.CaptureCallback) -> None:
self._capture_handlers.append(handler)
+ @property
+ def capture_active(self) -> bool:
+ return len(self._capture_handlers) > 0
+
@staticmethod
def make_tracer() -> LoopbackTracer:
"""
diff --git a/pyuavcan/transport/loopback/_tracer.py b/pyuavcan/transport/loopback/_tracer.py
index 9f163cb..14c9be2 100644
--- a/pyuavcan/transport/loopback/_tracer.py
+++ b/pyuavcan/transport/loopback/_tracer.py
@@ -2,9 +2,10 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko <pavel@uavcan.org>
+from __future__ import annotations
import typing
import dataclasses
-import pyuavcan.transport
+import pyuavcan.transport.loopback
from pyuavcan.transport import Trace, TransferTrace, Capture
@@ -16,6 +17,10 @@
transfer: pyuavcan.transport.AlienTransfer
+ @staticmethod
+ def get_transport_type() -> typing.Type[pyuavcan.transport.loopback.LoopbackTransport]:
+ return pyuavcan.transport.loopback.LoopbackTransport
+
class LoopbackTracer(pyuavcan.transport.Tracer):
"""
diff --git a/pyuavcan/transport/redundant/__init__.py b/pyuavcan/transport/redundant/__init__.py
index 6b72403..b344c04 100644
--- a/pyuavcan/transport/redundant/__init__.py
+++ b/pyuavcan/transport/redundant/__init__.py
@@ -372,3 +372,7 @@
from ._session import RedundantFeedback as RedundantFeedback
from ._error import InconsistentInferiorConfigurationError as InconsistentInferiorConfigurationError
+
+from ._tracer import RedundantCapture as RedundantCapture
+from ._tracer import RedundantDuplicateTransferTrace as RedundantDuplicateTransferTrace
+from ._tracer import RedundantTracer as RedundantTracer
diff --git a/pyuavcan/transport/redundant/_deduplicator/_base.py b/pyuavcan/transport/redundant/_deduplicator/_base.py
index 78d5b04..f57ff04 100644
--- a/pyuavcan/transport/redundant/_deduplicator/_base.py
+++ b/pyuavcan/transport/redundant/_deduplicator/_base.py
@@ -2,21 +2,59 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko <pavel@uavcan.org>
+from __future__ import annotations
import abc
+import typing
import pyuavcan.transport
class Deduplicator(abc.ABC):
+ """
+ The abstract class implementing the transfer-wise deduplication strategy.
+ **Users of redundant transports do not need to deduplicate their transfers manually
+ as it will be done automatically.**
+ Please read the module documentation for further details.
+ """
+
+ MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD = int(2 ** 48)
+ """
+ An inferior transport whose transfer-ID modulo is less than this value is expected to experience
+ transfer-ID overflows routinely during its operation. Otherwise, the transfer-ID is not expected to
+ overflow for centuries.
+
+ A transfer-ID counter that is expected to overflow is called "cyclic", otherwise it's "monotonic".
+ Read https://forum.uavcan.org/t/alternative-transport-protocols/324.
+ See :meth:`new`.
+ """
+
+ @staticmethod
+ def new(transfer_id_modulo: int) -> Deduplicator:
+ """
+ A helper factory that constructs a :class:`MonotonicDeduplicator` if the argument is not less than
+ :attr:`MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD`, otherwise constructs a :class:`CyclicDeduplicator`.
+ """
+ from . import CyclicDeduplicator, MonotonicDeduplicator
+
+ if transfer_id_modulo >= Deduplicator.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
+ return MonotonicDeduplicator()
+ return CyclicDeduplicator(transfer_id_modulo)
+
@abc.abstractmethod
def should_accept_transfer(
- self, iface_id: int, transfer_id_timeout: float, transfer: pyuavcan.transport.TransferFrom
+ self,
+ *,
+ iface_id: int,
+ transfer_id_timeout: float,
+ timestamp: pyuavcan.transport.Timestamp,
+ source_node_id: typing.Optional[int],
+ transfer_id: int,
) -> bool:
"""
The iface-ID is an arbitrary integer that is unique within the redundant group identifying the transport
instance the transfer was received from.
It could be the index of the redundant interface (e.g., 0, 1, 2 for a triply-redundant transport),
or it could be something else like a memory address of a related object.
- Embedded applications usually use indexes, whereas in PyUAVCAN it may be more convenient to use ``id()``.
+ Embedded applications usually use indexes, whereas in PyUAVCAN it may be more convenient to use :func:`id`.
The transfer-ID timeout is specified in seconds. It is used to handle the case of a node restart.
"""
diff --git a/pyuavcan/transport/redundant/_deduplicator/_cyclic.py b/pyuavcan/transport/redundant/_deduplicator/_cyclic.py
index 6d0f5d8..8adb558 100644
--- a/pyuavcan/transport/redundant/_deduplicator/_cyclic.py
+++ b/pyuavcan/transport/redundant/_deduplicator/_cyclic.py
@@ -15,32 +15,36 @@
self._remote_states: typing.List[typing.Optional[_RemoteState]] = []
def should_accept_transfer(
- self, iface_id: int, transfer_id_timeout: float, transfer: pyuavcan.transport.TransferFrom
+ self,
+ *,
+ iface_id: int,
+ transfer_id_timeout: float,
+ timestamp: pyuavcan.transport.Timestamp,
+ source_node_id: typing.Optional[int],
+ transfer_id: int,
) -> bool:
- if transfer.source_node_id is None:
+ if source_node_id is None:
# Anonymous transfers are fully stateless, so always accepted.
# This may lead to duplications and reordering but this is a design limitation.
return True
# If a similar architecture is used on an embedded system, this normally would be a static array.
- if len(self._remote_states) <= transfer.source_node_id:
- self._remote_states += [None] * (transfer.source_node_id - len(self._remote_states) + 1)
- assert len(self._remote_states) == transfer.source_node_id + 1
+ if len(self._remote_states) <= source_node_id:
+ self._remote_states += [None] * (source_node_id - len(self._remote_states) + 1)
+ assert len(self._remote_states) == source_node_id + 1
- if self._remote_states[transfer.source_node_id] is None:
+ if self._remote_states[source_node_id] is None:
# First transfer from this node, create new state and accept unconditionally.
- self._remote_states[transfer.source_node_id] = _RemoteState(
- iface_id=iface_id, last_timestamp=transfer.timestamp
- )
+ self._remote_states[source_node_id] = _RemoteState(iface_id=iface_id, last_timestamp=timestamp)
return True
# We have seen transfers from this node before, so we need to perform actual deduplication.
- state = self._remote_states[transfer.source_node_id]
+ state = self._remote_states[source_node_id]
assert state is not None
# If the current interface was seen working recently, reject traffic from other interfaces.
# Note that the time delta may be negative due to timestamping variations and inner latency variations.
- time_delta = transfer.timestamp.monotonic - state.last_timestamp.monotonic
+ time_delta = timestamp.monotonic - state.last_timestamp.monotonic
iface_switch_allowed = time_delta > transfer_id_timeout
if not iface_switch_allowed and state.iface_id != iface_id:
return False
@@ -50,7 +54,7 @@
# Either we're on the same interface or (the interface is new and the current one seems to be down).
state.iface_id = iface_id
- state.last_timestamp = transfer.timestamp
+ state.last_timestamp = timestamp
return True
diff --git a/pyuavcan/transport/redundant/_deduplicator/_monotonic.py b/pyuavcan/transport/redundant/_deduplicator/_monotonic.py
index e5d3293..3b2e604 100644
--- a/pyuavcan/transport/redundant/_deduplicator/_monotonic.py
+++ b/pyuavcan/transport/redundant/_deduplicator/_monotonic.py
@@ -13,38 +13,42 @@
self._remote_states: typing.List[typing.Optional[_RemoteState]] = []
def should_accept_transfer(
- self, iface_id: int, transfer_id_timeout: float, transfer: pyuavcan.transport.TransferFrom
+ self,
+ *,
+ iface_id: int,
+ transfer_id_timeout: float,
+ timestamp: pyuavcan.transport.Timestamp,
+ source_node_id: typing.Optional[int],
+ transfer_id: int,
) -> bool:
del iface_id # Not used in monotonic deduplicator.
- if transfer.source_node_id is None:
+ if source_node_id is None:
# Anonymous transfers are fully stateless, so always accepted.
# This may lead to duplications and reordering but this is a design limitation.
return True
# If a similar architecture is used on an embedded system, this normally would be a static array.
- if len(self._remote_states) <= transfer.source_node_id:
- self._remote_states += [None] * (transfer.source_node_id - len(self._remote_states) + 1)
- assert len(self._remote_states) == transfer.source_node_id + 1
+ if len(self._remote_states) <= source_node_id:
+ self._remote_states += [None] * (source_node_id - len(self._remote_states) + 1)
+ assert len(self._remote_states) == source_node_id + 1
- if self._remote_states[transfer.source_node_id] is None:
+ if self._remote_states[source_node_id] is None:
# First transfer from this node, create new state and accept unconditionally.
- self._remote_states[transfer.source_node_id] = _RemoteState(
- last_transfer_id=transfer.transfer_id, last_timestamp=transfer.timestamp
- )
+ self._remote_states[source_node_id] = _RemoteState(last_transfer_id=transfer_id, last_timestamp=timestamp)
return True
# We have seen transfers from this node before, so we need to perform actual deduplication.
- state = self._remote_states[transfer.source_node_id]
+ state = self._remote_states[source_node_id]
assert state is not None
# If we have seen transfers with higher TID values recently, reject this one as duplicate.
- tid_timeout = (transfer.timestamp.monotonic - state.last_timestamp.monotonic) > transfer_id_timeout
- if not tid_timeout and transfer.transfer_id <= state.last_transfer_id:
+ tid_timeout = (timestamp.monotonic - state.last_timestamp.monotonic) > transfer_id_timeout
+ if not tid_timeout and transfer_id <= state.last_transfer_id:
return False
# Otherwise, this is either a new transfer or a TID timeout condition has occurred.
- state.last_transfer_id = transfer.transfer_id
- state.last_timestamp = transfer.timestamp
+ state.last_transfer_id = transfer_id
+ state.last_timestamp = timestamp
return True
diff --git a/pyuavcan/transport/redundant/_redundant_transport.py b/pyuavcan/transport/redundant/_redundant_transport.py
index e310dab..82ee43f 100644
--- a/pyuavcan/transport/redundant/_redundant_transport.py
+++ b/pyuavcan/transport/redundant/_redundant_transport.py
@@ -9,6 +9,8 @@
import pyuavcan.transport
from ._session import RedundantInputSession, RedundantOutputSession, RedundantSession
from ._error import InconsistentInferiorConfigurationError
+from ._deduplicator import Deduplicator
+from ._tracer import RedundantTracer, RedundantCapture
_logger = logging.getLogger(__name__)
@@ -33,15 +35,6 @@
Please read the module documentation for details.
"""
- MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD = int(2 ** 48)
- """
- An inferior transport whose transfer-ID modulo is less than this value is expected to experience
- transfer-ID overflows routinely during its operation. Otherwise, the transfer-ID is not expected to
- overflow for centuries.
- A transfer-ID counter that is expected to overflow is called "cyclic", otherwise it's "monotonic".
- Read https://forum.uavcan.org/t/alternative-transport-protocols/324.
- """
-
def __init__(self, *, loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> None:
"""
:param loop: All inferiors shall run on the same event loop,
@@ -50,7 +43,7 @@
"""
self._cols: typing.List[pyuavcan.transport.Transport] = []
self._rows: typing.Dict[pyuavcan.transport.SessionSpecifier, RedundantSession] = {}
- self._capture_handlers: typing.List[pyuavcan.transport.CaptureCallback] = []
+ self._unwrapped_capture_handlers: typing.List[typing.Callable[[RedundantCapture], None]] = []
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._check_matrix_consistency()
@@ -112,7 +105,9 @@
) -> RedundantInputSession:
out = self._get_session(
specifier,
- lambda fin: RedundantInputSession(specifier, payload_metadata, self._get_tid_modulo, self._loop, fin),
+ lambda fin: RedundantInputSession(
+ specifier, payload_metadata, lambda: self.protocol_parameters.transfer_id_modulo, self._loop, fin
+ ),
)
assert isinstance(out, RedundantInputSession)
self._check_matrix_consistency()
@@ -170,8 +165,6 @@
the operation will be rolled back to ensure state consistency.
"""
self._validate_inferior(transport)
- for ch in self._capture_handlers:
- transport.begin_capture(ch)
self._cols.append(transport)
try:
for redundant_session in self._rows.values():
@@ -181,6 +174,9 @@
raise
finally:
self._check_matrix_consistency()
+ # Launch the capture as late as possible to not leave it dangling if the attachment failed.
+ for ch in self._unwrapped_capture_handlers:
+ transport.begin_capture(self._wrap_capture_handler(transport, ch))
def detach_inferior(self, transport: pyuavcan.transport.Transport) -> None:
"""
@@ -234,25 +230,32 @@
def begin_capture(self, handler: pyuavcan.transport.CaptureCallback) -> None:
"""
Stores the handler in the local list of handlers.
- Invokes :class:`pyuavcan.transport.Transport.begin_capture` on each inferior with the provided handler.
+ Invokes :meth:`pyuavcan.transport.Transport.begin_capture` on each inferior.
If at least one inferior raises an exception, it is propagated immediately and the remaining inferiors
will remain in an inconsistent state.
When a new inferior is added later, the stored handlers will be automatically used to enable capture on it.
If such auto-restoration behavior is undesirable, configure capture individually per-inferior instead.
- The redundant transport does not define its own capture events and does not wrap captured events reported
- by its inferiors.
+ Every capture emitted by the inferiors is wrapped into :class:`RedundantCapture`,
+ which contains additional metadata about the inferior transport instance that emitted the capture.
+ This lets users determine which transport of the redundant group
+ provided the capture; the same information is used by :class:`RedundantTracer`
+ to automatically manage transfer deduplication.
"""
- self._capture_handlers.append(handler)
+ self._unwrapped_capture_handlers.append(handler)
for c in self._cols:
- c.begin_capture(handler)
+ c.begin_capture(self._wrap_capture_handler(c, handler))
+
+ @property
+ def capture_active(self) -> bool:
+ return len(self._unwrapped_capture_handlers) > 0
@staticmethod
- def make_tracer() -> pyuavcan.transport.Tracer:
+ def make_tracer() -> RedundantTracer:
"""
- This method is not implemented for redundant transport. Access the inferiors directly instead.
+ See :class:`RedundantTracer`.
"""
- raise NotImplementedError
+ return RedundantTracer()
async def spoof(self, transfer: pyuavcan.transport.AlienTransfer, monotonic_deadline: float) -> bool:
"""
@@ -262,6 +265,7 @@
First exception to occur terminates the operation and is raised immediately.
This is different from regular sending; the assumption is that the caller necessarily wants to ensure
that spoofing takes place against every inferior.
+ If this is not the case, spoof each inferior separately.
"""
if not self._cols:
return False
@@ -297,8 +301,11 @@
)
# Ensure all inferiors use the same transfer-ID overflow policy.
- if self._get_tid_modulo() is None:
- if transport.protocol_parameters.transfer_id_modulo < self.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
+ if self.protocol_parameters.transfer_id_modulo >= Deduplicator.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
+ if (
+ transport.protocol_parameters.transfer_id_modulo
+ < Deduplicator.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD
+ ):
raise InconsistentInferiorConfigurationError(
f"The new inferior shall use monotonic transfer-ID counters in order to match the "
f"other inferiors in the redundant transport group"
@@ -358,14 +365,24 @@
owner._close_inferior(new_index) # pylint: disable=protected-access
raise
- def _get_tid_modulo(self) -> typing.Optional[int]:
- if self.protocol_parameters.transfer_id_modulo < self.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
- return self.protocol_parameters.transfer_id_modulo
- return None
-
def _check_matrix_consistency(self) -> None:
for row in self._rows.values():
assert len(row.inferiors) == len(self._cols)
+ def _wrap_capture_handler(
+ self,
+ inferior: pyuavcan.transport.Transport,
+ handler: typing.Callable[[RedundantCapture], None],
+ ) -> pyuavcan.transport.CaptureCallback:
+ # If you are reading this, send me a postcard.
+ return lambda cap: handler(
+ RedundantCapture(
+ cap.timestamp,
+ inferior=cap,
+ iface_id=id(inferior),
+ transfer_id_modulo=self.protocol_parameters.transfer_id_modulo, # THIS IS PROBABLY SLOW?
+ )
+ )
+
def _get_repr_fields(self) -> typing.Tuple[typing.List[typing.Any], typing.Dict[str, typing.Any]]:
return list(self.inferiors), {}
diff --git a/pyuavcan/transport/redundant/_session/_input.py b/pyuavcan/transport/redundant/_session/_input.py
index 9cd3383..cd0be73 100644
--- a/pyuavcan/transport/redundant/_session/_input.py
+++ b/pyuavcan/transport/redundant/_session/_input.py
@@ -10,7 +10,7 @@
import pyuavcan.transport
import pyuavcan.util
from ._base import RedundantSession, RedundantSessionStatistics
-from .._deduplicator import Deduplicator, MonotonicDeduplicator, CyclicDeduplicator
+from .._deduplicator import Deduplicator
_logger = logging.getLogger(__name__)
@@ -52,7 +52,7 @@
self,
specifier: pyuavcan.transport.InputSessionSpecifier,
payload_metadata: pyuavcan.transport.PayloadMetadata,
- tid_modulo_provider: typing.Callable[[], typing.Optional[int]],
+ tid_modulo_provider: typing.Callable[[], int],
loop: asyncio.AbstractEventLoop,
finalizer: typing.Callable[[], None],
):
@@ -92,12 +92,7 @@
# Ensure that the deduplicator is constructed when the first inferior is launched.
if self._deduplicator is None:
- tid_modulo = self._get_tid_modulo()
- if tid_modulo is None:
- self._deduplicator = MonotonicDeduplicator()
- else:
- assert 0 < tid_modulo <= 2 ** 56, "Sanity check"
- self._deduplicator = CyclicDeduplicator(tid_modulo)
+ self._deduplicator = Deduplicator.new(self._get_tid_modulo())
_logger.debug("%s: Constructed new deduplicator: %s", self, self._deduplicator)
# Synchronize the settings for the newly added inferior with its siblings.
@@ -239,7 +234,13 @@
) -> None:
assert self._deduplicator is not None
iface_id = id(session)
- if self._deduplicator.should_accept_transfer(iface_id, self.transfer_id_timeout, transfer):
+ if self._deduplicator.should_accept_transfer(
+ iface_id=iface_id,
+ transfer_id_timeout=self.transfer_id_timeout,
+ timestamp=transfer.timestamp,
+ source_node_id=transfer.source_node_id,
+ transfer_id=transfer.transfer_id,
+ ):
_logger.debug("%s: Accepting %s from %016x", self, transfer, iface_id)
self._stat_transfers += 1
self._stat_payload_bytes += sum(map(len, transfer.fragmented_payload))
@@ -509,7 +510,7 @@
ses = RedundantInputSession(
spec,
meta,
- tid_modulo_provider=lambda: None, # Like UDP or serial - infinite modulo.
+ tid_modulo_provider=lambda: 2 ** 56, # Like UDP or serial - infinite modulo.
loop=loop,
finalizer=lambda: None,
)
diff --git a/pyuavcan/transport/redundant/_tracer.py b/pyuavcan/transport/redundant/_tracer.py
new file mode 100644
index 0000000..ee53cc7
--- /dev/null
+++ b/pyuavcan/transport/redundant/_tracer.py
@@ -0,0 +1,159 @@
+# Copyright (c) 2021 UAVCAN Consortium
+# This software is distributed under the terms of the MIT License.
+# Author: Pavel Kirienko <pavel@uavcan.org>
+
+from __future__ import annotations
+import typing
+import logging
+import dataclasses
+import pyuavcan
+import pyuavcan.transport.redundant
+from ._deduplicator import Deduplicator
+
+
+@dataclasses.dataclass(frozen=True)
+class RedundantCapture(pyuavcan.transport.Capture):
+ """
+ Composes :class:`pyuavcan.transport.Capture` with a reference to the
+ transport instance that yielded this capture.
+ The user may construct such captures manually when performing postmortem analysis of a network data dump
+ to feed them later into :class:`RedundantTracer`.
+ """
+
+ inferior: pyuavcan.transport.Capture
+ """
+ The original capture from the inferior transport.
+ """
+
+ iface_id: int
+ """
+ A unique number that identifies this transport in its redundant group.
+ """
+
+ transfer_id_modulo: int
+ """
+ The number of unique transfer-ID values (that is, the maximum possible transfer-ID plus one)
+ for the transport that emitted this capture.
+ This is actually a transport-specific constant.
+ This value is used by :class:`RedundantTracer` to select the appropriate transfer deduplication strategy.
+ """
+
+ @staticmethod
+ def get_transport_type() -> typing.Type[pyuavcan.transport.redundant.RedundantTransport]:
+ return pyuavcan.transport.redundant.RedundantTransport
+
+
+@dataclasses.dataclass(frozen=True)
+class RedundantDuplicateTransferTrace(pyuavcan.transport.Trace):
+ """
+ Indicates that the last capture object completed a valid transfer that was discarded as a duplicate
+ (either received from another redundant interface or deterministic data loss mitigation (DDLM) is employed).
+
+ Observe that it is NOT a subclass of :class:`pyuavcan.transport.TransferTrace`!
+ It shall not be one because duplicates should not be processed normally.
+ """
+
+
+class RedundantTracer(pyuavcan.transport.Tracer):
+ """
+ The redundant tracer automatically deduplicates transfers received from multiple redundant transports.
+ It can be used either in real-time or during postmortem analysis.
+ In the latter case the user would construct instances of :class:`RedundantCapture` manually and feed them
+ into the tracer one-by-one.
+ """
+
+ def __init__(self) -> None:
+ self._deduplicators: typing.Dict[RedundantTracer._DeduplicatorSelector, Deduplicator] = {}
+ self._last_transfer_id_modulo = 0
+ self._inferior_tracers: typing.Dict[
+ typing.Tuple[typing.Type[pyuavcan.transport.Transport], int],
+ pyuavcan.transport.Tracer,
+ ] = {}
+
+ def update(self, cap: pyuavcan.transport.Capture) -> typing.Optional[pyuavcan.transport.Trace]:
+ """
+ All instances of :class:`pyuavcan.transport.TransferTrace` are deduplicated:
+ each duplicate is dropped and :class:`RedundantDuplicateTransferTrace` is returned in its place.
+ All other instances (such as :class:`pyuavcan.transport.ErrorTrace`) are returned unchanged.
+ """
+ _logger.debug("%r: Processing %r", self, cap)
+ if not isinstance(cap, RedundantCapture):
+ return None
+
+ if cap.transfer_id_modulo != self._last_transfer_id_modulo:
+ _logger.info(
+ "%r: TID modulo change detected, resetting state (%d deduplicators dropped): %r --> %r",
+ self,
+ len(self._deduplicators),
+ self._last_transfer_id_modulo,
+ cap.transfer_id_modulo,
+ )
+ # Should we also drop the tracers here? If an inferior transport is removed, its tracer will be sitting
+ # here useless, which we don't want. On the other hand, disturbing the state too much is also no good.
+ self._last_transfer_id_modulo = cap.transfer_id_modulo
+ self._deduplicators.clear()
+
+ tracer = self._get_inferior_tracer(cap.inferior.get_transport_type(), cap.iface_id)
+ trace = tracer.update(cap.inferior)
+ if not isinstance(trace, pyuavcan.transport.TransferTrace):
+ _logger.debug("%r: BYPASS: %r", self, trace)
+ return trace
+
+ meta = trace.transfer.metadata
+ deduplicator = self._get_deduplicator(
+ meta.session_specifier.destination_node_id,
+ meta.session_specifier.data_specifier,
+ cap.transfer_id_modulo,
+ )
+ should_accept = deduplicator.should_accept_transfer(
+ iface_id=cap.iface_id,
+ transfer_id_timeout=trace.transfer_id_timeout,
+ timestamp=trace.timestamp,
+ source_node_id=meta.session_specifier.source_node_id,
+ transfer_id=meta.transfer_id,
+ )
+ if should_accept:
+ _logger.debug("%r: ACCEPT: %r", self, trace)
+ return trace
+ _logger.debug("%r: REJECT: %r", self, trace)
+ return RedundantDuplicateTransferTrace(cap.timestamp)
+
+ def _get_deduplicator(
+ self,
+ destination_node_id: typing.Optional[int],
+ data_specifier: pyuavcan.transport.DataSpecifier,
+ transfer_id_modulo: int,
+ ) -> Deduplicator:
+ selector = RedundantTracer._DeduplicatorSelector(destination_node_id, data_specifier)
+ try:
+ return self._deduplicators[selector]
+ except LookupError:
+ dd = Deduplicator.new(transfer_id_modulo)
+ _logger.debug("%r: New deduplicator for %r: %r", self, selector, dd)
+ self._deduplicators[selector] = dd
+ return self._deduplicators[selector]
+
+ def _get_inferior_tracer(
+ self,
+ inferior_type: typing.Type[pyuavcan.transport.Transport],
+ inferior_iface_id: int,
+ ) -> pyuavcan.transport.Tracer:
+ selector = inferior_type, inferior_iface_id
+ try:
+ return self._inferior_tracers[selector]
+ except LookupError:
+ it = inferior_type.make_tracer()
+ _logger.debug("%r: New inferior tracer for %r: %r", self, selector, it)
+ self._inferior_tracers[selector] = it
+ return self._inferior_tracers[selector]
+
+ @dataclasses.dataclass(frozen=True)
+ class _DeduplicatorSelector:
+ destination_node_id: typing.Optional[int]
+ data_specifier: pyuavcan.transport.DataSpecifier
+
+ def __repr__(self) -> str:
+ return pyuavcan.util.repr_attributes(self, self._inferior_tracers)
+
+
+_logger = logging.getLogger(__name__)
diff --git a/pyuavcan/transport/serial/_serial.py b/pyuavcan/transport/serial/_serial.py
index 6f360b1..e12dfa5 100644
--- a/pyuavcan/transport/serial/_serial.py
+++ b/pyuavcan/transport/serial/_serial.py
@@ -43,6 +43,8 @@
Please read the module documentation for details.
"""
+ TRANSFER_ID_MODULO = SerialFrame.TRANSFER_ID_MASK + 1
+
VALID_MTU_RANGE = (1024, 1024 ** 3)
"""
The maximum MTU is practically unlimited, and it is also the default MTU.
@@ -163,7 +165,7 @@
@property
def protocol_parameters(self) -> pyuavcan.transport.ProtocolParameters:
return pyuavcan.transport.ProtocolParameters(
- transfer_id_modulo=SerialFrame.TRANSFER_ID_MASK + 1,
+ transfer_id_modulo=self.TRANSFER_ID_MODULO,
max_nodes=len(SerialFrame.NODE_ID_RANGE),
mtu=self._mtu,
)
@@ -266,6 +268,10 @@
"""
self._capture_handlers.append(handler)
+ @property
+ def capture_active(self) -> bool:
+ return len(self._capture_handlers) > 0
+
@staticmethod
def make_tracer() -> SerialTracer:
"""
diff --git a/pyuavcan/transport/serial/_tracer.py b/pyuavcan/transport/serial/_tracer.py
index eb6af29..d3018b5 100644
--- a/pyuavcan/transport/serial/_tracer.py
+++ b/pyuavcan/transport/serial/_tracer.py
@@ -7,6 +7,7 @@
import logging
import dataclasses
import pyuavcan
+import pyuavcan.transport.serial
from pyuavcan.transport import Trace, TransferTrace, Capture, AlienSessionSpecifier, AlienTransferMetadata
from pyuavcan.transport import AlienTransfer, TransferFrom, Timestamp
from pyuavcan.transport.commons.high_overhead_transport import AlienTransferReassembler, TransferReassembler
@@ -49,6 +50,10 @@
direction = "tx" if self.own else "rx"
return pyuavcan.util.repr_attributes(self, direction, fragment)
+ @staticmethod
+ def get_transport_type() -> typing.Type[pyuavcan.transport.serial.SerialTransport]:
+ return pyuavcan.transport.serial.SerialTransport
+
@dataclasses.dataclass(frozen=True)
class SerialErrorTrace(pyuavcan.transport.ErrorTrace):
diff --git a/pyuavcan/transport/udp/_tracer.py b/pyuavcan/transport/udp/_tracer.py
index 524309c..b8ddf1e 100644
--- a/pyuavcan/transport/udp/_tracer.py
+++ b/pyuavcan/transport/udp/_tracer.py
@@ -8,6 +8,7 @@
import dataclasses
from ipaddress import IPv4Address, IPv6Address
import pyuavcan
+import pyuavcan.transport.udp
from pyuavcan.transport import Trace, TransferTrace, Capture, AlienSessionSpecifier, AlienTransferMetadata
from pyuavcan.transport import AlienTransfer, TransferFrom, Timestamp
from pyuavcan.transport.commons.high_overhead_transport import AlienTransferReassembler, TransferReassembler
@@ -172,6 +173,10 @@
)
return ses_spec, frame
+ @staticmethod
+ def get_transport_type() -> typing.Type[pyuavcan.transport.udp.UDPTransport]:
+ return pyuavcan.transport.udp.UDPTransport
+
@dataclasses.dataclass(frozen=True)
class UDPErrorTrace(pyuavcan.transport.ErrorTrace):
diff --git a/pyuavcan/transport/udp/_udp.py b/pyuavcan/transport/udp/_udp.py
index 479870a..757403c 100644
--- a/pyuavcan/transport/udp/_udp.py
+++ b/pyuavcan/transport/udp/_udp.py
@@ -37,6 +37,8 @@
Please read the module documentation for details.
"""
+ TRANSFER_ID_MODULO = UDPFrame.TRANSFER_ID_MASK + 1
+
VALID_MTU_RANGE = 1200, 9000
"""
The minimum is based on the IPv6 specification, which guarantees that the path MTU is at least 1280 bytes large.
@@ -178,7 +180,7 @@
@property
def protocol_parameters(self) -> pyuavcan.transport.ProtocolParameters:
return pyuavcan.transport.ProtocolParameters(
- transfer_id_modulo=UDPFrame.TRANSFER_ID_MASK + 1,
+ transfer_id_modulo=self.TRANSFER_ID_MODULO,
max_nodes=self._sock_factory.max_nodes,
mtu=self._mtu,
)
@@ -290,6 +292,10 @@
self._sniffer = self._sock_factory.make_sniffer(self._process_capture)
self._capture_handlers.append(handler)
+ @property
+ def capture_active(self) -> bool:
+ return self._sniffer is not None
+
@staticmethod
def make_tracer() -> UDPTracer:
"""
diff --git a/tests/transport/can/_can.py b/tests/transport/can/_can.py
index dca2103..b0ef322 100644
--- a/tests/transport/can/_can.py
+++ b/tests/transport/can/_can.py
@@ -1005,8 +1005,10 @@
assert isinstance(cap, CANCapture)
captures_other.append(cap)
+ assert not tr.capture_active
tr.begin_capture(add_capture)
tr.begin_capture(add_capture_other)
+ assert tr.capture_active
assert media.acceptance_filters == [
FilterConfiguration.new_promiscuous(FrameFormat.BASE),
FilterConfiguration.new_promiscuous(FrameFormat.EXTENDED),
diff --git a/tests/transport/loopback/_loopback.py b/tests/transport/loopback/_loopback.py
index 0786696..efa84e6 100644
--- a/tests/transport/loopback/_loopback.py
+++ b/tests/transport/loopback/_loopback.py
@@ -302,6 +302,7 @@
mon_events: typing.List[pyuavcan.transport.Capture] = []
tr.begin_capture(mon_events.append)
+ assert tr.capture_active
transfer = AlienTransfer(
AlienTransferMetadata(Priority.IMMEDIATE, 54321, AlienSessionSpecifier(1234, None, MessageDataSpecifier(7777))),
diff --git a/tests/transport/redundant/_redundant.py b/tests/transport/redundant/_redundant.py
index 5e9c39e..11a5c99 100644
--- a/tests/transport/redundant/_redundant.py
+++ b/tests/transport/redundant/_redundant.py
@@ -14,6 +14,7 @@
from pyuavcan.transport.loopback import LoopbackTransport
from pyuavcan.transport.serial import SerialTransport
from pyuavcan.transport.udp import UDPTransport
+from pyuavcan.transport.can import CANTransport
from tests.transport.serial import VIRTUAL_BUS_URI as SERIAL_URI
@@ -301,22 +302,159 @@
await asyncio.sleep(1) # Let all pending tasks finalize properly to avoid stack traces in the output.
-def _unittest_redundant_transport_capture() -> None:
- def mon(_x: object) -> None:
- return None
+@pytest.mark.asyncio # type: ignore
+async def _unittest_redundant_transport_capture() -> None:
+ from threading import Lock
+ from pyuavcan.transport import Capture, Trace, TransferTrace, Priority, ServiceDataSpecifier
+ from pyuavcan.transport import AlienTransfer, AlienTransferMetadata, AlienSessionSpecifier
+ from pyuavcan.transport.redundant import RedundantDuplicateTransferTrace, RedundantCapture
+ from tests.transport.can.media.mock import MockMedia as CANMockMedia
+ asyncio.get_event_loop().slow_callback_duration = 5.0
+
+ tracer = RedundantTransport.make_tracer()
+ traces: typing.List[typing.Optional[Trace]] = []
+ lock = Lock()
+
+ def handle_capture(cap: Capture) -> None:
+ with lock:
+ # Drop TX frames, they are not interesting for this test.
+ assert isinstance(cap, RedundantCapture)
+ if isinstance(cap.inferior, pyuavcan.transport.serial.SerialCapture) and cap.inferior.own:
+ return
+ if isinstance(cap.inferior, pyuavcan.transport.can.CANCapture) and cap.inferior.own:
+ return
+ print("CAPTURE:", cap)
+ traces.append(tracer.update(cap))
+
+    async def wait(how_many: int) -> None:  # Poll for up to ~1 s until at least how_many traces are collected.
+        for _ in range(10):
+            await asyncio.sleep(0.1)
+            with lock:  # handle_capture() appends to traces under this same lock.
+                if len(traces) >= how_many:  # Honor the argument rather than a hard-coded count.
+                    return
+        assert False, "No traces received"
+
+    # Set up capture -- one inferior is attached before capture is started, the other is attached afterwards.
+    # Make sure both inferiors are treated identically.
tr = RedundantTransport()
- inf_a = LoopbackTransport(1234)
- inf_b = LoopbackTransport(1234)
- tr.begin_capture(mon)
- assert inf_a.capture_handlers == []
- assert inf_b.capture_handlers == []
+ inf_a: pyuavcan.transport.Transport = SerialTransport(SERIAL_URI, 1234)
+ inf_b: pyuavcan.transport.Transport = SerialTransport(SERIAL_URI, 1234)
tr.attach_inferior(inf_a)
- assert inf_a.capture_handlers == [mon]
- assert inf_b.capture_handlers == []
+ assert not tr.capture_active
+ assert not inf_a.capture_active
+ assert not inf_b.capture_active
+ tr.begin_capture(handle_capture)
+ assert tr.capture_active
+ assert inf_a.capture_active
+ assert not inf_b.capture_active
tr.attach_inferior(inf_b)
- assert inf_a.capture_handlers == [mon]
- assert inf_b.capture_handlers == [mon]
+ assert tr.capture_active
+ assert inf_a.capture_active
+ assert inf_b.capture_active
+
+ # Send a transfer and make sure it is handled and deduplicated correctly.
+ transfer = AlienTransfer(
+ AlienTransferMetadata(
+ priority=Priority.IMMEDIATE,
+ transfer_id=1234,
+ session_specifier=AlienSessionSpecifier(
+ source_node_id=321,
+ destination_node_id=222,
+ data_specifier=ServiceDataSpecifier(77, ServiceDataSpecifier.Role.REQUEST),
+ ),
+ ),
+ [memoryview(b"hello")],
+ )
+ assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+ await wait(2)
+ with lock:
+ # Check the status of the deduplication process. We should get two: one transfer, one duplicate.
+ assert len(traces) == 2
+ trace = traces.pop(0)
+ assert isinstance(trace, TransferTrace)
+ assert trace.transfer == transfer
+ # This is the duplicate.
+ assert isinstance(traces.pop(0), RedundantDuplicateTransferTrace)
+ assert not traces
+
+ # Spoof the same thing again, get nothing out: transfers discarded by the inferior's own reassemblers.
+ # WARNING: this will fail if too much time has passed since the previous transfer due to TID timeout.
+ assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+ await wait(2)
+ with lock:
+ assert None is traces.pop(0)
+ assert None is traces.pop(0)
+ assert not traces
+
+ # But if we change ONLY destination, deduplication will not take place.
+ transfer = AlienTransfer(
+ AlienTransferMetadata(
+ priority=Priority.IMMEDIATE,
+ transfer_id=1234,
+ session_specifier=AlienSessionSpecifier(
+ source_node_id=321,
+ destination_node_id=333,
+ data_specifier=ServiceDataSpecifier(77, ServiceDataSpecifier.Role.REQUEST),
+ ),
+ ),
+ [memoryview(b"hello")],
+ )
+ assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+ await wait(2)
+ with lock:
+ # Check the status of the deduplication process. We should get two: one transfer, one duplicate.
+ assert len(traces) == 2
+ trace = traces.pop(0)
+ assert isinstance(trace, TransferTrace)
+ assert trace.transfer == transfer
+ # This is the duplicate.
+ assert isinstance(traces.pop(0), RedundantDuplicateTransferTrace)
+ assert not traces
+
+ # Change the inferior configuration and make sure it is handled properly.
+ tr.detach_inferior(inf_a)
+ tr.detach_inferior(inf_b)
+ inf_a.close()
+ inf_b.close()
+ # The new inferiors use cyclic transfer-ID; the tracer should reconfigure itself automatically!
+ can_peers: typing.Set[CANMockMedia] = set()
+ inf_a = CANTransport(CANMockMedia(can_peers, 64, 2), 111)
+ inf_b = CANTransport(CANMockMedia(can_peers, 64, 2), 111)
+ tr.attach_inferior(inf_a)
+ tr.attach_inferior(inf_b)
+ # Capture should have been launched automatically.
+ assert inf_a.capture_active
+ assert inf_b.capture_active
+
+ # Send transfer over CAN and observe that it is handled well.
+ transfer = AlienTransfer(
+ AlienTransferMetadata(
+ priority=Priority.IMMEDIATE,
+ transfer_id=19,
+ session_specifier=AlienSessionSpecifier(
+ source_node_id=111,
+ destination_node_id=22,
+ data_specifier=ServiceDataSpecifier(77, ServiceDataSpecifier.Role.REQUEST),
+ ),
+ ),
+ [memoryview(b"hello")],
+ )
+ assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+ await wait(2)
+ with lock:
+ # Check the status of the deduplication process. We should get two: one transfer, one duplicate.
+ assert len(traces) == 2
+ trace = traces.pop(0)
+ assert isinstance(trace, TransferTrace)
+ assert trace.transfer == transfer
+ # This is the duplicate.
+ assert isinstance(traces.pop(0), RedundantDuplicateTransferTrace)
+ assert not traces
+
+ # Dispose of everything.
+ tr.close()
+ await asyncio.sleep(1.0)
def _unittest_redundant_transport_reconfiguration() -> None:
diff --git a/tests/transport/serial/_serial.py b/tests/transport/serial/_serial.py
index 4fe4e78..c0ce3ca 100644
--- a/tests/transport/serial/_serial.py
+++ b/tests/transport/serial/_serial.py
@@ -476,7 +476,9 @@
tr = pyuavcan.transport.serial.SerialTransport("loop://", None, mtu=1024)
mon_events: typing.List[pyuavcan.transport.Capture] = []
+ assert not tr.capture_active
tr.begin_capture(mon_events.append)
+ assert tr.capture_active
transfer = AlienTransfer(
AlienTransferMetadata(
diff --git a/tests/transport/udp/_udp.py b/tests/transport/udp/_udp.py
index be7d47b..1641295 100644
--- a/tests/transport/udp/_udp.py
+++ b/tests/transport/udp/_udp.py
@@ -265,7 +265,9 @@
assert isinstance(s, UDPCapture)
captures.append(s)
+ assert not tr_capture.capture_active
tr_capture.begin_capture(inhale)
+ assert tr_capture.capture_active
await asyncio.sleep(1.0)
tr = UDPTransport("127.50.0.111")