Source code for pytoolbox.network.smpte2022.generator

from pytoolbox.network.rtp import RtpPacket
from .base import FecPacket

__all__ = ['FecGenerator']


[docs]class FecGenerator(object): # pylint:disable=too-many-instance-attributes """ A SMPTE 2022-1 FEC streams generator. This generator accept incoming RTP media packets and compute corresponding FEC packets. """ # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Properties >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @property def L(self): # pylint:disable=invalid-name """ Returns the Horizontal size of the FEC matrix (columns). **Example usage** >>> print(FecGenerator(4, 5).L) 4 """ return self._L @property def D(self): # pylint:disable=invalid-name """ Returns the vertical size of the FEC matrix (rows). **Example usage** >>> print(FecGenerator(4, 5).D) 5 """ return self._D # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Constructor >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
[docs] def __init__(self, L: int, D: int): # pylint:disable=invalid-name,too-many-instance-attributes """ Construct a FecGenerator. :param L: Horizontal size of the FEC matrix (columns) :param D: Vertical size of the FEC matrix (rows) """ self._L, self._D = L, D # pylint:disable=invalid-name self._col_sequence = self._row_sequence = 1 self._media_sequence = None self._medias = [] self._invalid = self._total = 0
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Functions >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
[docs] @staticmethod def on_new_col(col: FecPacket): """ Called by FecGenerator when a new column FEC packet is generated and available for output. By default this method only print a message to `stdout`. .. seealso:: You can `monkey patch <http://stackoverflow.com/questions/5626193>`_ it. :param col: Generated column FEC packet """ print( f'New COL FEC packet ' f'seq={col.sequence} ' f'snbase={col.snbase} ' f'LxD={col.L}x{col.D} ' f'trec={col.timestamp_recovery}')
[docs] @staticmethod def on_new_row(row: FecPacket): """ Called by FecGenerator when a new row FEC packet is generated and available for output. By default this method only print a message to `stdout`. .. seealso:: You can `monkey patch <http://stackoverflow.com/questions/5626193>`_ it. :param row: Generated row FEC packet """ print( f'New ROW FEC packet ' f'seq={row.sequence} ' f'snbase={row.snbase} ' f'LxD={row.L}x{row.D} ' f'trec={row.timestamp_recovery}')
[docs] def on_reset(self, media: RtpPacket): """ Called by FecGenerator when the algorithm is reseted (an incoming media is out of sequence). By default this method only print a message to `stdout`. .. seealso:: You can `monkey patch <http://stackoverflow.com/questions/5626193>`_ it. :param media: Out of sequence media packet """ print( f'Media seq={media.sequence} is out of sequence ' f'(expected {self._media_sequence}) : FEC algorithm reseted !')
[docs] def put_media(self, media: RtpPacket): """ Put an incoming media packet. :param media: Incoming media packet **Example usage** Testing input of out of sequence medias: >>> g = FecGenerator(4, 5) >>> R = RtpPacket >>> g.put_media(R.create(1, 100, R.MP2T_PT, bytearray('Tabby', 'utf-8'))) Media seq=1 is out of sequence (expected None) : FEC algorithm reseted ! >>> g.put_media(R.create(1, 100, R.MP2T_PT, bytearray('1234', 'utf-8'))) Media seq=1 is out of sequence (expected 2) : FEC algorithm reseted ! >>> g.put_media(R.create(4, 400, R.MP2T_PT, bytearray('abcd', 'utf-8'))) Media seq=4 is out of sequence (expected 2) : FEC algorithm reseted ! >>> g.put_media(R.create(2, 200, R.MP2T_PT, bytearray('python', 'utf-8'))) Media seq=2 is out of sequence (expected 5) : FEC algorithm reseted ! >>> g.put_media(R.create(2, 200, R.MP2T_PT, bytearray('Kuota Kharma Evo', 'utf-8'))) Media seq=2 is out of sequence (expected 3) : FEC algorithm reseted ! >>> print(g) Matrix size L x D = 4 x 5 Total invalid media packets = 0 Total media packets received = 5 Column sequence number = 1 Row sequence number = 1 Media sequence number = 3 Medias buffer (seq. numbers) = [2] >>> if isinstance(g._medias[0].payload, bytearray): ... assert g._medias[0].payload == bytearray('Kuota Kharma Evo', 'utf-8') ... else: ... assert g._medias[0].payload == 'Kuota Kharma Evo' Testing a complete 3x4 matrix: >>> g = FecGenerator(3, 4) >>> R = RtpPacket >>> g.put_media(R.create(1, 100, R.MP2T_PT, bytearray('Tabby', 'utf-8'))) Media seq=1 is out of sequence (expected None) : FEC algorithm reseted ! >>> g.put_media(R.create(2, 200, R.MP2T_PT, bytearray('1234', 'utf-8'))) >>> g.put_media(R.create(3, 300, R.MP2T_PT, bytearray('abcd', 'utf-8'))) New ROW FEC packet seq=1 snbase=1 LxD=3xNone trec=384 >>> g.put_media(R.create(4, 400, R.MP2T_PT, bytearray('python', 'utf-8'))) >>> g.put_media(R.create(5, 500, R.MP2T_PT, bytearray('Kuota harma Evo', 'utf-8'))) >>> g.put_media(R.create(6, 600, R.MP2T_PT, bytearray('h0ffman', 'utf-8'))) New ROW FEC packet seq=2 snbase=4 LxD=3xNone trec=572 >>> g.put_media(R.create(7, 700, R.MP2T_PT, bytearray('mutable', 'utf-8'))) >>> g.put_media(R.create(8, 800, R.MP2T_PT, bytearray('10061987', 'utf-8'))) >>> g.put_media(R.create(9, 900, R.MP2T_PT, bytearray('OSCIED', 'utf-8'))) New ROW FEC packet seq=3 snbase=7 LxD=3xNone trec=536 >>> g.put_media(R.create(10, 1000, R.MP2T_PT, bytearray('5ème élément', 'utf-8'))) New COL FEC packet seq=1 snbase=1 LxD=3x4 trec=160 >>> print(g) Matrix size L x D = 3 x 4 Total invalid media packets = 0 Total media packets received = 10 Column sequence number = 2 Row sequence number = 4 Media sequence number = 11 Medias buffer (seq. numbers) = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> g.put_media(R.create(11, 1100, R.MP2T_PT, bytearray('Chaos Theory', 'utf-8'))) New COL FEC packet seq=2 snbase=2 LxD=3x4 trec=1616 >>> g.put_media(R.create(12, 1200, R.MP2T_PT, bytearray('Yes, it WORKS !', 'utf-8'))) New ROW FEC packet seq=4 snbase=10 LxD=3xNone trec=788 New COL FEC packet seq=3 snbase=3 LxD=3x4 trec=1088 >>> print(g) Matrix size L x D = 3 x 4 Total invalid media packets = 0 Total media packets received = 12 Column sequence number = 4 Row sequence number = 5 Media sequence number = 13 Medias buffer (seq. numbers) = [] """ self._total += 1 if not media.valid: self._invalid += 1 return # Compute expected media sequence number for next packet sequence = (media.sequence + 1) & RtpPacket.S_MASK # Ensure that protected media packets are not out of sequence to generate valid FEC # packets. If media packet sequence number is not at attended value, it may mean : # - Looped VLC broadcast session restarted media # - Some media packet are really lost between the emitter and this software # - An unknown feature (aka bug) makes this beautiful tool crazy ! if self._media_sequence is not None and media.sequence == self._media_sequence: self._medias.append(media) else: self._medias = [media] self.on_reset(media) self._media_sequence = sequence # Compute a new row FEC packet when a new row just filled with packets if len(self._medias) % self._L == 0: row_medias = self._medias[-self._L:] assert len(row_medias) == self._L row = FecPacket.compute( self._row_sequence, FecPacket.XOR, FecPacket.ROW, self._L, self._D, row_medias) self._row_sequence = (self._row_sequence + 1) & RtpPacket.S_MASK self.on_new_row(row) # Compute a new column FEC packet when a new column just filled with packets if len(self._medias) > self._L * (self._D - 1): first = len(self._medias) - self._L * (self._D - 1) - 1 col_medias = self._medias[first::self._L] assert len(col_medias) == self._D col = FecPacket.compute( self._col_sequence, FecPacket.XOR, FecPacket.COL, self._L, self._D, col_medias) self._col_sequence = (self._col_sequence + 1) & RtpPacket.S_MASK self.on_new_col(col) if len(self._medias) == self._L * self._D: self._medias = []
def __str__(self): """ Returns a string containing a formated representation of the FEC streams generator. **Example usage** >>> print(FecGenerator(5, 6)) Matrix size L x D = 5 x 6 Total invalid media packets = 0 Total media packets received = 0 Column sequence number = 1 Row sequence number = 1 Media sequence number = None Medias buffer (seq. numbers) = [] """ medias = [p.sequence for p in self._medias] return f"""Matrix size L x D = {self._L} x {self._D} Total invalid media packets = {self._invalid} Total media packets received = {self._total} Column sequence number = {self._col_sequence} Row sequence number = {self._row_sequence} Media sequence number = {self._media_sequence} Medias buffer (seq. numbers) = {medias}"""