pytoolbox.network.smpte2022.generator module
- class pytoolbox.network.smpte2022.generator.FecGenerator(L: int, D: int)
Bases: object
A SMPTE 2022-1 FEC stream generator. It accepts incoming RTP media packets and computes the corresponding row and column FEC packets.
- property L
Returns the horizontal size of the FEC matrix (columns).
Example usage
>>> print(FecGenerator(4, 5).L)
4
- property D
Returns the vertical size of the FEC matrix (rows).
Example usage
>>> print(FecGenerator(4, 5).D)
5
- __init__(L: int, D: int)
Construct a FecGenerator.
- Parameters:
L – Horizontal size of the FEC matrix (columns)
D – Vertical size of the FEC matrix (rows)
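To make the roles of L and D concrete, here is a minimal sketch of how the media sequence numbers of one matrix can be grouped into rows and columns. The helper name fec_matrix_groups is hypothetical and not part of pytoolbox; the grouping itself follows the snbase values shown in the put_media examples, and the 16-bit wrap-around is assumed from RTP sequence numbering.

```python
def fec_matrix_groups(snbase, L, D):
    """Return (rows, cols) of media sequence numbers for one L x D matrix.

    Hypothetical helper for illustration only, not a pytoolbox function.
    """
    mask = 0xFFFF  # RTP sequence numbers are 16 bits wide and wrap around
    # A row protects L consecutive packets.
    rows = [[(snbase + r * L + c) & mask for c in range(L)] for r in range(D)]
    # A column protects D packets spaced L apart.
    cols = [[(snbase + r * L + c) & mask for r in range(D)] for c in range(L)]
    return rows, cols


rows, cols = fec_matrix_groups(1, L=3, D=4)
print(rows)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(cols)  # [[1, 4, 7, 10], [2, 5, 8, 11], [3, 6, 9, 12]]
```

With L=3 and D=4, each row FEC packet covers 3 media packets and each column FEC packet covers 4.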
- static on_new_col(col: FecPacket)
Called by FecGenerator when a new column FEC packet is generated and available for output.
By default this method only prints a message to stdout.
See also
You can monkey patch it.
- Parameters:
col – Generated column FEC packet
- static on_new_row(row: FecPacket)
Called by FecGenerator when a new row FEC packet is generated and available for output.
By default this method only prints a message to stdout.
See also
You can monkey patch it.
- Parameters:
row – Generated row FEC packet
- on_reset(media: RtpPacket)
Called by FecGenerator when the algorithm is reset (an incoming media packet is out of sequence).
By default this method only prints a message to stdout.
See also
You can monkey patch it.
- Parameters:
media – Out of sequence media packet
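As an illustration of the monkey patching mentioned for the three callbacks, the sketch below replaces them on a single instance so that events are collected instead of printed. StandInGenerator is a hypothetical stand-in that copies only the documented callback signatures; it is not the pytoolbox class, and it assumes the real generator invokes its hooks through the instance (for example self.on_new_row(row)), which this page does not confirm.

```python
class StandInGenerator:
    """Hypothetical stand-in exposing the same three hooks as documented."""

    @staticmethod
    def on_new_row(row):
        print(f'New ROW FEC packet {row}')

    @staticmethod
    def on_new_col(col):
        print(f'New COL FEC packet {col}')

    def on_reset(self, media):
        print(f'Reset caused by {media}')

    def emit(self):
        # Stands in for whatever internal code path triggers the hooks.
        self.on_reset('media-1')
        self.on_new_row('row-1')
        self.on_new_col('col-1')


events = []
g = StandInGenerator()
# Functions stored on the instance are not bound, so each takes one argument.
g.on_reset = lambda media: events.append(('reset', media))
g.on_new_row = lambda row: events.append(('row', row))
g.on_new_col = lambda col: events.append(('col', col))
g.emit()
print(events)  # [('reset', 'media-1'), ('row', 'row-1'), ('col', 'col-1')]
```

Patching the instance rather than the class keeps other generators on the default printing behaviour.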
- put_media(media: RtpPacket)
Put an incoming media packet.
- Parameters:
media – Incoming media packet
Example usage
Testing input of out-of-sequence media packets:
>>> g = FecGenerator(4, 5)
>>> R = RtpPacket
>>> g.put_media(R.create(1, 100, R.MP2T_PT, bytearray('Tabby', 'utf-8')))
Media seq=1 is out of sequence (expected None) : FEC algorithm reseted !
>>> g.put_media(R.create(1, 100, R.MP2T_PT, bytearray('1234', 'utf-8')))
Media seq=1 is out of sequence (expected 2) : FEC algorithm reseted !
>>> g.put_media(R.create(4, 400, R.MP2T_PT, bytearray('abcd', 'utf-8')))
Media seq=4 is out of sequence (expected 2) : FEC algorithm reseted !
>>> g.put_media(R.create(2, 200, R.MP2T_PT, bytearray('python', 'utf-8')))
Media seq=2 is out of sequence (expected 5) : FEC algorithm reseted !
>>> g.put_media(R.create(2, 200, R.MP2T_PT, bytearray('Kuota Kharma Evo', 'utf-8')))
Media seq=2 is out of sequence (expected 3) : FEC algorithm reseted !
>>> print(g)
Matrix size L x D = 4 x 5
Total invalid media packets = 0
Total media packets received = 5
Column sequence number = 1
Row sequence number = 1
Media sequence number = 3
Medias buffer (seq. numbers) = [2]
>>> if isinstance(g._medias[0].payload, bytearray):
...     assert g._medias[0].payload == bytearray('Kuota Kharma Evo', 'utf-8')
... else:
...     assert g._medias[0].payload == 'Kuota Kharma Evo'
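The reset behaviour exercised by this example can be modeled with a small sequence tracker. This is a sketch inferred only from the messages and the final state printed above; SequenceTracker is a hypothetical class, not the pytoolbox implementation, and the 16-bit wrap of the expected sequence number is an assumption based on RTP.

```python
class SequenceTracker:
    """Hypothetical model of the out-of-sequence check, for illustration."""

    def __init__(self):
        self.expected = None  # no packet seen yet
        self.buffer = []      # sequence numbers kept since the last reset
        self.resets = []      # (received, expected) pairs that caused a reset

    def put(self, seq):
        if seq != self.expected:
            # Out of sequence: drop buffered packets and start over.
            self.resets.append((seq, self.expected))
            self.buffer.clear()
        self.buffer.append(seq)
        self.expected = (seq + 1) & 0xFFFF  # assumed 16-bit wrap-around


t = SequenceTracker()
for seq in (1, 1, 4, 2, 2):
    t.put(seq)
print(t.resets)    # [(1, None), (1, 2), (4, 2), (2, 5), (2, 3)]
print(t.buffer)    # [2]
print(t.expected)  # 3
```

The printed pairs match the "expected" values in the messages above, and the final buffer and next sequence number match the printed generator state.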
Testing a complete 3x4 matrix:
>>> g = FecGenerator(3, 4)
>>> R = RtpPacket
>>> g.put_media(R.create(1, 100, R.MP2T_PT, bytearray('Tabby', 'utf-8')))
Media seq=1 is out of sequence (expected None) : FEC algorithm reseted !
>>> g.put_media(R.create(2, 200, R.MP2T_PT, bytearray('1234', 'utf-8')))
>>> g.put_media(R.create(3, 300, R.MP2T_PT, bytearray('abcd', 'utf-8')))
New ROW FEC packet seq=1 snbase=1 LxD=3xNone trec=384
>>> g.put_media(R.create(4, 400, R.MP2T_PT, bytearray('python', 'utf-8')))
>>> g.put_media(R.create(5, 500, R.MP2T_PT, bytearray('Kuota harma Evo', 'utf-8')))
>>> g.put_media(R.create(6, 600, R.MP2T_PT, bytearray('h0ffman', 'utf-8')))
New ROW FEC packet seq=2 snbase=4 LxD=3xNone trec=572
>>> g.put_media(R.create(7, 700, R.MP2T_PT, bytearray('mutable', 'utf-8')))
>>> g.put_media(R.create(8, 800, R.MP2T_PT, bytearray('10061987', 'utf-8')))
>>> g.put_media(R.create(9, 900, R.MP2T_PT, bytearray('OSCIED', 'utf-8')))
New ROW FEC packet seq=3 snbase=7 LxD=3xNone trec=536
>>> g.put_media(R.create(10, 1000, R.MP2T_PT, bytearray('5ème élément', 'utf-8')))
New COL FEC packet seq=1 snbase=1 LxD=3x4 trec=160
>>> print(g)
Matrix size L x D = 3 x 4
Total invalid media packets = 0
Total media packets received = 10
Column sequence number = 2
Row sequence number = 4
Media sequence number = 11
Medias buffer (seq. numbers) = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> g.put_media(R.create(11, 1100, R.MP2T_PT, bytearray('Chaos Theory', 'utf-8')))
New COL FEC packet seq=2 snbase=2 LxD=3x4 trec=1616
>>> g.put_media(R.create(12, 1200, R.MP2T_PT, bytearray('Yes, it WORKS !', 'utf-8')))
New ROW FEC packet seq=4 snbase=10 LxD=3xNone trec=788
New COL FEC packet seq=3 snbase=3 LxD=3x4 trec=1088
>>> print(g)
Matrix size L x D = 3 x 4
Total invalid media packets = 0
Total media packets received = 12
Column sequence number = 4
Row sequence number = 5
Media sequence number = 13
Medias buffer (seq. numbers) = []
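The trec values printed in the 3x4 example are consistent with XOR-ing the RTP timestamps of the media packets each FEC packet protects. The sketch below reproduces them with plain integers; it does not use pytoolbox, the helper name recovery is hypothetical, and it covers only the timestamp recovery field, not payload or length recovery.

```python
from functools import reduce
from operator import xor

L, D = 3, 4
# Timestamps used in the example: media seq n carries timestamp n * 100.
timestamps = {seq: seq * 100 for seq in range(1, L * D + 1)}


def recovery(seqs):
    """XOR the timestamps of the given media sequence numbers."""
    return reduce(xor, (timestamps[s] for s in seqs))


# Row FEC: L consecutive packets starting at snbase 1, 4, 7, 10.
row_trec = [recovery(range(base, base + L)) for base in range(1, L * D + 1, L)]
# Column FEC: D packets spaced L apart starting at snbase 1, 2, 3.
col_trec = [recovery(range(base, base + L * D, L)) for base in range(1, L + 1)]

print(row_trec)  # [384, 572, 536, 788]
print(col_trec)  # [160, 1616, 1088]
```

Because XOR is its own inverse, XOR-ing a trec value with the timestamps of the surviving packets in the same row or column yields the timestamp of a single lost packet.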