Implement capture/tracing for redundant transport (#157)

diff --git a/.idea/dictionaries/pavel.xml b/.idea/dictionaries/pavel.xml
index 04efaae..20584d3 100644
--- a/.idea/dictionaries/pavel.xml
+++ b/.idea/dictionaries/pavel.xml
@@ -50,11 +50,14 @@
       <w>datagrams</w>
       <w>dcabf</w>
       <w>ddddddd</w>
+      <w>ddlm</w>
       <w>deadbeefdeadbeefdeadbeefdeadbeef</w>
       <w>deallocated</w>
       <w>debian</w>
       <w>decaxta</w>
+      <w>deduplicates</w>
       <w>deduplicator</w>
+      <w>deduplicators</w>
       <w>demultiplexer</w>
       <w>demultiplexing</w>
       <w>demux</w>
diff --git a/noxfile.py b/noxfile.py
index 75dd3b5..2c66a16 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -153,7 +153,7 @@
         return 0
 
     session.install("-e", f".[{','.join(EXTRAS_REQUIRE.keys())}]")
-    session.install("git+https://github.com/UAVCAN/yakut@orc")  # TODO: use stable version from PyPI when deployed.
+    session.install("git+https://github.com/UAVCAN/yakut@af4a4fc0ca6fa0b62b3ec266d3f8cddeceb27446")  # TODO: use PyPI
 
     demo_dir = ROOT_DIR / "demo"
     tmp_dir = Path(session.create_tmp()).resolve()
diff --git a/pyuavcan/VERSION b/pyuavcan/VERSION
index ff006e9..6942b49 100644
--- a/pyuavcan/VERSION
+++ b/pyuavcan/VERSION
@@ -1 +1 @@
-1.2.0.b4
+1.2.0.b5
diff --git a/pyuavcan/application/_port_list_publisher.py b/pyuavcan/application/_port_list_publisher.py
index bd2c724..a96ed97 100644
--- a/pyuavcan/application/_port_list_publisher.py
+++ b/pyuavcan/application/_port_list_publisher.py
@@ -81,12 +81,13 @@
         if publisher is None:
             return
 
-        input_ds = [x.specifier.data_specifier for x in self.node.presentation.transport.input_sessions]
+        trans = self.node.presentation.transport
+        input_ds = [x.specifier.data_specifier for x in trans.input_sessions]
         srv_in_ds = [x for x in input_ds if isinstance(x, ServiceDataSpecifier)]
         state = _State(
             pub={
                 x.specifier.data_specifier.subject_id
-                for x in self.node.presentation.transport.output_sessions
+                for x in trans.output_sessions
                 if isinstance(x.specifier.data_specifier, MessageDataSpecifier)
             },
             sub={x.subject_id for x in input_ds if isinstance(x, MessageDataSpecifier)},
@@ -99,8 +100,8 @@
         if state_changed or time_expired:
             _logger.debug("%r: Publishing: state_changed=%r, state=%r", self, state_changed, state)
             self._state = state
-            self._updates_since_pub = 0
-            publisher.publish_soon(_make_port_list(self._state))  # Should we handle ResourceClosedError here?
+            self._updates_since_pub = 0
+            publisher.publish_soon(_make_port_list(self._state, trans.capture_active))  # Handle ResourceClosedError?
 
     def __repr__(self) -> str:
         return pyuavcan.util.repr_attributes(self, self.node)
@@ -109,10 +110,12 @@
 _logger = logging.getLogger(__name__)
 
 
-def _make_port_list(state: _State) -> List:
+def _make_port_list(state: _State, packet_capture_mode: bool) -> List:
+    from uavcan.primitive import Empty_1_0 as Empty
+
     return List(
         publishers=_make_subject_id_list(state.pub),
-        subscribers=_make_subject_id_list(state.sub),
+        subscribers=SubjectIDList(total=Empty()) if packet_capture_mode else _make_subject_id_list(state.sub),
         clients=_make_service_id_list(state.cln),
         servers=_make_service_id_list(state.srv),
     )
@@ -150,7 +153,7 @@
         srv=set(range(512)),
     )
 
-    msg = _make_port_list(state)
+    msg = _make_port_list(state, False)
 
     assert msg.publishers.sparse_list is not None
     pubs = [x.value for x in msg.publishers.sparse_list]
diff --git a/pyuavcan/transport/_tracer.py b/pyuavcan/transport/_tracer.py
index 1e075cf..531ca7e 100644
--- a/pyuavcan/transport/_tracer.py
+++ b/pyuavcan/transport/_tracer.py
@@ -21,6 +21,15 @@
 
     timestamp: pyuavcan.transport.Timestamp
 
+    @staticmethod
+    def get_transport_type() -> typing.Type[pyuavcan.transport.Transport]:
+        """
+        Static reference to the type of transport that can emit captures of this type.
+        For example, for UAVCAN/serial it would be :class:`pyuavcan.transport.serial.SerialTransport`.
+        Although the method is static, it shall be overridden by all inheritors.
+        """
+        raise NotImplementedError
+
 
 CaptureCallback = typing.Callable[[Capture], None]
 
@@ -179,7 +188,5 @@
         Reconstructed multi-frame transfers are reported as a single event when the last frame is received.
 
         Capture instances that are not supported by the current transport are silently ignored and None is returned.
-        This is to simplify tracing over heterogeneous transports where there are several tracer instances used
-        concurrently, one per transport type.
         """
         raise NotImplementedError
diff --git a/pyuavcan/transport/_transport.py b/pyuavcan/transport/_transport.py
index 12393d5..c9b6909 100644
--- a/pyuavcan/transport/_transport.py
+++ b/pyuavcan/transport/_transport.py
@@ -215,6 +215,14 @@
         """
         raise NotImplementedError
 
+    @property
+    @abc.abstractmethod
+    def capture_active(self) -> bool:
+        """
+        Whether :meth:`begin_capture` was invoked and packet capture is being performed on this transport.
+        """
+        raise NotImplementedError
+
     @staticmethod
     @abc.abstractmethod
     def make_tracer() -> Tracer:
diff --git a/pyuavcan/transport/can/_can.py b/pyuavcan/transport/can/_can.py
index 959653d..ec7a2cd 100644
--- a/pyuavcan/transport/can/_can.py
+++ b/pyuavcan/transport/can/_can.py
@@ -70,6 +70,8 @@
     Please read the module documentation for details.
     """
 
+    TRANSFER_ID_MODULO = TRANSFER_ID_MODULO
+
     def __init__(
         self,
         media: Media,
@@ -239,6 +241,10 @@
         self._capture_handlers.append(handler)
         self._reconfigure_acceptance_filters()
 
+    @property
+    def capture_active(self) -> bool:
+        return bool(self._capture_handlers)
+
     @staticmethod
     def make_tracer() -> CANTracer:
         """
diff --git a/pyuavcan/transport/can/_tracer.py b/pyuavcan/transport/can/_tracer.py
index ed6ca2c..3722bde 100644
--- a/pyuavcan/transport/can/_tracer.py
+++ b/pyuavcan/transport/can/_tracer.py
@@ -6,6 +6,7 @@
 import typing
 import dataclasses
 import pyuavcan
+import pyuavcan.transport.can
 from pyuavcan.transport import Trace, TransferTrace, AlienSessionSpecifier, AlienTransferMetadata, Capture
 from pyuavcan.transport import AlienTransfer, TransferFrom, Timestamp, Priority
 from ._session import TransferReassemblyErrorID, TransferReassembler
@@ -46,6 +47,10 @@
         direction = "tx" if self.own else "rx"
         return pyuavcan.util.repr_attributes(self, self.timestamp, direction, self.frame)
 
+    @staticmethod
+    def get_transport_type() -> typing.Type[pyuavcan.transport.can.CANTransport]:
+        return pyuavcan.transport.can.CANTransport
+
 
 @dataclasses.dataclass(frozen=True)
 class CANErrorTrace(pyuavcan.transport.ErrorTrace):
diff --git a/pyuavcan/transport/loopback/_loopback.py b/pyuavcan/transport/loopback/_loopback.py
index 95a033d..bc81226 100644
--- a/pyuavcan/transport/loopback/_loopback.py
+++ b/pyuavcan/transport/loopback/_loopback.py
@@ -177,6 +177,10 @@
     def begin_capture(self, handler: pyuavcan.transport.CaptureCallback) -> None:
         self._capture_handlers.append(handler)
 
+    @property
+    def capture_active(self) -> bool:
+        return bool(self._capture_handlers)
+
     @staticmethod
     def make_tracer() -> LoopbackTracer:
         """
diff --git a/pyuavcan/transport/loopback/_tracer.py b/pyuavcan/transport/loopback/_tracer.py
index 9f163cb..14c9be2 100644
--- a/pyuavcan/transport/loopback/_tracer.py
+++ b/pyuavcan/transport/loopback/_tracer.py
@@ -2,9 +2,10 @@
 # This software is distributed under the terms of the MIT License.
 # Author: Pavel Kirienko <pavel@uavcan.org>
 
+from __future__ import annotations
 import typing
 import dataclasses
-import pyuavcan.transport
+import pyuavcan.transport.loopback
 from pyuavcan.transport import Trace, TransferTrace, Capture
 
 
@@ -16,6 +17,10 @@
 
     transfer: pyuavcan.transport.AlienTransfer
 
+    @staticmethod
+    def get_transport_type() -> typing.Type[pyuavcan.transport.loopback.LoopbackTransport]:
+        return pyuavcan.transport.loopback.LoopbackTransport
+
 
 class LoopbackTracer(pyuavcan.transport.Tracer):
     """
diff --git a/pyuavcan/transport/redundant/__init__.py b/pyuavcan/transport/redundant/__init__.py
index 6b72403..b344c04 100644
--- a/pyuavcan/transport/redundant/__init__.py
+++ b/pyuavcan/transport/redundant/__init__.py
@@ -372,3 +372,7 @@
 from ._session import RedundantFeedback as RedundantFeedback
 
 from ._error import InconsistentInferiorConfigurationError as InconsistentInferiorConfigurationError
+
+from ._tracer import RedundantCapture as RedundantCapture
+from ._tracer import RedundantDuplicateTransferTrace as RedundantDuplicateTransferTrace
+from ._tracer import RedundantTracer as RedundantTracer
diff --git a/pyuavcan/transport/redundant/_deduplicator/_base.py b/pyuavcan/transport/redundant/_deduplicator/_base.py
index 78d5b04..f57ff04 100644
--- a/pyuavcan/transport/redundant/_deduplicator/_base.py
+++ b/pyuavcan/transport/redundant/_deduplicator/_base.py
@@ -2,21 +2,59 @@
 # This software is distributed under the terms of the MIT License.
 # Author: Pavel Kirienko <pavel@uavcan.org>
 
+from __future__ import annotations
 import abc
+import typing
 import pyuavcan.transport
 
 
 class Deduplicator(abc.ABC):
+    """
+    The abstract class implementing the transfer-wise deduplication strategy.
+    **Users of redundant transports do not need to deduplicate their transfers manually
+    as it will be done automatically.**
+    Please read the module documentation for further details.
+    """
+
+    MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD = int(2 ** 48)
+    """
+    An inferior transport whose transfer-ID modulo is less than this value is expected to experience
+    transfer-ID overflows routinely during its operation. Otherwise, the transfer-ID is not expected to
+    overflow for centuries.
+
+    A transfer-ID counter that is expected to overflow is called "cyclic", otherwise it's "monotonic".
+    Read https://forum.uavcan.org/t/alternative-transport-protocols/324.
+    See :meth:`new`.
+    """
+
+    @staticmethod
+    def new(transfer_id_modulo: int) -> Deduplicator:
+        """
+        Factory that returns a :class:`CyclicDeduplicator` for the given modulo if it is less than
+        :attr:`MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD`; otherwise, it returns a :class:`MonotonicDeduplicator`.
+        """
+        from . import CyclicDeduplicator, MonotonicDeduplicator
+
+        if transfer_id_modulo < Deduplicator.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
+            return CyclicDeduplicator(transfer_id_modulo)
+        return MonotonicDeduplicator()
+
     @abc.abstractmethod
     def should_accept_transfer(
-        self, iface_id: int, transfer_id_timeout: float, transfer: pyuavcan.transport.TransferFrom
+        self,
+        *,
+        iface_id: int,
+        transfer_id_timeout: float,
+        timestamp: pyuavcan.transport.Timestamp,
+        source_node_id: typing.Optional[int],
+        transfer_id: int,
     ) -> bool:
         """
         The iface-ID is an arbitrary integer that is unique within the redundant group identifying the transport
         instance the transfer was received from.
         It could be the index of the redundant interface (e.g., 0, 1, 2 for a triply-redundant transport),
         or it could be something else like a memory address of a related object.
-        Embedded applications usually use indexes, whereas in PyUAVCAN it may be more convenient to use ``id()``.
+        Embedded applications usually use indexes, whereas in PyUAVCAN it may be more convenient to use :func:`id`.
 
         The transfer-ID timeout is specified in seconds. It is used to handle the case of a node restart.
         """
diff --git a/pyuavcan/transport/redundant/_deduplicator/_cyclic.py b/pyuavcan/transport/redundant/_deduplicator/_cyclic.py
index 6d0f5d8..8adb558 100644
--- a/pyuavcan/transport/redundant/_deduplicator/_cyclic.py
+++ b/pyuavcan/transport/redundant/_deduplicator/_cyclic.py
@@ -15,32 +15,36 @@
         self._remote_states: typing.List[typing.Optional[_RemoteState]] = []
 
     def should_accept_transfer(
-        self, iface_id: int, transfer_id_timeout: float, transfer: pyuavcan.transport.TransferFrom
+        self,
+        *,
+        iface_id: int,
+        transfer_id_timeout: float,
+        timestamp: pyuavcan.transport.Timestamp,
+        source_node_id: typing.Optional[int],
+        transfer_id: int,
     ) -> bool:
-        if transfer.source_node_id is None:
+        if source_node_id is None:
             # Anonymous transfers are fully stateless, so always accepted.
             # This may lead to duplications and reordering but this is a design limitation.
             return True
 
         # If a similar architecture is used on an embedded system, this normally would be a static array.
-        if len(self._remote_states) <= transfer.source_node_id:
-            self._remote_states += [None] * (transfer.source_node_id - len(self._remote_states) + 1)
-            assert len(self._remote_states) == transfer.source_node_id + 1
+        if len(self._remote_states) <= source_node_id:
+            self._remote_states += [None] * (source_node_id - len(self._remote_states) + 1)
+            assert len(self._remote_states) == source_node_id + 1
 
-        if self._remote_states[transfer.source_node_id] is None:
+        if self._remote_states[source_node_id] is None:
             # First transfer from this node, create new state and accept unconditionally.
-            self._remote_states[transfer.source_node_id] = _RemoteState(
-                iface_id=iface_id, last_timestamp=transfer.timestamp
-            )
+            self._remote_states[source_node_id] = _RemoteState(iface_id=iface_id, last_timestamp=timestamp)
             return True
 
         # We have seen transfers from this node before, so we need to perform actual deduplication.
-        state = self._remote_states[transfer.source_node_id]
+        state = self._remote_states[source_node_id]
         assert state is not None
 
         # If the current interface was seen working recently, reject traffic from other interfaces.
         # Note that the time delta may be negative due to timestamping variations and inner latency variations.
-        time_delta = transfer.timestamp.monotonic - state.last_timestamp.monotonic
+        time_delta = timestamp.monotonic - state.last_timestamp.monotonic
         iface_switch_allowed = time_delta > transfer_id_timeout
         if not iface_switch_allowed and state.iface_id != iface_id:
             return False
@@ -50,7 +54,7 @@
 
         # Either we're on the same interface or (the interface is new and the current one seems to be down).
         state.iface_id = iface_id
-        state.last_timestamp = transfer.timestamp
+        state.last_timestamp = timestamp
         return True
 
 
diff --git a/pyuavcan/transport/redundant/_deduplicator/_monotonic.py b/pyuavcan/transport/redundant/_deduplicator/_monotonic.py
index e5d3293..3b2e604 100644
--- a/pyuavcan/transport/redundant/_deduplicator/_monotonic.py
+++ b/pyuavcan/transport/redundant/_deduplicator/_monotonic.py
@@ -13,38 +13,42 @@
         self._remote_states: typing.List[typing.Optional[_RemoteState]] = []
 
     def should_accept_transfer(
-        self, iface_id: int, transfer_id_timeout: float, transfer: pyuavcan.transport.TransferFrom
+        self,
+        *,
+        iface_id: int,
+        transfer_id_timeout: float,
+        timestamp: pyuavcan.transport.Timestamp,
+        source_node_id: typing.Optional[int],
+        transfer_id: int,
     ) -> bool:
         del iface_id  # Not used in monotonic deduplicator.
-        if transfer.source_node_id is None:
+        if source_node_id is None:
             # Anonymous transfers are fully stateless, so always accepted.
             # This may lead to duplications and reordering but this is a design limitation.
             return True
 
         # If a similar architecture is used on an embedded system, this normally would be a static array.
-        if len(self._remote_states) <= transfer.source_node_id:
-            self._remote_states += [None] * (transfer.source_node_id - len(self._remote_states) + 1)
-            assert len(self._remote_states) == transfer.source_node_id + 1
+        if len(self._remote_states) <= source_node_id:
+            self._remote_states += [None] * (source_node_id - len(self._remote_states) + 1)
+            assert len(self._remote_states) == source_node_id + 1
 
-        if self._remote_states[transfer.source_node_id] is None:
+        if self._remote_states[source_node_id] is None:
             # First transfer from this node, create new state and accept unconditionally.
-            self._remote_states[transfer.source_node_id] = _RemoteState(
-                last_transfer_id=transfer.transfer_id, last_timestamp=transfer.timestamp
-            )
+            self._remote_states[source_node_id] = _RemoteState(last_transfer_id=transfer_id, last_timestamp=timestamp)
             return True
 
         # We have seen transfers from this node before, so we need to perform actual deduplication.
-        state = self._remote_states[transfer.source_node_id]
+        state = self._remote_states[source_node_id]
         assert state is not None
 
         # If we have seen transfers with higher TID values recently, reject this one as duplicate.
-        tid_timeout = (transfer.timestamp.monotonic - state.last_timestamp.monotonic) > transfer_id_timeout
-        if not tid_timeout and transfer.transfer_id <= state.last_transfer_id:
+        tid_timeout = (timestamp.monotonic - state.last_timestamp.monotonic) > transfer_id_timeout
+        if not tid_timeout and transfer_id <= state.last_transfer_id:
             return False
 
         # Otherwise, this is either a new transfer or a TID timeout condition has occurred.
-        state.last_transfer_id = transfer.transfer_id
-        state.last_timestamp = transfer.timestamp
+        state.last_transfer_id = transfer_id
+        state.last_timestamp = timestamp
         return True
 
 
diff --git a/pyuavcan/transport/redundant/_redundant_transport.py b/pyuavcan/transport/redundant/_redundant_transport.py
index e310dab..82ee43f 100644
--- a/pyuavcan/transport/redundant/_redundant_transport.py
+++ b/pyuavcan/transport/redundant/_redundant_transport.py
@@ -9,6 +9,8 @@
 import pyuavcan.transport
 from ._session import RedundantInputSession, RedundantOutputSession, RedundantSession
 from ._error import InconsistentInferiorConfigurationError
+from ._deduplicator import Deduplicator
+from ._tracer import RedundantTracer, RedundantCapture
 
 
 _logger = logging.getLogger(__name__)
@@ -33,15 +35,6 @@
     Please read the module documentation for details.
     """
 
-    MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD = int(2 ** 48)
-    """
-    An inferior transport whose transfer-ID modulo is less than this value is expected to experience
-    transfer-ID overflows routinely during its operation. Otherwise, the transfer-ID is not expected to
-    overflow for centuries.
-    A transfer-ID counter that is expected to overflow is called "cyclic", otherwise it's "monotonic".
-    Read https://forum.uavcan.org/t/alternative-transport-protocols/324.
-    """
-
     def __init__(self, *, loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> None:
         """
         :param loop: All inferiors shall run on the same event loop,
@@ -50,7 +43,7 @@
         """
         self._cols: typing.List[pyuavcan.transport.Transport] = []
         self._rows: typing.Dict[pyuavcan.transport.SessionSpecifier, RedundantSession] = {}
-        self._capture_handlers: typing.List[pyuavcan.transport.CaptureCallback] = []
+        self._unwrapped_capture_handlers: typing.List[typing.Callable[[RedundantCapture], None]] = []
         self._loop = loop if loop is not None else asyncio.get_event_loop()
         self._check_matrix_consistency()
 
@@ -112,7 +105,9 @@
     ) -> RedundantInputSession:
         out = self._get_session(
             specifier,
-            lambda fin: RedundantInputSession(specifier, payload_metadata, self._get_tid_modulo, self._loop, fin),
+            lambda fin: RedundantInputSession(
+                specifier, payload_metadata, lambda: self.protocol_parameters.transfer_id_modulo, self._loop, fin
+            ),
         )
         assert isinstance(out, RedundantInputSession)
         self._check_matrix_consistency()
@@ -170,8 +165,6 @@
         the operation will be rolled back to ensure state consistency.
         """
         self._validate_inferior(transport)
-        for ch in self._capture_handlers:
-            transport.begin_capture(ch)
         self._cols.append(transport)
         try:
             for redundant_session in self._rows.values():
@@ -181,6 +174,9 @@
             raise
         finally:
             self._check_matrix_consistency()
+        # Launch the capture as late as possible to not leave it dangling if the attachment failed.
+        for ch in self._unwrapped_capture_handlers:
+            transport.begin_capture(self._wrap_capture_handler(transport, ch))
 
     def detach_inferior(self, transport: pyuavcan.transport.Transport) -> None:
         """
@@ -234,25 +230,32 @@
     def begin_capture(self, handler: pyuavcan.transport.CaptureCallback) -> None:
         """
         Stores the handler in the local list of handlers.
-        Invokes :class:`pyuavcan.transport.Transport.begin_capture` on each inferior with the provided handler.
+        Invokes :class:`pyuavcan.transport.Transport.begin_capture` on each inferior.
         If at least one inferior raises an exception, it is propagated immediately and the remaining inferiors
         will remain in an inconsistent state.
         When a new inferior is added later, the stored handlers will be automatically used to enable capture on it.
         If such auto-restoration behavior is undesirable, configure capture individually per-inferior instead.
 
-        The redundant transport does not define its own capture events and does not wrap captured events reported
-        by its inferiors.
+        Every capture emitted by the inferiors is wrapped into :class:`RedundantCapture`,
+        which contains additional metadata about the inferior transport instance that emitted the capture.
+        This lets users understand which transport of the redundant group has provided the capture;
+        the same metadata is also used by :class:`RedundantTracer`
+        to automatically manage transfer deduplication.
         """
-        self._capture_handlers.append(handler)
+        self._unwrapped_capture_handlers.append(handler)
         for c in self._cols:
-            c.begin_capture(handler)
+            c.begin_capture(self._wrap_capture_handler(c, handler))
+
+    @property
+    def capture_active(self) -> bool:
+        return bool(self._unwrapped_capture_handlers)
 
     @staticmethod
-    def make_tracer() -> pyuavcan.transport.Tracer:
+    def make_tracer() -> RedundantTracer:
         """
-        This method is not implemented for redundant transport. Access the inferiors directly instead.
+        See :class:`RedundantTracer`.
         """
-        raise NotImplementedError
+        return RedundantTracer()
 
     async def spoof(self, transfer: pyuavcan.transport.AlienTransfer, monotonic_deadline: float) -> bool:
         """
@@ -262,6 +265,7 @@
         First exception to occur terminates the operation and is raised immediately.
         This is different from regular sending; the assumption is that the caller necessarily wants to ensure
         that spoofing takes place against every inferior.
+        If this is not the case, spoof each inferior separately.
         """
         if not self._cols:
             return False
@@ -297,8 +301,11 @@
                 )
 
             # Ensure all inferiors use the same transfer-ID overflow policy.
-            if self._get_tid_modulo() is None:
-                if transport.protocol_parameters.transfer_id_modulo < self.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
+            if self.protocol_parameters.transfer_id_modulo >= Deduplicator.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
+                if (
+                    transport.protocol_parameters.transfer_id_modulo
+                    < Deduplicator.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD
+                ):
                     raise InconsistentInferiorConfigurationError(
                         f"The new inferior shall use monotonic transfer-ID counters in order to match the "
                         f"other inferiors in the redundant transport group"
@@ -358,14 +365,24 @@
             owner._close_inferior(new_index)  # pylint: disable=protected-access
             raise
 
-    def _get_tid_modulo(self) -> typing.Optional[int]:
-        if self.protocol_parameters.transfer_id_modulo < self.MONOTONIC_TRANSFER_ID_MODULO_THRESHOLD:
-            return self.protocol_parameters.transfer_id_modulo
-        return None
-
     def _check_matrix_consistency(self) -> None:
         for row in self._rows.values():
             assert len(row.inferiors) == len(self._cols)
 
+    def _wrap_capture_handler(
+        self,
+        inferior: pyuavcan.transport.Transport,
+        handler: typing.Callable[[RedundantCapture], None],
+    ) -> pyuavcan.transport.CaptureCallback:
+        # Wrap every capture from this inferior into RedundantCapture, tagged with id(inferior), for the handler.
+        return lambda cap: handler(
+            RedundantCapture(
+                cap.timestamp,
+                inferior=cap,
+                iface_id=id(inferior),
+                transfer_id_modulo=self.protocol_parameters.transfer_id_modulo,  # Read on every capture (slow?)
+            )
+        )
+
     def _get_repr_fields(self) -> typing.Tuple[typing.List[typing.Any], typing.Dict[str, typing.Any]]:
         return list(self.inferiors), {}
diff --git a/pyuavcan/transport/redundant/_session/_input.py b/pyuavcan/transport/redundant/_session/_input.py
index 9cd3383..cd0be73 100644
--- a/pyuavcan/transport/redundant/_session/_input.py
+++ b/pyuavcan/transport/redundant/_session/_input.py
@@ -10,7 +10,7 @@
 import pyuavcan.transport
 import pyuavcan.util
 from ._base import RedundantSession, RedundantSessionStatistics
-from .._deduplicator import Deduplicator, MonotonicDeduplicator, CyclicDeduplicator
+from .._deduplicator import Deduplicator
 
 
 _logger = logging.getLogger(__name__)
@@ -52,7 +52,7 @@
         self,
         specifier: pyuavcan.transport.InputSessionSpecifier,
         payload_metadata: pyuavcan.transport.PayloadMetadata,
-        tid_modulo_provider: typing.Callable[[], typing.Optional[int]],
+        tid_modulo_provider: typing.Callable[[], int],
         loop: asyncio.AbstractEventLoop,
         finalizer: typing.Callable[[], None],
     ):
@@ -92,12 +92,7 @@
 
         # Ensure that the deduplicator is constructed when the first inferior is launched.
         if self._deduplicator is None:
-            tid_modulo = self._get_tid_modulo()
-            if tid_modulo is None:
-                self._deduplicator = MonotonicDeduplicator()
-            else:
-                assert 0 < tid_modulo <= 2 ** 56, "Sanity check"
-                self._deduplicator = CyclicDeduplicator(tid_modulo)
+            self._deduplicator = Deduplicator.new(self._get_tid_modulo())
             _logger.debug("%s: Constructed new deduplicator: %s", self, self._deduplicator)
 
         # Synchronize the settings for the newly added inferior with its siblings.
@@ -239,7 +234,13 @@
     ) -> None:
         assert self._deduplicator is not None
         iface_id = id(session)
-        if self._deduplicator.should_accept_transfer(iface_id, self.transfer_id_timeout, transfer):
+        if self._deduplicator.should_accept_transfer(
+            iface_id=iface_id,
+            transfer_id_timeout=self.transfer_id_timeout,
+            timestamp=transfer.timestamp,
+            source_node_id=transfer.source_node_id,
+            transfer_id=transfer.transfer_id,
+        ):
             _logger.debug("%s: Accepting %s from %016x", self, transfer, iface_id)
             self._stat_transfers += 1
             self._stat_payload_bytes += sum(map(len, transfer.fragmented_payload))
@@ -509,7 +510,7 @@
     ses = RedundantInputSession(
         spec,
         meta,
-        tid_modulo_provider=lambda: None,  # Like UDP or serial - infinite modulo.
+        tid_modulo_provider=lambda: 2 ** 56,  # Like UDP or serial - infinite modulo.
         loop=loop,
         finalizer=lambda: None,
     )
diff --git a/pyuavcan/transport/redundant/_tracer.py b/pyuavcan/transport/redundant/_tracer.py
new file mode 100644
index 0000000..ee53cc7
--- /dev/null
+++ b/pyuavcan/transport/redundant/_tracer.py
@@ -0,0 +1,159 @@
+# Copyright (c) 2021 UAVCAN Consortium
+# This software is distributed under the terms of the MIT License.
+# Author: Pavel Kirienko <pavel@uavcan.org>
+
+from __future__ import annotations
+import typing
+import logging
+import dataclasses
+import pyuavcan
+import pyuavcan.transport.redundant
+from ._deduplicator import Deduplicator
+
+
+@dataclasses.dataclass(frozen=True)
+class RedundantCapture(pyuavcan.transport.Capture):
+    """
+    Wraps a :class:`pyuavcan.transport.Capture` produced by an inferior transport together with
+    the identifier of that inferior (:attr:`iface_id`) and the transfer-ID modulo of its transport.
+    The user may construct such captures manually when performing postmortem analysis of a network data dump
+    to feed them later into :class:`RedundantTracer`.
+    """
+
+    inferior: pyuavcan.transport.Capture
+    """
+    The original capture from the inferior transport.
+    """
+
+    iface_id: int
+    """
+    A unique number that identifies this transport in its redundant group.
+    """
+
+    transfer_id_modulo: int
+    """
+    The number of unique transfer-ID values (that is, the maximum possible transfer-ID plus one)
+    for the transport that emitted this capture.
+    This is actually a transport-specific constant.
+    This value is used by :class:`RedundantTracer` to select the appropriate transfer deduplication strategy.
+    """
+
+    @staticmethod
+    def get_transport_type() -> typing.Type[pyuavcan.transport.redundant.RedundantTransport]:
+        return pyuavcan.transport.redundant.RedundantTransport
+
+
+@dataclasses.dataclass(frozen=True)
+class RedundantDuplicateTransferTrace(pyuavcan.transport.Trace):
+    """
+    Indicates that the last capture object completed a valid transfer that was discarded as a duplicate
+    (it was either received from another redundant interface, or deterministic data loss mitigation (DDLM) is used).
+
+    Observe that it is NOT a subclass of :class:`pyuavcan.transport.TransferTrace`!
+    It shall not be one because duplicates should not be processed normally.
+    """
+
+
+class RedundantTracer(pyuavcan.transport.Tracer):
+    """
+    The redundant tracer automatically deduplicates transfers received from multiple redundant transports.
+    It can be used either in real-time or during postmortem analysis.
+    In the latter case the user would construct instances of :class:`RedundantCapture` manually and feed them
+    into the tracer one-by-one.
+    """
+
+    def __init__(self) -> None:
+        self._deduplicators: typing.Dict[RedundantTracer._DeduplicatorSelector, Deduplicator] = {}
+        self._last_transfer_id_modulo = 0  # Placeholder; a nonzero modulo in the first capture hits the reset branch.
+        self._inferior_tracers: typing.Dict[
+            typing.Tuple[typing.Type[pyuavcan.transport.Transport], int],
+            pyuavcan.transport.Tracer,
+        ] = {}
+
+    def update(self, cap: pyuavcan.transport.Capture) -> typing.Optional[pyuavcan.transport.Trace]:
+        """
+        Captures that are not :class:`RedundantCapture` are ignored (None is returned).
+        Instances of :class:`pyuavcan.transport.TransferTrace` are deduplicated: accepted ones are returned as-is,
+        duplicates yield :class:`RedundantDuplicateTransferTrace`. Other outputs of the inferior tracer pass through.
+        """
+        _logger.debug("%r: Processing %r", self, cap)
+        if not isinstance(cap, RedundantCapture):
+            return None
+
+        if cap.transfer_id_modulo != self._last_transfer_id_modulo:
+            _logger.info(
+                "%r: TID modulo change detected, resetting state (%d deduplicators dropped): %r --> %r",
+                self,
+                len(self._deduplicators),
+                self._last_transfer_id_modulo,
+                cap.transfer_id_modulo,
+            )
+            # Should we also drop the tracers here? If an inferior transport is removed its tracer will be sitting
+            # here useless, we don't want that. But on the other hand, disturbing the state too much is also no good.
+            self._last_transfer_id_modulo = cap.transfer_id_modulo
+            self._deduplicators.clear()
+
+        tracer = self._get_inferior_tracer(cap.inferior.get_transport_type(), cap.iface_id)
+        trace = tracer.update(cap.inferior)
+        if not isinstance(trace, pyuavcan.transport.TransferTrace):  # E.g., None or an error trace.
+            _logger.debug("%r: BYPASS: %r", self, trace)
+            return trace
+
+        meta = trace.transfer.metadata
+        deduplicator = self._get_deduplicator(
+            meta.session_specifier.destination_node_id,
+            meta.session_specifier.data_specifier,
+            cap.transfer_id_modulo,
+        )
+        should_accept = deduplicator.should_accept_transfer(
+            iface_id=cap.iface_id,
+            transfer_id_timeout=trace.transfer_id_timeout,
+            timestamp=trace.timestamp,
+            source_node_id=meta.session_specifier.source_node_id,
+            transfer_id=meta.transfer_id,
+        )
+        if should_accept:
+            _logger.debug("%r: ACCEPT: %r", self, trace)
+            return trace
+        _logger.debug("%r: REJECT: %r", self, trace)
+        return RedundantDuplicateTransferTrace(cap.timestamp)  # Timestamp of the capture, not of the transfer.
+
+    def _get_deduplicator(
+        self,
+        destination_node_id: typing.Optional[int],
+        data_specifier: pyuavcan.transport.DataSpecifier,
+        transfer_id_modulo: int,
+    ) -> Deduplicator:
+        selector = RedundantTracer._DeduplicatorSelector(destination_node_id, data_specifier)
+        try:
+            return self._deduplicators[selector]
+        except LookupError:  # No deduplicator for this selector yet, create one on demand.
+            dd = Deduplicator.new(transfer_id_modulo)
+            _logger.debug("%r: New deduplicator for %r: %r", self, selector, dd)
+            self._deduplicators[selector] = dd
+        return self._deduplicators[selector]
+
+    def _get_inferior_tracer(
+        self,
+        inferior_type: typing.Type[pyuavcan.transport.Transport],
+        inferior_iface_id: int,
+    ) -> pyuavcan.transport.Tracer:
+        selector = inferior_type, inferior_iface_id
+        try:
+            return self._inferior_tracers[selector]
+        except LookupError:  # No tracer for this (transport type, interface) pair yet, create one on demand.
+            it = inferior_type.make_tracer()
+            _logger.debug("%r: New inferior tracer for %r: %r", self, selector, it)
+            self._inferior_tracers[selector] = it
+        return self._inferior_tracers[selector]
+
+    @dataclasses.dataclass(frozen=True)
+    class _DeduplicatorSelector:  # Key of the deduplicator table: one deduplicator per (destination, data specifier).
+        destination_node_id: typing.Optional[int]
+        data_specifier: pyuavcan.transport.DataSpecifier
+
+    def __repr__(self) -> str:
+        return pyuavcan.util.repr_attributes(self, self._inferior_tracers)
+
+
+_logger = logging.getLogger(__name__)
diff --git a/pyuavcan/transport/serial/_serial.py b/pyuavcan/transport/serial/_serial.py
index 6f360b1..e12dfa5 100644
--- a/pyuavcan/transport/serial/_serial.py
+++ b/pyuavcan/transport/serial/_serial.py
@@ -43,6 +43,8 @@
     Please read the module documentation for details.
     """
 
+    TRANSFER_ID_MODULO = SerialFrame.TRANSFER_ID_MASK + 1
+
     VALID_MTU_RANGE = (1024, 1024 ** 3)
     """
     The maximum MTU is practically unlimited, and it is also the default MTU.
@@ -163,7 +165,7 @@
     @property
     def protocol_parameters(self) -> pyuavcan.transport.ProtocolParameters:
         return pyuavcan.transport.ProtocolParameters(
-            transfer_id_modulo=SerialFrame.TRANSFER_ID_MASK + 1,
+            transfer_id_modulo=self.TRANSFER_ID_MODULO,
             max_nodes=len(SerialFrame.NODE_ID_RANGE),
             mtu=self._mtu,
         )
@@ -266,6 +268,10 @@
         """
         self._capture_handlers.append(handler)
 
+    @property
+    def capture_active(self) -> bool:
+        return len(self._capture_handlers) > 0  # True while at least one capture handler is registered.
+
     @staticmethod
     def make_tracer() -> SerialTracer:
         """
diff --git a/pyuavcan/transport/serial/_tracer.py b/pyuavcan/transport/serial/_tracer.py
index eb6af29..d3018b5 100644
--- a/pyuavcan/transport/serial/_tracer.py
+++ b/pyuavcan/transport/serial/_tracer.py
@@ -7,6 +7,7 @@
 import logging
 import dataclasses
 import pyuavcan
+import pyuavcan.transport.serial
 from pyuavcan.transport import Trace, TransferTrace, Capture, AlienSessionSpecifier, AlienTransferMetadata
 from pyuavcan.transport import AlienTransfer, TransferFrom, Timestamp
 from pyuavcan.transport.commons.high_overhead_transport import AlienTransferReassembler, TransferReassembler
@@ -49,6 +50,10 @@
         direction = "tx" if self.own else "rx"
         return pyuavcan.util.repr_attributes(self, direction, fragment)
 
+    @staticmethod
+    def get_transport_type() -> typing.Type[pyuavcan.transport.serial.SerialTransport]:
+        return pyuavcan.transport.serial.SerialTransport  # RedundantTracer calls make_tracer() on this type.
+
 
 @dataclasses.dataclass(frozen=True)
 class SerialErrorTrace(pyuavcan.transport.ErrorTrace):
diff --git a/pyuavcan/transport/udp/_tracer.py b/pyuavcan/transport/udp/_tracer.py
index 524309c..b8ddf1e 100644
--- a/pyuavcan/transport/udp/_tracer.py
+++ b/pyuavcan/transport/udp/_tracer.py
@@ -8,6 +8,7 @@
 import dataclasses
 from ipaddress import IPv4Address, IPv6Address
 import pyuavcan
+import pyuavcan.transport.udp
 from pyuavcan.transport import Trace, TransferTrace, Capture, AlienSessionSpecifier, AlienTransferMetadata
 from pyuavcan.transport import AlienTransfer, TransferFrom, Timestamp
 from pyuavcan.transport.commons.high_overhead_transport import AlienTransferReassembler, TransferReassembler
@@ -172,6 +173,10 @@
         )
         return ses_spec, frame
 
+    @staticmethod
+    def get_transport_type() -> typing.Type[pyuavcan.transport.udp.UDPTransport]:
+        return pyuavcan.transport.udp.UDPTransport  # RedundantTracer calls make_tracer() on this type.
+
 
 @dataclasses.dataclass(frozen=True)
 class UDPErrorTrace(pyuavcan.transport.ErrorTrace):
diff --git a/pyuavcan/transport/udp/_udp.py b/pyuavcan/transport/udp/_udp.py
index 479870a..757403c 100644
--- a/pyuavcan/transport/udp/_udp.py
+++ b/pyuavcan/transport/udp/_udp.py
@@ -37,6 +37,8 @@
     Please read the module documentation for details.
     """
 
+    TRANSFER_ID_MODULO = UDPFrame.TRANSFER_ID_MASK + 1
+
     VALID_MTU_RANGE = 1200, 9000
     """
     The minimum is based on the IPv6 specification, which guarantees that the path MTU is at least 1280 bytes large.
@@ -178,7 +180,7 @@
     @property
     def protocol_parameters(self) -> pyuavcan.transport.ProtocolParameters:
         return pyuavcan.transport.ProtocolParameters(
-            transfer_id_modulo=UDPFrame.TRANSFER_ID_MASK + 1,
+            transfer_id_modulo=self.TRANSFER_ID_MODULO,
             max_nodes=self._sock_factory.max_nodes,
             mtu=self._mtu,
         )
@@ -290,6 +292,10 @@
             self._sniffer = self._sock_factory.make_sniffer(self._process_capture)
         self._capture_handlers.append(handler)
 
+    @property
+    def capture_active(self) -> bool:
+        return self._sniffer is not None  # The sniffer is created where capture handlers are registered.
+
     @staticmethod
     def make_tracer() -> UDPTracer:
         """
diff --git a/tests/transport/can/_can.py b/tests/transport/can/_can.py
index dca2103..b0ef322 100644
--- a/tests/transport/can/_can.py
+++ b/tests/transport/can/_can.py
@@ -1005,8 +1005,10 @@
         assert isinstance(cap, CANCapture)
         captures_other.append(cap)
 
+    assert not tr.capture_active
     tr.begin_capture(add_capture)
     tr.begin_capture(add_capture_other)
+    assert tr.capture_active
     assert media.acceptance_filters == [
         FilterConfiguration.new_promiscuous(FrameFormat.BASE),
         FilterConfiguration.new_promiscuous(FrameFormat.EXTENDED),
diff --git a/tests/transport/loopback/_loopback.py b/tests/transport/loopback/_loopback.py
index 0786696..efa84e6 100644
--- a/tests/transport/loopback/_loopback.py
+++ b/tests/transport/loopback/_loopback.py
@@ -302,6 +302,7 @@
 
     mon_events: typing.List[pyuavcan.transport.Capture] = []
     tr.begin_capture(mon_events.append)
+    assert tr.capture_active
 
     transfer = AlienTransfer(
         AlienTransferMetadata(Priority.IMMEDIATE, 54321, AlienSessionSpecifier(1234, None, MessageDataSpecifier(7777))),
diff --git a/tests/transport/redundant/_redundant.py b/tests/transport/redundant/_redundant.py
index 5e9c39e..11a5c99 100644
--- a/tests/transport/redundant/_redundant.py
+++ b/tests/transport/redundant/_redundant.py
@@ -14,6 +14,7 @@
 from pyuavcan.transport.loopback import LoopbackTransport
 from pyuavcan.transport.serial import SerialTransport
 from pyuavcan.transport.udp import UDPTransport
+from pyuavcan.transport.can import CANTransport
 from tests.transport.serial import VIRTUAL_BUS_URI as SERIAL_URI
 
 
@@ -301,22 +302,159 @@
     await asyncio.sleep(1)  # Let all pending tasks finalize properly to avoid stack traces in the output.
 
 
-def _unittest_redundant_transport_capture() -> None:
-    def mon(_x: object) -> None:
-        return None
+@pytest.mark.asyncio  # type: ignore
+async def _unittest_redundant_transport_capture() -> None:
+    from threading import Lock
+    from pyuavcan.transport import Capture, Trace, TransferTrace, Priority, ServiceDataSpecifier
+    from pyuavcan.transport import AlienTransfer, AlienTransferMetadata, AlienSessionSpecifier
+    from pyuavcan.transport.redundant import RedundantDuplicateTransferTrace, RedundantCapture
+    from tests.transport.can.media.mock import MockMedia as CANMockMedia
 
+    asyncio.get_event_loop().slow_callback_duration = 5.0
+
+    tracer = RedundantTransport.make_tracer()
+    traces: typing.List[typing.Optional[Trace]] = []
+    lock = Lock()
+
+    def handle_capture(cap: Capture) -> None:
+        with lock:
+            assert isinstance(cap, RedundantCapture)
+            # Drop TX frames (own=True), they are not interesting for this test.
+            if isinstance(cap.inferior, pyuavcan.transport.serial.SerialCapture) and cap.inferior.own:
+                return
+            if isinstance(cap.inferior, pyuavcan.transport.can.CANCapture) and cap.inferior.own:
+                return
+            print("CAPTURE:", cap)
+            traces.append(tracer.update(cap))
+
+    async def wait(how_many: int) -> None:  # Polls for about 1 s until `how_many` traces arrive; fails otherwise.
+        for _ in range(10):
+            await asyncio.sleep(0.1)
+            with lock:
+                if len(traces) >= how_many:
+                    return
+        assert False, "No traces received"
+
+    # Setup capture -- one is added before capture started, the other is added later.
+    # Make sure they are treated identically.
     tr = RedundantTransport()
-    inf_a = LoopbackTransport(1234)
-    inf_b = LoopbackTransport(1234)
-    tr.begin_capture(mon)
-    assert inf_a.capture_handlers == []
-    assert inf_b.capture_handlers == []
+    inf_a: pyuavcan.transport.Transport = SerialTransport(SERIAL_URI, 1234)
+    inf_b: pyuavcan.transport.Transport = SerialTransport(SERIAL_URI, 1234)
     tr.attach_inferior(inf_a)
-    assert inf_a.capture_handlers == [mon]
-    assert inf_b.capture_handlers == []
+    assert not tr.capture_active
+    assert not inf_a.capture_active
+    assert not inf_b.capture_active
+    tr.begin_capture(handle_capture)
+    assert tr.capture_active
+    assert inf_a.capture_active
+    assert not inf_b.capture_active
     tr.attach_inferior(inf_b)
-    assert inf_a.capture_handlers == [mon]
-    assert inf_b.capture_handlers == [mon]
+    assert tr.capture_active
+    assert inf_a.capture_active
+    assert inf_b.capture_active
+
+    # Send a transfer and make sure it is handled and deduplicated correctly.
+    transfer = AlienTransfer(
+        AlienTransferMetadata(
+            priority=Priority.IMMEDIATE,
+            transfer_id=1234,
+            session_specifier=AlienSessionSpecifier(
+                source_node_id=321,
+                destination_node_id=222,
+                data_specifier=ServiceDataSpecifier(77, ServiceDataSpecifier.Role.REQUEST),
+            ),
+        ),
+        [memoryview(b"hello")],
+    )
+    assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+    await wait(2)
+    with lock:
+        # Check the status of the deduplication process. We should get two: one transfer, one duplicate.
+        assert len(traces) == 2
+        trace = traces.pop(0)
+        assert isinstance(trace, TransferTrace)
+        assert trace.transfer == transfer
+        # This is the duplicate.
+        assert isinstance(traces.pop(0), RedundantDuplicateTransferTrace)
+        assert not traces
+
+    # Spoof the same thing again, get nothing out: transfers discarded by the inferior's own reassemblers.
+    # WARNING: this will fail if too much time has passed since the previous transfer due to TID timeout.
+    assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+    await wait(2)
+    with lock:
+        assert None is traces.pop(0)
+        assert None is traces.pop(0)
+        assert not traces
+
+    # But if we change ONLY destination, deduplication will not take place.
+    transfer = AlienTransfer(
+        AlienTransferMetadata(
+            priority=Priority.IMMEDIATE,
+            transfer_id=1234,
+            session_specifier=AlienSessionSpecifier(
+                source_node_id=321,
+                destination_node_id=333,
+                data_specifier=ServiceDataSpecifier(77, ServiceDataSpecifier.Role.REQUEST),
+            ),
+        ),
+        [memoryview(b"hello")],
+    )
+    assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+    await wait(2)
+    with lock:
+        # Check the status of the deduplication process. We should get two: one transfer, one duplicate.
+        assert len(traces) == 2
+        trace = traces.pop(0)
+        assert isinstance(trace, TransferTrace)
+        assert trace.transfer == transfer
+        # This is the duplicate.
+        assert isinstance(traces.pop(0), RedundantDuplicateTransferTrace)
+        assert not traces
+
+    # Change the inferior configuration and make sure it is handled properly.
+    tr.detach_inferior(inf_a)
+    tr.detach_inferior(inf_b)
+    inf_a.close()
+    inf_b.close()
+    # The new inferiors use cyclic transfer-ID; the tracer should reconfigure itself automatically!
+    can_peers: typing.Set[CANMockMedia] = set()
+    inf_a = CANTransport(CANMockMedia(can_peers, 64, 2), 111)
+    inf_b = CANTransport(CANMockMedia(can_peers, 64, 2), 111)
+    tr.attach_inferior(inf_a)
+    tr.attach_inferior(inf_b)
+    # Capture should have been launched automatically.
+    assert inf_a.capture_active
+    assert inf_b.capture_active
+
+    # Send transfer over CAN and observe that it is handled well.
+    transfer = AlienTransfer(
+        AlienTransferMetadata(
+            priority=Priority.IMMEDIATE,
+            transfer_id=19,
+            session_specifier=AlienSessionSpecifier(
+                source_node_id=111,
+                destination_node_id=22,
+                data_specifier=ServiceDataSpecifier(77, ServiceDataSpecifier.Role.REQUEST),
+            ),
+        ),
+        [memoryview(b"hello")],
+    )
+    assert await tr.spoof(transfer, monotonic_deadline=tr.loop.time() + 1.0)
+    await wait(2)
+    with lock:
+        # Check the status of the deduplication process. We should get two: one transfer, one duplicate.
+        assert len(traces) == 2
+        trace = traces.pop(0)
+        assert isinstance(trace, TransferTrace)
+        assert trace.transfer == transfer
+        # This is the duplicate.
+        assert isinstance(traces.pop(0), RedundantDuplicateTransferTrace)
+        assert not traces
+
+    # Dispose of everything.
+    tr.close()
+    await asyncio.sleep(1.0)
 
 
 def _unittest_redundant_transport_reconfiguration() -> None:
diff --git a/tests/transport/serial/_serial.py b/tests/transport/serial/_serial.py
index 4fe4e78..c0ce3ca 100644
--- a/tests/transport/serial/_serial.py
+++ b/tests/transport/serial/_serial.py
@@ -476,7 +476,9 @@
     tr = pyuavcan.transport.serial.SerialTransport("loop://", None, mtu=1024)
 
     mon_events: typing.List[pyuavcan.transport.Capture] = []
+    assert not tr.capture_active
     tr.begin_capture(mon_events.append)
+    assert tr.capture_active
 
     transfer = AlienTransfer(
         AlienTransferMetadata(
diff --git a/tests/transport/udp/_udp.py b/tests/transport/udp/_udp.py
index be7d47b..1641295 100644
--- a/tests/transport/udp/_udp.py
+++ b/tests/transport/udp/_udp.py
@@ -265,7 +265,9 @@
         assert isinstance(s, UDPCapture)
         captures.append(s)
 
+    assert not tr_capture.capture_active
     tr_capture.begin_capture(inhale)
+    assert tr_capture.capture_active
     await asyncio.sleep(1.0)
 
     tr = UDPTransport("127.50.0.111")