pytoolbox.network.smpte2022.base module
- class pytoolbox.network.smpte2022.base.FecPacket(data=None, length=0)
Bases: object
This represents a Real-time Transport Protocol (RTP) packet carrying an SMPTE 2022-1 FEC payload.
Packet header
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|        SNBase low bits        |        Length recovery        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|E| PT recovery |                     Mask                      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                          TS recovery                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|N|D|type |index|    Offset     |      NA       |SNBase ext bits|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
The constructor parses the input byte array to fill the packet’s fields. If an error occurs (e.g. a bad version number), it stops filling fields, and any field not yet updated keeps its default value.
- Parameters:
- Returns:
Generated RTP packet with SMPTE 2022-1 FEC payload (aka FEC packet)
Example usage
Testing header fields value (based on packet 3 of capture DCM_FEC_2D_6_10.pcap):
1st row: RTP header, sequence = 37 798
2nd row: FEC header, SN = 50 288, PT recovery = 0, TS recovery = 7850
>>> header = bytearray.fromhex('806093a6 00000000 00000000 c4700000 80000000 00001eaa 00060a00')
>>> length = 1344 - len(header)
>>> print(length)
1316
>>> bytes = header + bytearray(length)
>>> print(len(bytes))
1344
>>> fec = FecPacket(bytes, len(bytes))
>>> assert fec.valid
>>> print(fec)
errors = []
sequence = 37798
algorithm = XOR
direction = COL
snbase = 50288
offset = 6
na = 10
L x D = 6 x 10
payload type recovery = 0
timestamp recovery = 7850
length recovery = 0
payload recovery size = 1316
missing = []
Testing header fields value (based on packet 5 of capture DCM_FEC_2D_6_10.pcap):
1st row: RTP header, sequence = 63 004
2nd row: FEC header, SN = 50 344, PT recovery = 0, TS recovery = 878
>>> header = bytearray.fromhex('8060f61c 00000000 00000000 c4a80000 80000000 0000036e 40010600')
>>> length = 1344 - len(header)
>>> print(length)
1316
>>> bytes = header + bytearray(length)
>>> print(len(bytes))
1344
>>> fec = FecPacket(bytes, len(bytes))
>>> assert fec.valid
>>> print(fec)
errors = []
sequence = 63004
algorithm = XOR
direction = ROW
snbase = 50344
offset = 1
na = 6
L x D = 6 x None
payload type recovery = 0
timestamp recovery = 878
length recovery = 0
payload recovery size = 1316
missing = []
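The "1st row" annotations in the examples above refer to the 12-byte fixed RTP header that precedes the FEC header. As a rough illustration that does not depend on pytoolbox, the sketch below decodes that fixed header with the standard library, assuming the usual RFC 3550 layout in network byte order. The helper name parse_rtp_header is hypothetical and not part of this module.

```python
import struct


def parse_rtp_header(data):
    """Decode the 12-byte fixed RTP header (hypothetical helper, RFC 3550 layout)."""
    # Two flag bytes, 16-bit sequence, 32-bit timestamp, 32-bit SSRC, big-endian.
    first, second, sequence, timestamp, ssrc = struct.unpack('!BBHII', bytes(data[:12]))
    return {
        'version': first >> 6,
        'padding': bool(first & 0x20),
        'extension': bool(first & 0x10),
        'csrc_count': first & 0x0F,
        'marker': bool(second & 0x80),
        'payload_type': second & 0x7F,
        'sequence': sequence,
        'timestamp': timestamp,
        'ssrc': ssrc,
    }


# Same bytes as the first example (packet 3 of the capture).
header = bytearray.fromhex('806093a6 00000000 00000000 c4700000 80000000 00001eaa 00060a00')
rtp = parse_rtp_header(header)
print(rtp['version'], rtp['payload_type'], rtp['sequence'])  # 2 96 37798
```

The payload type 96 matches the value required by ER_PAYLOAD_TYPE, and the sequence matches the one printed by FecPacket in the first example.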
- ER_PAYLOAD_TYPE = 'RTP Header : Payload type must be set to 96'
- ER_EXTENDED = 'SMPTE 2022-1 Header : Extended must be set to one'
- ER_MASK = 'SMPTE 2022-1 Header : Mask must be set to zero'
- ER_N = 'SMPTE 2022-1 Header : N must be set to zero'
- ER_DIRECTION = 'SMPTE 2022-1 Header : Direction must be COL or ROW'
- ER_LD = 'SMPTE 2022-1 Header : The following limitation failed : L*D <= 256'
- ER_L = 'SMPTE 2022-1 Header : The following limitation failed : 1 <= L <= 50'
- ER_D = 'SMPTE 2022-1 Header : The following limitation failed : 4 <= D <= 50'
- ER_PAYLOAD = 'FEC packet must have a payload'
- ER_ALGORITHM = 'SMPTE 2022-1 Header : Only XOR FEC algorithm is handled'
- ER_VALID_MP2T = 'One of the packets is an invalid RTP packet (+expected MPEG2-TS payload)'
- ER_OFFSET = '(packets) Computed offset is out of range [1..255]'
- ER_SEQUENCE = "One of the packets doesn't verify : sequence = snbase + i * offset, 0<i<na"
- ER_INDEX = 'Unable to get missing media packet index'
- ER_J = 'Unable to find a suitable j e N that satisfy : media_sequence = snbase + j * offset'
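The three size limits named in ER_L, ER_D and ER_LD can be written as a small standalone check. This is only a sketch of the inequalities quoted in those messages; check_matrix_limits is a hypothetical helper, and when (or whether) FecPacket applies each limit is not stated on this page.

```python
def check_matrix_limits(L, D):
    """Return the limits violated by an L x D FEC matrix (hypothetical helper)."""
    violated = []
    if not 1 <= L <= 50:
        violated.append('1 <= L <= 50')
    if not 4 <= D <= 50:
        violated.append('4 <= D <= 50')
    if L * D > 256:
        violated.append('L*D <= 256')
    return violated


print(check_matrix_limits(6, 10))   # []
print(check_matrix_limits(20, 20))  # ['L*D <= 256']
```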
- HEADER_LENGTH = 16
- E_MASK = 128
- PT_MASK = 127
- N_MASK = 128
- D_MASK = 64
- T_MASK = 56
- T_SHIFT = 3
- I_MASK = 7
- SNBL_MASK = 65535
- SNBE_SHIFT = 16
- DIRECTION_NAMES = ('COL', 'ROW')
- DIRECTION_RANGE = range(0, 2)
- COL = 0
- ROW = 1
- ALGORITHM_NAMES = ('XOR', 'Hamming', 'ReedSolomon')
- ALGORITHM_RANGE = range(0, 3)
- XOR = 0
- Hamming = 1
- ReedSolomon = 2
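To show how the mask and shift constants above line up with the packet header diagram, here is a minimal standalone sketch that decodes the 16 FEC header bytes with struct. It assumes network byte order and assumes the type field carries the algorithm number, which is consistent with the doctest outputs but is not confirmed by this page. parse_fec_header is a hypothetical helper, not the class's actual parser.

```python
import struct

# Values copied from the constants listed above.
HEADER_LENGTH = 16
E_MASK, PT_MASK = 128, 127
N_MASK, D_MASK, T_MASK, T_SHIFT, I_MASK = 128, 64, 56, 3, 7
SNBE_SHIFT = 16
DIRECTION_NAMES = ('COL', 'ROW')


def parse_fec_header(data):
    """Decode a 16-byte FEC header (hypothetical helper, big-endian assumed)."""
    (snbase_low, length_recovery, e_pt, mask_high, mask_low,
     ts_recovery, flags, offset, na, snbase_ext) = struct.unpack(
        '!HHBBHIBBBB', bytes(data[:HEADER_LENGTH]))
    return {
        'snbase': (snbase_ext << SNBE_SHIFT) | snbase_low,
        'length_recovery': length_recovery,
        'extended': bool(e_pt & E_MASK),
        'payload_type_recovery': e_pt & PT_MASK,
        'mask': (mask_high << 16) | mask_low,  # 24-bit field split as 8 + 16 bits
        'timestamp_recovery': ts_recovery,
        'n': bool(flags & N_MASK),
        'direction': DIRECTION_NAMES[1 if flags & D_MASK else 0],
        'type': (flags & T_MASK) >> T_SHIFT,   # assumed to be the algorithm number
        'index': flags & I_MASK,
        'offset': offset,
        'na': na,
    }


# FEC header bytes of the first constructor example (after the 12-byte RTP header).
fields = parse_fec_header(bytearray.fromhex('c4700000 80000000 00001eaa 00060a00'))
print(fields['snbase'], fields['direction'], fields['offset'], fields['na'])  # 50288 COL 6 10
```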
- property valid
Returns True if this packet is a valid RTP packet (with FEC payload).
- property errors
Returns an array containing any errors.
- Returns:
array of error message(s).
Example usage
Testing invalid header:
#TODO >>> fec = FecPacket(bytearray(FecPacket.HEADER_LENGTH-1), FecPacket.HEADER_LENGTH-1)
#TODO >>> print(fec.errors)
#TODO ['RTP Header : Version must be set to 2', 'RTP packet must have a payload']
- property D
Returns the vertical size of the FEC matrix (rows).
Example usage
>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(10, FecPacket.XOR, FecPacket.COL, 1, 2, packets)
>>> print(fec.D)
2
- property L
Returns the horizontal size of the FEC matrix (columns).
Example usage
>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(6, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec.L)
2
- property header_size
Returns the length (aka size) of the header.
Example usage
>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(1985, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec.header_size)
16
- property payload_size
Returns the length (aka size) of the payload.
Example usage
>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(27, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec.payload_size)
1234
- property header_bytes
Returns SMPTE 2022-1 FEC header bytes.
Example usage
>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [
...     RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...     RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))
... ]
>>> fec = FecPacket.compute(26, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec)
errors = []
sequence = 26
algorithm = XOR
direction = ROW
snbase = 10
offset = 1
na = 2
L x D = 2 x None
payload type recovery = 0
timestamp recovery = 172
length recovery = 1193
payload recovery size = 1234
missing = []
>>> fec_header = fec.header_bytes
>>> len(fec_header)  # FecPacket.HEADER_LENGTH
16
>>> ''.join(' %02x' % b for b in fec_header)
' 00 0a 04 a9 80 00 00 00 00 00 00 ac 40 01 02 00'
>>> fec_header += fec.payload_recovery
>>> rtp = RtpPacket.create(26, 100, RtpPacket.DYNAMIC_PT, fec_header)
>>> header = rtp.header_bytes + fec_header
>>> fec == FecPacket(header, len(header))
True
- property bytes
- classmethod compute(sequence, algorithm, direction, L, D, packets)
This method generates the FEC packet’s fields by applying the FEC algorithm to the input packets. If an error occurs (e.g. a bad version number), it stops filling fields, and any field not yet updated keeps its default value.
- Parameters:
sequence (int) – Sequence number of computed FEC packet
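algorithm (str) – Algorithm used to compute the payload recovery from the packets’ payloads (the examples on this page pass the constant FecPacket.XOR)
direction (str) – Direction (column or row) of the computed FEC packet (the examples on this page pass FecPacket.COL or FecPacket.ROW; see the RFC for details)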
L (int) – Horizontal size of the FEC matrix (columns)
D (int) – Vertical size of the FEC matrix (rows)
packets (array(RtpPacket)) – Array containing RTP packets to protect
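For the XOR algorithm, the recovery fields are the XOR of the corresponding fields of every protected packet, with shorter payloads treated as zero-padded. The sketch below illustrates this without pytoolbox. Packets are modelled as plain (payload_type, timestamp, payload) tuples, xor_recovery is a hypothetical helper, and the payload type 33 for MPEG2-TS is an assumption about the value of RtpPacket.MP2T_PT.

```python
def xor_recovery(packets):
    """XOR the fields of (payload_type, timestamp, payload) tuples (hypothetical helper)."""
    size = max(len(payload) for _, _, payload in packets)
    pt_recovery = ts_recovery = length_recovery = 0
    payload_recovery = bytearray(size)  # shorter payloads act as zero-padded
    for payload_type, timestamp, payload in packets:
        pt_recovery ^= payload_type
        ts_recovery ^= timestamp
        length_recovery ^= len(payload)
        for i, byte in enumerate(payload):
            payload_recovery[i] ^= byte
    return pt_recovery, ts_recovery, length_recovery, payload_recovery


# Same timestamps and payloads as the "valid input" doctest of compute().
packets = [(33, 10, b'gaga'), (33, 14, b'salut'), (33, 18, b'12345'), (33, 22, b'robot')]
pt_rec, ts_rec, len_rec, payload_rec = xor_recovery(packets)
print(pt_rec, ts_rec, len_rec, ':'.join('%02x' % b for b in payload_rec))
# prints "0 0 1 57:5d:5a:4f:35"
```

These values agree with the payload type, timestamp, length and payload recovery fields printed by that doctest.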
Example usage
Testing invalid input collection of packets:
>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [
...     RtpPacket.create(10, 10, RtpPacket.MP2T_PT, 'a'),
...     RtpPacket.create(22, 22, RtpPacket.MP2T_PT, 'b')
... ]
>>> fec = FecPacket.compute(1, FecPacket.XOR, FecPacket.COL, 2, 2, packets)
Traceback (most recent call last):
    ...
ValueError: One of the packets doesn't verify : sequence = snbase + i * offset, 0<i<na
Testing valid input collection of packets:
>>> packets = [
...     RtpPacket.create(10, 10, RtpPacket.MP2T_PT, bytearray('gaga', 'utf-8')),
...     RtpPacket.create(14, 14, RtpPacket.MP2T_PT, bytearray('salut', 'utf-8')),
...     RtpPacket.create(18, 18, RtpPacket.MP2T_PT, bytearray('12345', 'utf-8')),
...     RtpPacket.create(22, 22, RtpPacket.MP2T_PT, bytearray('robot', 'utf-8'))
... ]
>>> fec = FecPacket.compute(2, FecPacket.XOR, FecPacket.COL, 4, 4, packets)
>>> print(fec)
errors = []
sequence = 2
algorithm = XOR
direction = COL
snbase = 10
offset = 4
na = 4
L x D = 4 x 4
payload type recovery = 0
timestamp recovery = 0
length recovery = 1
payload recovery size = 5
missing = []
>>> print(''.join('%02x:' % x for x in fec.payload_recovery))
57:5d:5a:4f:35:
Testing fec packet generation (based on source RTP packets):
>>> from os import urandom
>>> from random import randint
>>> from pytoolbox.network.rtp import RtpPacket
>>> L = 4
>>> D = 5
>>> OFF = 2
>>> # Generate a [D][L] matrix of randomly generated RTP packets
>>> matrix = [
...     [
...         RtpPacket.create(L * j + i, (L * j + i) * 100 + randint(0, 50),
...                          RtpPacket.MP2T_PT, bytearray(urandom(randint(50, 100))))
...         for i in range(L)
...     ]
...     for j in range(D)
... ]
>>> len(matrix)  # D
5
>>> len(matrix[0])  # L
4
Retrieve the OFF’th column of the matrix:
>>> expected_payload_type_recovery = 0
>>> expected_timestamp_recovery = 0
>>> expected_length_recovery = 0
>>> expected_payload_recovery = bytearray(100)
>>> packets = []
>>> for i in range(D):
...     packet = matrix[i][OFF]
...     packets.append(packet)
...     # Compute expected recovery fields values
...     expected_payload_type_recovery ^= packet.payload_type
...     expected_timestamp_recovery ^= packet.timestamp
...     expected_length_recovery ^= packet.payload_size
...     for j in range(packet.payload_size):
...         expected_payload_recovery[j] ^= packet.payload[j]
>>> fec = FecPacket.compute(15, FecPacket.XOR, FecPacket.COL, L, D, packets)
>>> fec.valid
True
>>> fec.snbase
2
>>> matrix[0][OFF].sequence
2
>>> fec.na  # D
5
>>> fec.offset  # L
4
>>> assert fec.payload_type_recovery == expected_payload_type_recovery
>>> assert fec.timestamp_recovery == expected_timestamp_recovery
>>> assert fec.length_recovery == expected_length_recovery
>>> for i in range(fec.payload_size):
...     if fec.payload_recovery[i] != expected_payload_recovery[i]:
...         # str(i) avoids a TypeError when concatenating str and int
...         print('Payload recovery test failed with i = ' + str(i))
- set_missing(media_sequence)
Register a protected media packet as missing.
Example usage
>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [
...     RtpPacket.create(65530, 65530, RtpPacket.MP2T_PT, bytearray('gaga', 'utf-8')),
...     RtpPacket.create(65533, 65533, RtpPacket.MP2T_PT, bytearray('salut', 'utf-8')),
...     RtpPacket.create(    0,     0, RtpPacket.MP2T_PT, bytearray('12345', 'utf-8')),
...     RtpPacket.create(    3,     3, RtpPacket.MP2T_PT, bytearray('robot', 'utf-8'))
... ]
>>> fec = FecPacket.compute(4, FecPacket.XOR, FecPacket.COL, 3, 4, packets)
>>> print(fec)
errors = []
sequence = 4
algorithm = XOR
direction = COL
snbase = 65530
offset = 3
na = 4
L x D = 3 x 4
payload type recovery = 0
timestamp recovery = 4
length recovery = 1
payload recovery size = 5
missing = []
Testing that bad input values raise an exception:
>>> fec.set_missing(fec.snbase + fec.offset + 1)
Traceback (most recent call last):
    ...
ValueError: Unable to find a suitable j e N that satisfy : media_sequence = snbase + j * ...
>>> fec.set_recovered(-1)
Traceback (most recent call last):
    ...
ValueError: Unable to find a suitable j e N that satisfy : media_sequence = snbase + j * ...
Testing set / get of a unique missing value:
>>> fec.set_missing(0)
2
>>> fec.missing[0]
0
>>> len(fec.missing)
1
Testing simple recovery of a unique value:
>>> fec.set_recovered(0)
2
>>> len(fec.missing)
0
Testing set / get of multiple missing values (including re-setting of a value):
>>> fec.set_missing(3)
3
>>> fec.set_missing(3)
3
>>> len(fec.missing)
1
>>> fec.set_missing(fec.snbase + fec.offset)
1
>>> fec.set_missing(fec.snbase)
0
>>> len(fec.missing)
3
>>> fec.missing
[3, 65533, 65530]
Testing re-recovery of a value:
>>> fec.set_recovered(3)
3
>>> fec.set_recovered(3)
Traceback (most recent call last):
    ...
ValueError: list.remove(x): x not in list
>>> fec.missing
[65533, 65530]
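The values returned by set_missing and set_recovered in the examples above are consistent with the index j in media_sequence = snbase + j * offset, computed modulo 2^16 so that sequence numbers wrap from 65535 back to 0. The sketch below reproduces that mapping under those assumptions. media_index is a hypothetical helper, and the real method may validate its input differently.

```python
def media_index(media_sequence, snbase, offset, na):
    """Find j such that media_sequence = snbase + j * offset (mod 2**16), 0 <= j < na."""
    if not 0 <= media_sequence <= 0xFFFF:
        raise ValueError('media_sequence is outside the 16-bit sequence range')
    delta = (media_sequence - snbase) % 0x10000  # handles 65535 -> 0 wrap-around
    j, remainder = divmod(delta, offset)
    if remainder != 0 or j >= na:
        raise ValueError('no suitable j for media_sequence = snbase + j * offset')
    return j


# Header values from the example above: snbase = 65530, offset = 3, na = 4.
print([media_index(s, 65530, 3, 4) for s in (65530, 65533, 0, 3)])  # [0, 1, 2, 3]
```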