pytoolbox.network.smpte2022.receiver module

class pytoolbox.network.smpte2022.receiver.FecReceiver(output: StringIO)[source]

Bases: object

A SMPTE 2022-1 FEC stream receiver. This receiver accepts incoming RTP media and FEC packets and makes the recovered media stream available.

Example usage (with a network capture)

>> import dpkt
>> from pytoolbox.network.smpte2022.base import FecPacket, RtpPacket
>> from pytoolbox.network.smpte2022.receiver import FecReceiver
>> from struct import unpack
>>
>> fec_receiver = FecReceiver(open('test.ts', 'wb'))
>> fec_receiver.set_delay(1024, FecReceiver.PACKETS)
>>
>> for ts, buf in dpkt.pcap.Reader(open('test.dump', 'rb')):
>>     udp = dpkt.ethernet.Ethernet(buf).data.data
>>     if udp.dport == 3300:
>>         media_packet = RtpPacket.create(
..             unpack('!H', udp.data[2:4])[0], unpack('!I', udp.data[4:8])[0],
..             RtpPacket.MP2T_PT, bytearray(udp.data[12:]))
>>         fec_receiver.put_media(media_packet, onlyMP2TS=True)
>>     elif udp.dport in (3302, 3304):
>>         fec_data = bytearray(udp.data)
>>         fec_receiver.put_fec(FecPacket(fec_data, len(fec_data)))
>> print(fec_receiver)
>> fec_receiver.flush()
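
The capture example above slices the RTP sequence number, timestamp and payload out of each UDP datagram by hand. As a minimal sketch of the same parsing, assuming a fixed 12-byte RTP header with no CSRC entries and no header extension, the fields can be unpacked in one call. The helper name `parse_rtp_header` is hypothetical and is not part of pytoolbox.

```python
import struct

RTP_HEADER_LENGTH = 12  # fixed RTP header, without CSRC entries or extensions
MP2T_PAYLOAD_TYPE = 33  # static RTP payload type for MPEG-2 transport streams


def parse_rtp_header(datagram):
    """Split a UDP payload into RTP header fields and the media payload.

    Hypothetical helper: assumes no CSRC list and no header extension.
    """
    if len(datagram) < RTP_HEADER_LENGTH:
        raise ValueError('datagram is shorter than an RTP header')
    # Sequence number and timestamp are unsigned 16-bit and 32-bit fields.
    flags, marker_pt, sequence, timestamp, ssrc = struct.unpack(
        '!BBHII', datagram[:RTP_HEADER_LENGTH])
    return {
        'version': flags >> 6,
        'payload_type': marker_pt & 0x7F,
        'sequence': sequence,
        'timestamp': timestamp,
        'ssrc': ssrc,
        'payload': bytes(datagram[RTP_HEADER_LENGTH:]),
    }


# Build a synthetic packet with values that would be negative if read as signed.
header = struct.pack('!BBHII', 0x80, MP2T_PAYLOAD_TYPE, 65535, 4000000000, 1)
packet = parse_rtp_header(header + b'\x47' * 188)
print(packet['sequence'], packet['timestamp'], packet['payload_type'])
# 65535 4000000000 33
```

Unsigned format codes matter here: a sequence number above 32767 read with a signed code would come out negative.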

Example usage

The buffer sorts media packets by sequence number, so packets put in random order come out in order:

>>> import io, random
>>> output = io.StringIO()
>>> receiver = FecReceiver(output)
>>> receiver.set_delay(1000, FecReceiver.PACKETS)
>>> source = list(range(30))
>>> random.shuffle(source)
>>> for i in source:
...     receiver.put_media(RtpPacket.create(i, i * 100, RtpPacket.MP2T_PT, str(i)), True)
>>> receiver.flush()
>>> output.getvalue()
'01234567891011121314151617181920212223242526272829'

Testing FEC algorithm correctness:

>>> import io, os, random
>>> output = io.BytesIO()
>>> receiver = FecReceiver(output)
>>> receiver.set_delay(1024, FecReceiver.PACKETS)
>>> L, D = 4, 5
>>> # Generate a [D][L] matrix of randomly generated RTP packets
>>> matrix = [[RtpPacket.create(L * j + i, (L * j + i) * 100 + random.randint(0, 50),
...           RtpPacket.MP2T_PT, bytearray(os.urandom(random.randint(50, 100))))
...           for i in range(L)] for j in range(D)]
>>> assert len(matrix) == D and len(matrix[0]) == L
>>> # Put every media packet except the first one (sequence 0), which FEC must recover
>>> for row in matrix:
...     for media in row:
...         if media.sequence != 0:
...             receiver.put_media(media, True)
>>> fec = FecPacket.compute(1, FecPacket.XOR, FecPacket.COL, L, D, [p[0] for p in matrix[0:]])
>>> print('dir={0} snbase={1} offset={2} na={3}'.format(
...     fec.direction, fec.snbase, fec.offset, fec.na))
dir=0 snbase=0 offset=4 na=5
>>> receiver.put_fec(fec)
>>> print(receiver)
Name  Received Buffered Maximum Dropped
Media       19       20      20
Col          1        0       1       0
Row          0        0       0       0
Cross                 0       1
FEC statistics, media packets :
Recovered Aborted Overwritten Missing
        1       0           0       0
Current position (media sequence) : 0
Current delay (can be set) : 20 packets
FEC matrix size (LxD) : 4x5 = 20 packets
>>> receiver.flush()

The output begins with the recovered first packet of the matrix:

>>> print(matrix[0][1].payload == output.getvalue()[:len(matrix[0][1].payload)])
False
>>> print(matrix[0][0].payload == output.getvalue()[:len(matrix[0][0].payload)])
True
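
The `snbase`, `offset` and `na` values printed in the example above determine which media packets a FEC packet protects: the sequence numbers `snbase + i * offset` for `i` in `0 .. na - 1`, wrapped to the 16-bit RTP sequence space. The sketch below illustrates that arithmetic only; `protected_sequences` is a hypothetical helper, not a pytoolbox function.

```python
RTP_SEQUENCE_MODULO = 65536  # RTP sequence numbers are 16-bit and wrap around


def protected_sequences(snbase, offset, na):
    """Return the media sequence numbers protected by one FEC packet."""
    return [(snbase + i * offset) % RTP_SEQUENCE_MODULO for i in range(na)]


# Column FEC of the example above: L=4, D=5, so offset=L and na=D.
print(protected_sequences(0, 4, 5))      # [0, 4, 8, 12, 16]
# A row FEC over the same matrix would use offset=1 and na=L.
print(protected_sequences(4, 1, 4))      # [4, 5, 6, 7]
# The list wraps around at the end of the sequence space.
print(protected_sequences(65534, 1, 4))  # [65534, 65535, 0, 1]
```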
ER_DELAY_UNITS = "Unknown delay units '{0}'"
ER_DIRECTION = 'FEC packet direction is neither COL nor ROW : {0}'
ER_FLUSHING = 'Currently flushing buffers'
ER_MISSING_COUNT = 'They are {0} missing media packet, expected one (1)'
ER_FEC_DIRECTION = "Invalid FEC direction '{0}'"
ER_COL_MISMATCH = 'Column FEC packet n°{0}, expected n°{1}'
ER_COL_OVERWRITE = 'Another column FEC packet is already registered to protect media packet n°{0}'
ER_ROW_MISMATCH = 'Row FEC packet n°{0}, expected n°{1}'
ER_ROW_OVERWRITE = 'Another row FEC packet is already registered to protect media packet n°{0}'
ER_GET_COL_CASCADE = 'Column FEC cascade : Unable to compute sequence # of the media packet to recover{0}{1}{0}'
ER_GET_ROW_CASCADE = 'Row FEC cascade : Unable to compute sequence # of the media packet to recover{0}{1}{0}'
ER_NULL_COL_CASCADE = 'Column FEC cascade : Unable to find linked entry in crosses buffer'
ER_NULL_ROW_CASCADE = 'Row FEC cascade : Unable to find linked entry in crosses buffer'
ER_STARTUP = 'Current position still not initialized (startup state)'
ER_VALID_RTP_MP2TS = 'packet is not valid (expected RTP packet + MPEG2-TS payload)'
ER_VALID_RTP = 'packet is not valid (expected RTP packet)'
DELAY_NAMES = ['packets', 'seconds']
DELAY_RANGE = range(0, 2)
PACKETS = 0
SECONDS = 1
__init__(output: StringIO)[source]

Construct a new FecReceiver and register output.

Parameters:

output – Where to output payload of the recovered stream.

Example usage

Without an output:

>>> FecReceiver(None)
Traceback (most recent call last):
    ...
ValueError: output is None
>>>
>>> import io
>>> output = io.StringIO()
>>> receiver = FecReceiver(output)
property current_delay: int

Return current delay based on the length of the media buffer.

set_delay(value, units) → None[source]

Set desired size for the internal media buffer.

put_media(media, onlyMP2TS)[source]

Put an incoming media packet.

put_fec(fec: FecPacket) → None[source]

Put an incoming FEC packet. The algorithm handles one of three scenarios:

  1. None of the protected media packets is missing: the FEC packet is useless.

  2. Exactly one protected media packet is missing: the FEC packet recovers it immediately.

  3. More than one protected media packet is missing: the FEC packet is stored for future recovery.
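
The three scenarios can be sketched with plain XOR parity over payloads. This is an illustration under stated assumptions, not the library's implementation: `xor_bytes`, `compute_fec_payload` and `handle_fec` are hypothetical names, and the sketch ignores the header fields (length, timestamp and payload type recovery) that a real SMPTE 2022-1 FEC packet also carries.

```python
def xor_bytes(left, right):
    """XOR two byte strings, padding the shorter one with zero bytes."""
    size = max(len(left), len(right))
    left, right = left.ljust(size, b'\x00'), right.ljust(size, b'\x00')
    return bytes(a ^ b for a, b in zip(left, right))


def compute_fec_payload(payloads):
    """XOR all protected payloads together to build the parity payload."""
    fec = b''
    for payload in payloads:
        fec = xor_bytes(fec, payload)
    return fec


def handle_fec(protected, received, fec_payload):
    """Return (action, recovered) for one FEC packet.

    protected: sequence numbers covered by the FEC packet.
    received: dict mapping sequence number to payload already buffered.
    """
    missing = [seq for seq in protected if seq not in received]
    if not missing:
        return 'useless', None   # scenario 1: nothing to recover
    if len(missing) > 1:
        return 'stored', None    # scenario 3: keep for a later attempt
    # Scenario 2: XOR the parity with every received payload.
    payload = fec_payload
    for seq in protected:
        if seq in received:
            payload = xor_bytes(payload, received[seq])
    return 'recovered', (missing[0], payload)


media = {seq: bytes([seq]) * 8 for seq in (0, 4, 8, 12, 16)}
fec_payload = compute_fec_payload(media.values())
received = {seq: data for seq, data in media.items() if seq != 8}
print(handle_fec(list(media), received, fec_payload))
```

Because the sketch pads with zeros, a recovered payload shorter than the longest protected payload would carry trailing zero bytes; the length recovery field of the real protocol exists to trim them.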

flush() → None[source]

Flush all buffers and output media packets to registered output (self.output).

cleanup() → None[source]

Remove stored FEC packets that are still waiting but can no longer be useful.

recover_media_packet(media_sequence, cross: dict, fec: FecPacket | None) → None[source]

Recover a missing media packet with the help of a FEC packet. This method is also called to register an incoming media packet that was flagged as missing.

out() → None[source]

Extract packets to the output so that the buffer keeps roughly the desired number of them (see set_delay).
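
The interplay between a target buffer size, `out()` and `flush()` can be pictured with a small reordering buffer. The following is a simplified sketch, not the receiver's actual code: `ReorderBuffer` is a hypothetical class, the delay is counted in packets only, and sequence wrap-around and FEC are ignored.

```python
import heapq


class ReorderBuffer:
    """Hypothetical sketch of a delay buffer that releases packets in order."""

    def __init__(self, delay):
        self.delay = delay  # number of packets to keep buffered
        self._heap = []     # min-heap ordered by sequence number
        self.output = []    # stands in for the registered output

    def put(self, sequence, payload):
        heapq.heappush(self._heap, (sequence, payload))
        self.out()

    def out(self):
        # Release the oldest packets while the buffer exceeds the delay.
        while len(self._heap) > self.delay:
            self.output.append(heapq.heappop(self._heap))

    def flush(self):
        # Release everything that is still buffered, in sequence order.
        while self._heap:
            self.output.append(heapq.heappop(self._heap))


buffer = ReorderBuffer(delay=2)
for sequence in (2, 0, 1, 3):
    buffer.put(sequence, str(sequence))
buffer.flush()
print([sequence for sequence, _ in buffer.output])  # [0, 1, 2, 3]
```

A larger delay tolerates more reordering and leaves more time for FEC recovery, at the cost of added latency.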

static compute_col_address(media_socket)[source]

Compute column FEC socket based on media stream socket (port +2).

Example usage

>>> from pytoolbox.unittest import asserts
>>> asserts.dict_equal(FecReceiver.compute_col_address('192.168.50.100:8000'), {
...     'ip': '192.168.50.100', 'port': 8002
... })
>>> asserts.dict_equal(FecReceiver.compute_col_address(IPSocket('50.0.0.7:4000')), {
...     'ip': '50.0.0.7', 'port': 4002
... })
>>> print(FecReceiver.compute_col_address('salut'))
Traceback (most recent call last):
    ....
pytoolbox.exceptions.InvalidIPSocketError: salut is not a valid IP socket.
static compute_row_address(media_socket)[source]

Compute row FEC socket based on media stream socket (port +4).

Example usage

>>> from pytoolbox.unittest import asserts
>>> asserts.dict_equal(FecReceiver.compute_row_address('192.168.50.100:8000'), {
...     'ip': '192.168.50.100', 'port': 8004
... })
>>> asserts.dict_equal(FecReceiver.compute_row_address(IPSocket('50.0.0.7:4000')), {
...     'ip': '50.0.0.7', 'port': 4004
... })
>>> print(FecReceiver.compute_row_address('salut'))
Traceback (most recent call last):
    ....
pytoolbox.exceptions.InvalidIPSocketError: salut is not a valid IP socket.
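
The port convention used by both helpers (column FEC on media port +2, row FEC on media port +4) can be sketched as follows. `offset_socket` is a hypothetical function that only accepts `'ip:port'` strings, does not validate the IP part, and raises a plain `ValueError` where the library raises `InvalidIPSocketError`.

```python
COL_PORT_OFFSET = 2  # column FEC stream: media port + 2
ROW_PORT_OFFSET = 4  # row FEC stream: media port + 4


def offset_socket(media_socket, port_offset):
    """Return the FEC socket derived from an 'ip:port' media socket string."""
    ip, separator, port = media_socket.rpartition(':')
    if not separator or not ip or not port.isdigit():
        raise ValueError('{0} is not a valid IP socket.'.format(media_socket))
    return {'ip': ip, 'port': int(port) + port_offset}


print(offset_socket('192.168.50.100:8000', COL_PORT_OFFSET))
# {'ip': '192.168.50.100', 'port': 8002}
print(offset_socket('192.168.50.100:8000', ROW_PORT_OFFSET))
# {'ip': '192.168.50.100', 'port': 8004}
```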
static validity_window(current, start, end)[source]

Return True if current is within the validity window bounded by start and end (both inclusive).

This method is circular-buffer aware, so there are two cases (validity window [====]):

1) start=     6 end=9 :   0   1  2 3 4 5 [=======] 10 ... 65'533  65'534 65'535
2) start=65'534 end=1 :  ======] 2 3 4 5  6 7 8 9  10 ... 65'533 [=============

Example usage

Testing validity window condition:

>>> FecReceiver.validity_window(0, 5, 10)
False
>>> FecReceiver.validity_window(5, 5, 10)
True
>>> FecReceiver.validity_window(8, 5, 10)
True
>>> FecReceiver.validity_window(10, 5, 10)
True
>>> FecReceiver.validity_window(15, 5, 10)
False
>>> FecReceiver.validity_window(0, 65534, 2)
True
>>> FecReceiver.validity_window(2, 65534, 2)
True
>>> FecReceiver.validity_window(5, 65534, 2)
False
>>> FecReceiver.validity_window(65534, 65534, 2)
True
>>> FecReceiver.validity_window(65535, 65534, 2)
True
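
The two cases can be expressed in a few lines. This is a sketch consistent with the examples above, not necessarily the library's own code; `in_window` is a hypothetical name.

```python
def in_window(current, start, end):
    """Return True if current lies in [start, end] on a circular sequence space."""
    if start <= end:
        # Case 1: the window does not wrap around.
        return start <= current <= end
    # Case 2: the window wraps past the maximum sequence number.
    return current >= start or current <= end


print(in_window(8, 5, 10))          # True
print(in_window(15, 5, 10))         # False
print(in_window(65535, 65534, 2))   # True
print(in_window(5, 65534, 2))       # False
```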