pytoolbox.network.smpte2022.base module

class pytoolbox.network.smpte2022.base.FecPacket(data=None, length=0)[source]

Bases: object

This represents a real-time transport protocol (RTP) packet carrying an SMPTE 2022-1 FEC payload.

Packet header

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|       SNBase low bits         |        Length recovery        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|E| PT recovery |                    Mask                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                          TS recovery                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|N|D|type |index|    Offset     |      NA       |SNBase ext bits|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

The constructor parses the input byte array to fill the packet’s fields. If an error is detected (e.g. a bad version number), the constructor stops filling fields, and any field not yet updated keeps its default value.

Parameters:
  • data (bytearray) – Input array of bytes to parse as an RTP packet with FEC payload

  • length (int) – Number of bytes to read from the array of bytes

Returns:

Generated RTP packet with SMPTE 2022-1 FEC payload (aka FEC packet)
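
As an illustration of the header layout shown above, here is a minimal sketch of how the 16-byte FEC header could be decoded with the standard struct module. The function parse_fec_header and its dictionary keys are hypothetical and are not part of pytoolbox; the masks mirror the class constants (E_MASK, PT_MASK, N_MASK, D_MASK, T_MASK, T_SHIFT, I_MASK, SNBE_SHIFT). The input bytes are the FEC part of the packet 3 header used in the first example of this class, with the 12-byte RTP header removed.

```python
import struct

# Masks mirroring the class constants documented for FecPacket.
E_MASK, PT_MASK = 0x80, 0x7F
N_MASK, D_MASK, T_MASK, T_SHIFT, I_MASK = 0x80, 0x40, 0x38, 3, 0x07
SNBE_SHIFT = 16


def parse_fec_header(data):
    """Decode a 16-byte FEC header (hypothetical helper, not the library API)."""
    if len(data) < 16:
        raise ValueError('FEC header needs at least 16 bytes')
    # Network byte order: 2 + 2 + 1 + 3 + 4 + 1 + 1 + 1 + 1 = 16 bytes.
    (snbase_low, length_recovery, e_pt, mask, ts_recovery,
     flags, offset, na, snbase_ext) = struct.unpack('!HHB3sIBBBB', bytes(data[:16]))
    return {
        'snbase': (snbase_ext << SNBE_SHIFT) | snbase_low,
        'length_recovery': length_recovery,
        'extended': bool(e_pt & E_MASK),
        'payload_type_recovery': e_pt & PT_MASK,
        'mask': int.from_bytes(mask, 'big'),
        'timestamp_recovery': ts_recovery,
        'n': bool(flags & N_MASK),
        'direction': (flags & D_MASK) >> 6,        # 0 = COL, 1 = ROW
        'algorithm': (flags & T_MASK) >> T_SHIFT,  # 0 = XOR
        'index': flags & I_MASK,
        'offset': offset,
        'na': na,
    }


# FEC header of packet 3 (the bytes that follow the 12-byte RTP header).
fields = parse_fec_header(bytearray.fromhex('c4700000 80000000 00001eaa 00060a00'))
print(fields['snbase'], fields['offset'], fields['na'], fields['timestamp_recovery'])
# 50288 6 10 7850
```

The decoded values agree with the fields printed for packet 3 in the example usage; the real constructor additionally validates the RTP header and records error messages, which this sketch does not attempt.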

Example usage

Testing header field values (based on packet 3 of capture DCM_FEC_2D_6_10.pcap):

  • 1st row: RTP header, sequence = 37 798

  • 2nd row: FEC header, SN = 50 288, PT recovery = 0, TS recovery = 7850

>>> header = bytearray.fromhex('806093a6 00000000 00000000 c4700000 80000000 00001eaa 00060a00')
>>> length = 1344 - len(header)
>>> print(length)
1316
>>> data = header + bytearray(length)
>>> print(len(data))
1344
>>> fec = FecPacket(data, len(data))
>>> assert fec.valid
>>> print(fec)
errors                = []
sequence              = 37798
algorithm             = XOR
direction             = COL
snbase                = 50288
offset                = 6
na                    = 10
L x D                 = 6 x 10
payload type recovery = 0
timestamp recovery    = 7850
length recovery       = 0
payload recovery size = 1316
missing               = []

Testing header field values (based on packet 5 of capture DCM_FEC_2D_6_10.pcap):

  • 1st row: RTP header, sequence = 63 004

  • 2nd row: FEC header, SN = 50 344, PT recovery = 0, TS recovery = 878

>>> header = bytearray.fromhex('8060f61c 00000000 00000000 c4a80000 80000000 0000036e 40010600')
>>> length = 1344 - len(header)
>>> print(length)
1316
>>> data = header + bytearray(length)
>>> print(len(data))
1344
>>> fec = FecPacket(data, len(data))
>>> assert fec.valid
>>> print(fec)
errors                = []
sequence              = 63004
algorithm             = XOR
direction             = ROW
snbase                = 50344
offset                = 1
na                    = 6
L x D                 = 6 x None
payload type recovery = 0
timestamp recovery    = 878
length recovery       = 0
payload recovery size = 1316
missing               = []
ER_PAYLOAD_TYPE = 'RTP Header : Payload type must be set to 96'
ER_EXTENDED = 'SMPTE 2022-1 Header : Extended must be set to one'
ER_MASK = 'SMPTE 2022-1 Header : Mask must be set to zero'
ER_N = 'SMPTE 2022-1 Header : N must be set to zero'
ER_DIRECTION = 'SMPTE 2022-1 Header : Direction must be COL or ROW'
ER_LD = 'SMPTE 2022-1 Header : The following limitation failed : L*D <= 256'
ER_L = 'SMPTE 2022-1 Header : The following limitation failed : 1 <= L <= 50'
ER_D = 'SMPTE 2022-1 Header : The following limitation failed : 4 <= D <= 50'
ER_PAYLOAD = 'FEC packet must have a payload'
ER_ALGORITHM = 'SMPTE 2022-1 Header : Only XOR FEC algorithm is handled'
ER_VALID_MP2T = 'One of the packets is an invalid RTP packet (+expected MPEG2-TS payload)'
ER_OFFSET = '(packets) Computed offset is out of range [1..255]'
ER_SEQUENCE = "One of the packets doesn't verify : sequence = snbase + i * offset, 0<i<na"
ER_INDEX = 'Unable to get missing media packet index'
ER_J = 'Unable to find a suitable j e N that satisfy : media_sequence = snbase + j * offset'
HEADER_LENGTH = 16
E_MASK = 128
PT_MASK = 127
N_MASK = 128
D_MASK = 64
T_MASK = 56
T_SHIFT = 3
I_MASK = 7
SNBL_MASK = 65535
SNBE_SHIFT = 16
DIRECTION_NAMES = ('COL', 'ROW')
DIRECTION_RANGE = range(0, 2)
COL = 0
ROW = 1
ALGORITHM_NAMES = ('XOR', 'Hamming', 'ReedSolomon')
ALGORITHM_RANGE = range(0, 3)
XOR = 0
Hamming = 1
ReedSolomon = 2
property valid

Returns True if this packet is a valid RTP packet (with FEC payload).

property errors

Returns an array containing any errors.

Returns:

array of error message(s).

Example usage

Testing invalid header:

#TODO >>> fec = FecPacket(bytearray(FecPacket.HEADER_LENGTH-1), FecPacket.HEADER_LENGTH-1)
#TODO >>> print(fec.errors)
#TODO ['RTP Header : Version must be set to 2', 'RTP packet must have a payload']

property D

Returns the vertical size of the FEC matrix (rows).

Example usage

>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(10, FecPacket.XOR, FecPacket.COL, 1, 2, packets)
>>> print(fec.D)
2
property L

Returns the horizontal size of the FEC matrix (columns).

Example usage

>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(6, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec.L)
2
property header_size

Returns the length (aka size) of the header.

Example usage

>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(1985, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec.header_size)
16
property payload_size

Returns the length (aka size) of the payload.

Example usage

>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...            RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))]
>>> fec = FecPacket.compute(27, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec.payload_size)
1234
property header_bytes

Returns SMPTE 2022-1 FEC header bytes.

Example usage

>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [
...     RtpPacket.create(10, 100, RtpPacket.MP2T_PT, bytearray(123)),
...     RtpPacket.create(11, 200, RtpPacket.MP2T_PT, bytearray(1234))
... ]
>>> fec = FecPacket.compute(26, FecPacket.XOR, FecPacket.ROW, 2, 1, packets)
>>> print(fec)
errors                = []
sequence              = 26
algorithm             = XOR
direction             = ROW
snbase                = 10
offset                = 1
na                    = 2
L x D                 = 2 x None
payload type recovery = 0
timestamp recovery    = 172
length recovery       = 1193
payload recovery size = 1234
missing               = []
>>> fec_header = fec.header_bytes
>>> len(fec_header)  # FecPacket.HEADER_LENGTH
16
>>> ''.join(' %02x' % b for b in fec_header)
' 00 0a 04 a9 80 00 00 00 00 00 00 ac 40 01 02 00'
>>> fec_header += fec.payload_recovery
>>> rtp = RtpPacket.create(26, 100, RtpPacket.DYNAMIC_PT, fec_header)
>>> header = rtp.header_bytes + fec_header
>>> fec == FecPacket(header, len(header))
True
property bytes
__init__(data=None, length=0)[source]
classmethod compute(sequence, algorithm, direction, L, D, packets)[source]

This method generates the FEC packet’s fields by applying the FEC algorithm to the input packets. If an error is detected (e.g. a bad version number), the method stops filling fields, and any field not yet updated keeps its default value.

Parameters:
  • sequence (int) – Sequence number of computed FEC packet

  • algorithm (int) – Algorithm used to compute the payload recovery from the packets’ payloads (e.g. FecPacket.XOR)

  • direction (int) – Direction of the computed FEC packet, FecPacket.COL (column) or FecPacket.ROW (row); see the RFC for details

  • L (int) – Horizontal size of the FEC matrix (columns)

  • D (int) – Vertical size of the FEC matrix (rows)

  • packets (array(RtpPacket)) – Array containing RTP packets to protect

Example usage

Testing invalid input collection of packets:

>>> from pytoolbox.network.rtp import RtpPacket
>>> packets = [
...     RtpPacket.create(10, 10, RtpPacket.MP2T_PT, 'a'),
...     RtpPacket.create(22, 22, RtpPacket.MP2T_PT, 'b')
... ]
>>> fec = FecPacket.compute(1, FecPacket.XOR, FecPacket.COL, 2, 2, packets)
Traceback (most recent call last):
    ...
ValueError: One of the packets doesn't verify : sequence = snbase + i * offset, 0<i<na
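
The error above follows from the rule quoted in its message: the protected packets must have sequence numbers of the form snbase + i * offset. The following is a minimal sketch of that check, not the library's code. It assumes 16-bit sequence numbers that wrap around, and that offset equals L for a column FEC packet (the two values coincide in the outputs shown in this documentation); protected_sequences and follows_rule are hypothetical helpers.

```python
def protected_sequences(snbase, offset, na):
    """Sequence numbers snbase + i * offset for i in [0, na), modulo 2**16."""
    return [(snbase + i * offset) & 0xFFFF for i in range(na)]


def follows_rule(sequences, offset):
    """True if the sequence numbers match the rule, starting at sequences[0]."""
    sequences = list(sequences)
    if not sequences:
        return False
    return sequences == protected_sequences(sequences[0], offset, len(sequences))


invalid = follows_rule([10, 22], offset=2)        # the invalid example (L = 2)
valid = follows_rule([10, 14, 18, 22], offset=4)  # the valid example (L = 4)
wrapped = protected_sequences(65530, 3, 4)        # wraps past 65535
print(invalid, valid, wrapped)
# False True [65530, 65533, 0, 3]
```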

Testing valid input collection of packets:

>>> packets = [
...     RtpPacket.create(10, 10, RtpPacket.MP2T_PT, bytearray('gaga', 'utf-8')),
...     RtpPacket.create(14, 14, RtpPacket.MP2T_PT, bytearray('salut', 'utf-8')),
...     RtpPacket.create(18, 18, RtpPacket.MP2T_PT, bytearray('12345', 'utf-8')),
...     RtpPacket.create(22, 22, RtpPacket.MP2T_PT, bytearray('robot', 'utf-8'))
... ]
>>> fec = FecPacket.compute(2, FecPacket.XOR, FecPacket.COL, 4, 4, packets)
>>> print(fec)
errors                = []
sequence              = 2
algorithm             = XOR
direction             = COL
snbase                = 10
offset                = 4
na                    = 4
L x D                 = 4 x 4
payload type recovery = 0
timestamp recovery    = 0
length recovery       = 1
payload recovery size = 5
missing               = []
>>> print(''.join('%02x:' % x for x in fec.payload_recovery))
57:5d:5a:4f:35:
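
The recovery values printed above can be reproduced by hand. The sketch below XORs the payload types, timestamps, payload lengths and payload bytes of the four packets, padding shorter payloads with zeros. It is an illustration under stated assumptions rather than the library's implementation: xor_recovery and the (payload_type, timestamp, payload) tuple shape are hypothetical, and the payload type 33 (the static RTP payload type for MPEG2-TS) is assumed to be what RtpPacket.MP2T_PT stands for.

```python
from functools import reduce
from operator import xor


def xor_recovery(packets):
    """XOR recovery fields for (payload_type, timestamp, payload) tuples."""
    packets = list(packets)
    # The recovery payload is as long as the longest protected payload.
    payload_recovery = bytearray(max(len(payload) for _, _, payload in packets))
    for _, _, payload in packets:
        for i, byte in enumerate(payload):
            payload_recovery[i] ^= byte
    return {
        'payload_type_recovery': reduce(xor, (pt for pt, _, _ in packets)),
        'timestamp_recovery': reduce(xor, (ts for _, ts, _ in packets)),
        'length_recovery': reduce(xor, (len(payload) for _, _, payload in packets)),
        'payload_recovery': payload_recovery,
    }


MP2T_PT = 33  # assumption: static RTP payload type for MPEG2-TS
recovery = xor_recovery([
    (MP2T_PT, 10, b'gaga'),
    (MP2T_PT, 14, b'salut'),
    (MP2T_PT, 18, b'12345'),
    (MP2T_PT, 22, b'robot'),
])
print(recovery['timestamp_recovery'], recovery['length_recovery'])
# 0 1
print(''.join('%02x:' % x for x in recovery['payload_recovery']))
# 57:5d:5a:4f:35:
```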

Testing FEC packet generation (based on source RTP packets):

>>> from os import urandom
>>> from random import randint
>>> from pytoolbox.network.rtp import RtpPacket
>>> L = 4
>>> D = 5
>>> OFF = 2
>>> # Generate a [D][L] matrix of randomly generated RTP packets
>>> matrix = [
...     [
...         RtpPacket.create(L * j + i, (L * j + i) * 100 + randint(0, 50),
...         RtpPacket.MP2T_PT, bytearray(urandom(randint(50, 100))))
...         for i in range(L)
...     ]
...     for j in range(D)
... ]
>>> len(matrix)  # D
5
>>> len(matrix[0])  # L
4

Retrieve the OFF’th column of the matrix:

>>> expected_payload_type_recovery = 0
>>> expected_timestamp_recovery = 0
>>> expected_length_recovery = 0
>>> expected_payload_recovery = bytearray(100)
>>> packets = []
>>> for i in range(D):
...     packet = matrix[i][OFF]
...     packets.append(packet)
...     # Compute expected recovery field values
...     expected_payload_type_recovery ^= packet.payload_type
...     expected_timestamp_recovery ^= packet.timestamp
...     expected_length_recovery ^= packet.payload_size
...     for j in range(packet.payload_size):
...         expected_payload_recovery[j] ^= packet.payload[j]
>>> fec = FecPacket.compute(15, FecPacket.XOR, FecPacket.COL, L, D, packets)
>>> fec.valid
True
>>> fec.snbase
2
>>> matrix[0][OFF].sequence
2
>>> fec.na  # D
5
>>> fec.offset  # L
4
>>> assert fec.payload_type_recovery == expected_payload_type_recovery
>>> assert fec.timestamp_recovery == expected_timestamp_recovery
>>> assert fec.length_recovery == expected_length_recovery
>>> for i in range(fec.payload_size):
...     if fec.payload_recovery[i] != expected_payload_recovery[i]:
...         print('Payload recovery test failed with i = %d' % i)
compute_j(media_sequence)[source]

TODO

set_missing(media_sequence)[source]

Register a protected media packet as missing.

Example usage

>>> packets = [
...     RtpPacket.create(65530, 65530, RtpPacket.MP2T_PT, bytearray('gaga', 'utf-8')),
...     RtpPacket.create(65533, 65533, RtpPacket.MP2T_PT, bytearray('salut', 'utf-8')),
...     RtpPacket.create(    0,     0, RtpPacket.MP2T_PT, bytearray('12345', 'utf-8')),
...     RtpPacket.create(    3,     3, RtpPacket.MP2T_PT, bytearray('robot', 'utf-8'))
... ]
>>> fec = FecPacket.compute(4, FecPacket.XOR, FecPacket.COL, 3, 4, packets)
>>> print(fec)
errors                = []
sequence              = 4
algorithm             = XOR
direction             = COL
snbase                = 65530
offset                = 3
na                    = 4
L x D                 = 3 x 4
payload type recovery = 0
timestamp recovery    = 4
length recovery       = 1
payload recovery size = 5
missing               = []

Testing that bad input values raise an exception:

>>> fec.set_missing(fec.snbase + fec.offset + 1)
Traceback (most recent call last):
    ...
ValueError: Unable to find a suitable j e N that satisfy : media_sequence = snbase + j * ...
>>> fec.set_recovered(-1)
Traceback (most recent call last):
    ...
ValueError: Unable to find a suitable j e N that satisfy : media_sequence = snbase + j * ...

Testing set / get of a unique missing value:

>>> fec.set_missing(0)
2
>>> fec.missing[0]
0
>>> len(fec.missing)
1

Testing simple recovery of a unique value:

>>> fec.set_recovered(0)
2
>>> len(fec.missing)
0

Testing set / get of multiple missing values (including re-setting of a value):

>>> fec.set_missing(3)
3
>>> fec.set_missing(3)
3
>>> len(fec.missing)
1
>>> fec.set_missing(fec.snbase + fec.offset)
1
>>> fec.set_missing(fec.snbase)
0
>>> len(fec.missing)
3
>>> fec.missing
[3, 65533, 65530]

Testing re-recovery of a value:

>>> fec.set_recovered(3)
3
>>> fec.set_recovered(3)
Traceback (most recent call last):
    ...
ValueError: list.remove(x): x not in list
>>> fec.missing
[65533, 65530]
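
The values returned by set_missing in the example above are consistent with solving media_sequence = snbase + j * offset for j, with 16-bit wrap-around of sequence numbers. The following is a minimal sketch of that computation, written only from the documented error message and the outputs shown here. find_j is a hypothetical helper, not the library's compute_j, and it assumes offset >= 1.

```python
def find_j(media_sequence, snbase, offset, na):
    """Return j such that media_sequence == (snbase + j * offset) mod 2**16.

    Hypothetical helper: raises ValueError when no j in [0, na) fits.
    """
    delta = (media_sequence - snbase) & 0xFFFF  # distance from snbase, 16-bit wrap
    j, remainder = divmod(delta, offset)
    if remainder != 0 or j >= na:
        raise ValueError('no suitable j for media sequence %d' % media_sequence)
    return j


# Values from the set_missing example: snbase = 65530, offset = 3, na = 4.
indexes = [find_j(seq, 65530, 3, 4) for seq in (65530, 65533, 0, 3)]
print(indexes)
# [0, 1, 2, 3]
```

With the same parameters, the two rejected inputs from the example (snbase + offset + 1 = 65534, and -1) leave a non-zero remainder, so this sketch raises ValueError for them as well.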
set_recovered(media_sequence)[source]

TODO