pytoolbox.network.smpte2022.receiver module¶
- class pytoolbox.network.smpte2022.receiver.FecReceiver(output: StringIO)[source]¶
Bases: object
A SMPTE 2022-1 FEC stream receiver. This receiver accepts incoming RTP media and FEC packets and makes the recovered media stream available.
Example usage (with a network capture)
>> import dpkt
>> from pytoolbox.network.smpte2022.base import FecPacket, RtpPacket
>> from pytoolbox.network.smpte2022.receiver import FecReceiver
>> from struct import *
>>
>> fec_receiver = FecReceiver(open('test.ts', 'wb'))
>> fec_receiver.set_delay(1024, FecReceiver.PACKETS)
>>
>> for ts, buf in dpkt.pcap.Reader(open('test.dump')):
>>     udp = dpkt.ethernet.Ethernet(buf).data.data
>>     if udp.dport == 3300:
>>         media_packet = RtpPacket.create(
..             unpack('!h', udp.data[2:4])[0], unpack('!i', udp.data[4:8])[0],
..             RtpPacket.MP2T_PT, bytearray(udp.data[12:]))
>>         fec_receiver.put_media(media_packet, onlyMP2TS=True)
>>     elif udp.dport in (3302, 3304):
>>         fec_data = bytearray(udp.data)
>>         fec_receiver.put_fec(FecPacket(fec_data, len(fec_data)))
>> print(fec_receiver)
>> fec_receiver.flush()
Example usage
Media packets are sorted by the buffer, so let's test this feature:
>>> import io, random
>>> output = io.StringIO()
>>> receiver = FecReceiver(output)
>>> receiver.set_delay(1000, FecReceiver.PACKETS)
>>> source = list(range(30))
>>> random.shuffle(source)
>>> for i in source:
...     receiver.put_media(RtpPacket.create(i, i * 100, RtpPacket.MP2T_PT, str(i)), True)
>>> receiver.flush()
>>> output.getvalue()
'01234567891011121314151617181920212223242526272829'
Testing FEC algorithm correctness:
>>> import io, os, random
>>> output = io.BytesIO()
>>> receiver = FecReceiver(output)
>>> receiver.set_delay(1024, FecReceiver.PACKETS)
>>> L, D = 4, 5
>>> # Generate a [D][L] matrix of randomly generated RTP packets
>>> matrix = [[RtpPacket.create(L * j + i, (L * j + i) * 100 + random.randint(0, 50),
...            RtpPacket.MP2T_PT, bytearray(os.urandom(random.randint(50, 100))))
...            for i in range(L)] for j in range(D)]
>>> assert len(matrix) == D and len(matrix[0]) == L
>>> # Retrieve the first column of the matrix
>>> for column in matrix:
...     for media in column:
...         if media.sequence != 0:
...             receiver.put_media(media, True)
>>> fec = FecPacket.compute(1, FecPacket.XOR, FecPacket.COL, L, D, [p[0] for p in matrix[0:]])
>>> print('dir={0} snbase={1} offset={2} na={3}'.format(
...     fec.direction, fec.snbase, fec.offset, fec.na))
dir=0 snbase=0 offset=4 na=5
>>> receiver.put_fec(fec)
>>> print(receiver)
Name  Received Buffered Maximum Dropped
Media       19       20      20
Col          1        0       1       0
Row          0        0       0       0
Cross                 0       1
FEC statistics, media packets :
Recovered Aborted Overwritten Missing
        1       0           0       0
Current position (media sequence) : 0
Current delay (can be set) : 20 packets
FEC matrix size (LxD) : 4x5 = 20 packets
>>> receiver.flush()
The output begins with the recovered first packet of the matrix:
>>> print(matrix[0][1].payload == output.getvalue()[:len(matrix[0][1].payload)])
False
>>> print(matrix[0][0].payload == output.getvalue()[:len(matrix[0][0].payload)])
True
- ER_DELAY_UNITS = "Unknown delay units '{0}'"¶
- ER_DIRECTION = 'FEC packet direction is neither COL nor ROW : {0}'¶
- ER_FLUSHING = 'Currently flushing buffers'¶
- ER_MISSING_COUNT = 'They are {0} missing media packet, expected one (1)'¶
- ER_FEC_DIRECTION = "Invalid FEC direction '{0}'"¶
- ER_COL_MISMATCH = 'Column FEC packet n°{0}, expected n°{1}'¶
- ER_COL_OVERWRITE = 'Another column FEC packet is already registered to protect media packet n°{0}'¶
- ER_ROW_MISMATCH = 'Row FEC packet n°{0}, expected n°{1}'¶
- ER_ROW_OVERWRITE = 'Another row FEC packet is already registered to protect media packet n°{0}'¶
- ER_GET_COL_CASCADE = 'Column FEC cascade : Unable to compute sequence # of the media packet to recover{0}{1}{0}'¶
- ER_GET_ROW_CASCADE = 'Row FEC cascade : Unable to compute sequence # of the media packet to recover{0}{1}{0}'¶
- ER_NULL_COL_CASCADE = 'Column FEC cascade : Unable to find linked entry in crosses buffer'¶
- ER_NULL_ROW_CASCADE = 'Row FEC cascade : Unable to find linked entry in crosses buffer'¶
- ER_STARTUP = 'Current position still not initialized (startup state)'¶
- ER_VALID_RTP_MP2TS = 'packet is not valid (expected RTP packet + MPEG2-TS payload)'¶
- ER_VALID_RTP = 'packet is not valid (expected RTP packet)'¶
- DELAY_NAMES = ['packets', 'seconds']¶
- DELAY_RANGE = range(0, 2)¶
- PACKETS = 0¶
- SECONDS = 1¶
- __init__(output: StringIO)[source]¶
Construct a new FecReceiver and register output.
- Parameters:
output – Where to output payload of the recovered stream.
Example usage
Without an output, then with one:
>>> FecReceiver(None)
Traceback (most recent call last):
    ...
ValueError: output is None
>>>
>>> import io
>>> output = io.StringIO()
>>> receiver = FecReceiver(output)
- put_fec(fec: FecPacket) None [source]¶
Put an incoming FEC packet. The algorithm then handles one of the following scenarios:
1) None of the protected media packets is missing: the FEC packet is useless.
2) Only one media packet is missing: the FEC packet can recover it immediately.
3) More than one media packet is missing: the FEC packet is stored for future recovery.
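The three scenarios above can be sketched as a small decision function. This is a minimal illustration, not the library's implementation: `protected_sequences` and `classify_fec` are hypothetical names, and the protected sequence numbers are assumed to be `snbase + i * offset` for `i` in `range(na)`, wrapping at 65536, which matches the `snbase`, `offset` and `na` fields printed in the class example.

```python
RTP_SEQ_MODULO = 65536  # RTP sequence numbers are 16-bit and wrap around


def protected_sequences(snbase, offset, na):
    """Sequence numbers assumed to be protected by one FEC packet."""
    return [(snbase + i * offset) % RTP_SEQ_MODULO for i in range(na)]


def classify_fec(snbase, offset, na, received):
    """Return (scenario, missing) where scenario is 'useless', 'recover' or 'store'.

    `received` is the set of media sequence numbers already in the buffer.
    """
    missing = [s for s in protected_sequences(snbase, offset, na) if s not in received]
    if not missing:
        return 'useless', missing   # nothing to repair
    if len(missing) == 1:
        return 'recover', missing   # exactly one gap: recoverable now
    return 'store', missing         # several gaps: keep the FEC packet for later
```

With the column FEC packet from the class example (`snbase=0`, `offset=4`, `na=5`) and every media packet received except sequence 0, `classify_fec(0, 4, 5, {4, 8, 12, 16})` reports the `'recover'` scenario for sequence 0.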
- flush() None [source]¶
Flush all buffers and output media packets to the registered output (self.output).
- recover_media_packet(media_sequence, cross: dict, fec: FecPacket | None) None [source]¶
Recover a missing media packet with the help of a FEC packet. This method is also called to register an incoming media packet if it is registered as missing.
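For an XOR-based FEC packet, recovering the single missing packet amounts to XOR-ing the FEC payload with every protected payload that did arrive. The sketch below shows that idea on raw byte strings only. It rests on assumptions: `xor_bytes`, `compute_parity` and `recover_payload` are hypothetical helpers, payloads are zero-padded to a common length, and the header fields a real recovery also has to restore (such as length and timestamp) are ignored here, so the expected length is passed in by hand.

```python
def xor_bytes(a, b):
    """XOR two byte strings, zero-padding the shorter one on the right."""
    size = max(len(a), len(b))
    a = a.ljust(size, b'\x00')
    b = b.ljust(size, b'\x00')
    return bytes(x ^ y for x, y in zip(a, b))


def compute_parity(payloads):
    """XOR all protected payloads together (what the sender would transmit)."""
    parity = b''
    for payload in payloads:
        parity = xor_bytes(parity, payload)
    return parity


def recover_payload(parity, received_payloads, length):
    """Rebuild the one missing payload from the parity and the received payloads."""
    recovered = parity
    for payload in received_payloads:
        recovered = xor_bytes(recovered, payload)
    return recovered[:length]  # drop the zero padding


payloads = [b'abc', b'defgh', b'ij']
parity = compute_parity(payloads)
# Pretend the middle packet was lost and rebuild it from the other two.
rebuilt = recover_payload(parity, [payloads[0], payloads[2]], len(payloads[1]))
```

XOR is its own inverse, which is why the same operation serves both to build the parity and to undo the contribution of each received packet.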
- out() None [source]¶
Extract packets to output in order to keep a ‘certain’ amount of them in the buffer.
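One way to picture this behaviour is a reordering buffer that releases its lowest-sequence packets once it holds more than the configured delay. The sketch below illustrates that general technique and is not the library's code: `ReorderBuffer` and its methods are hypothetical, the delay is counted in packets only, and sequence wrap-around is ignored for brevity.

```python
import io


class ReorderBuffer:
    """Hypothetical buffer that keeps at most `delay` packets before writing."""

    def __init__(self, output, delay):
        self.output = output
        self.delay = delay
        self.packets = {}  # sequence number -> payload

    def put(self, sequence, payload):
        self.packets[sequence] = payload
        self.out()

    def out(self):
        # Write the oldest packets while the buffer exceeds the delay.
        while len(self.packets) > self.delay:
            sequence = min(self.packets)
            self.output.write(self.packets.pop(sequence))

    def flush(self):
        # Write everything that is left, in sequence order.
        for sequence in sorted(self.packets):
            self.output.write(self.packets[sequence])
        self.packets.clear()
```

A larger delay gives late packets more time to arrive before their neighbours are written, at the cost of more buffering.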
- static compute_col_address(media_socket)[source]¶
Compute column FEC socket based on media stream socket (port +2).
Example usage
>>> from pytoolbox.unittest import asserts
>>> asserts.dict_equal(FecReceiver.compute_col_address('192.168.50.100:8000'), {
...     'ip': '192.168.50.100', 'port': 8002
... })
>>> asserts.dict_equal(FecReceiver.compute_col_address(IPSocket('50.0.0.7:4000')), {
...     'ip': '50.0.0.7', 'port': 4002
... })
>>> print(FecReceiver.compute_col_address('salut'))
Traceback (most recent call last):
    ....
pytoolbox.exceptions.InvalidIPSocketError: salut is not a valid IP socket.
- static compute_row_address(media_socket)[source]¶
Compute row FEC socket based on media stream socket (port +4).
Example usage
>>> from pytoolbox.unittest import asserts
>>> asserts.dict_equal(FecReceiver.compute_row_address('192.168.50.100:8000'), {
...     'ip': '192.168.50.100', 'port': 8004
... })
>>> asserts.dict_equal(FecReceiver.compute_row_address(IPSocket('50.0.0.7:4000')), {
...     'ip': '50.0.0.7', 'port': 4004
... })
>>> print(FecReceiver.compute_row_address('salut'))
Traceback (most recent call last):
    ....
pytoolbox.exceptions.InvalidIPSocketError: salut is not a valid IP socket.
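The port arithmetic behind the column and row helpers can be illustrated without the library. This sketch assumes a plain `'ip:port'` string as input and uses a hypothetical `fec_address` function. The documented methods also accept an `IPSocket` value and raise `InvalidIPSocketError` on malformed input, whereas this version only handles strings and raises `ValueError`.

```python
COL_PORT_OFFSET = 2  # column FEC stream: media port + 2
ROW_PORT_OFFSET = 4  # row FEC stream: media port + 4


def fec_address(media_socket, port_offset):
    """Return {'ip': ..., 'port': ...} for the FEC stream of a media socket string."""
    ip, sep, port = media_socket.rpartition(':')
    if not sep or not ip or not port.isdigit():
        raise ValueError('{0} is not a valid IP socket.'.format(media_socket))
    return {'ip': ip, 'port': int(port) + port_offset}


col = fec_address('192.168.50.100:8000', COL_PORT_OFFSET)
row = fec_address('192.168.50.100:8000', ROW_PORT_OFFSET)
```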
- static validity_window(current, start, end)[source]¶
Returns True if current is in the validity window bounded by start and end.
This method is circular-buffer aware and there are two cases (validity window [====]):
1) start=6 end=9 : 0 1 2 3 4 5 [=======] 10 ... 65'533 65'534 65'535
2) start=65'534 end=1 : ======] 2 3 4 5 6 7 8 9 10 ... 65'533 [=============
Example usage
Testing validity window condition:
>>> FecReceiver.validity_window(0, 5, 10)
False
>>> FecReceiver.validity_window(5, 5, 10)
True
>>> FecReceiver.validity_window(8, 5, 10)
True
>>> FecReceiver.validity_window(10, 5, 10)
True
>>> FecReceiver.validity_window(15, 5, 10)
False
>>> FecReceiver.validity_window(0, 65534, 2)
True
>>> FecReceiver.validity_window(2, 65534, 2)
True
>>> FecReceiver.validity_window(5, 65534, 2)
False
>>> FecReceiver.validity_window(65534, 65534, 2)
True
>>> FecReceiver.validity_window(65535, 65534, 2)
True
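The behaviour shown by these doctests is consistent with the following sketch. It is not taken from the library source: `in_validity_window` is a hypothetical stand-in that treats both bounds as inclusive and interprets `start > end` as a window that wraps past 65'535.

```python
def in_validity_window(current, start, end):
    """Return True if `current` lies in the inclusive, possibly wrapping, window."""
    if start <= end:
        # Case 1: ordinary window, e.g. start=6 end=9.
        return start <= current <= end
    # Case 2: the window wraps around, e.g. start=65534 end=1.
    return current >= start or current <= end
```

The wrapping case is simply the complement of the open gap between `end` and `start`, so no modulo arithmetic is needed.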