Source code for pytoolbox.network.rtp

import struct

__all__ = ['RtpPacket']


[docs]class RtpPacket(object): # pylint:disable=too-many-instance-attributes """ This represent a real-time transport protocol (RTP) packet. * :rfc:`3550` * `Wikipedia (RTP) <http://en.wikipedia.org/wiki/Real-time_Transport_Protocol>`_ * `Parameters (RTP) <http://www.iana.org/assignments/rtp-parameters/rtp-parameters.xml>`_ **Packet header** * RFC 3550 page 13 .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |V=2|P|X| CC |M| PT | sequence number | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | timestamp | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | synchronization source (SSRC) identifier | +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ | contributing source (CSRC) identifiers | | .... | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ **Extension header** * RFC 3550 page 19 .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | defined by profile | length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | header extension | | .... | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ """ # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Constants >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ER_VERSION = 'RTP Header : Version must be set to 2' ER_PADDING_LENGTH = 'RTP Header : Bad padding length' ER_EXTENSION_LENGTH = 'RTP Header : Bad extension length' ER_PAYLOAD = 'RTP packet must have a payload' HEADER_LENGTH = 12 V_MASK = 0xc0 V_SHIFT = 6 P_MASK = 0x20 X_MASK = 0x10 CC_MASK = 0x0f M_MASK = 0x80 PT_MASK = 0x7f DYNAMIC_PT = 96 # Dynamic payload type MP2T_PT = 33 # MPEG2 TS payload type MP2T_CLK = 90000 # MPEG2 TS clock rate [Hz] S_MASK = 0x0000ffff TS_MASK = 0xffffffff # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Properties >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @property def valid(self): """Returns True if this packet is a valid RTP packet.""" return len(self.errors) == 0 @property def validMP2T(self): # pylint:disable=invalid-name """Returns True if this packet is a valid RTP packet containing a MPEG2-TS payload.""" return self.valid and self.payload_type == self.MP2T_PT @property def errors(self): """ Returns an array containing any errors. :return: array of error message(s). **Example usage** Testing invalid header: >>> from pytoolbox.unittest import asserts >>> rtp = RtpPacket(bytearray(RtpPacket.HEADER_LENGTH-1), RtpPacket.HEADER_LENGTH-1) >>> asserts.list_equal(rtp.errors, [ ... 'RTP Header : Version must be set to 2', ... 'RTP packet must have a payload' ... ]) Testing a valid RTP packet with a MPEG2-TS payload: >>> rtp = RtpPacket.create(6, 777, RtpPacket.MP2T_PT, 'salut') >>> asserts.list_equal(rtp.errors, []) """ errors = [] if self._error: errors.append(self._error) if self.version != 2: errors.append(self.ER_VERSION) if self.payload_size == 0: errors.append(self.ER_PAYLOAD) return errors @property def clock_rate(self): """Return the MPEG2-TS clock rate of a MPEG2-TS payload or 1 if this is not.""" return self.MP2T_CLK if self.payload_type == self.MP2T_PT else 1 @property def header_size(self): """ Returns the length (aka size) of the header. **Example usage** >>> rtp = RtpPacket.create(6, 777, RtpPacket.MP2T_PT, 'salut') >>> print(rtp.header_size) 12 """ return self.HEADER_LENGTH + 4 * len(self.csrc) @property def payload_size(self): """ Returns the length (aka size) of the payload. **Example usage** >>> rtp = RtpPacket.create(6, 777, RtpPacket.MP2T_PT, 'salut') >>> print(rtp.payload_size) 5 """ return len(self.payload) if self.payload else 0 @property def time(self): """Return computed time (*timestamp / clock rate*).""" return self.timestamp / self.clock_rate @property def header_bytes(self): """ Return the RTP header bytes. *Example usage* >>> rtp = RtpPacket.create(6, 777, RtpPacket.MP2T_PT, bytearray.fromhex('00 01 02 03')) >>> print(rtp) version = 2 errors = [] padding = False extension = False marker = False payload type = 33 sequence = 6 timestamp = 777 clock rate = 90000 time = 0 ssrc = 0 csrc count = 0 payload size = 4 >>> header = rtp.header_bytes >>> assert len(header) == 12 >>> print(''.join(' %02x' % b for b in header)) 80 21 00 06 00 00 03 09 00 00 00 00 >>> header += rtp.payload >>> assert rtp == RtpPacket(header, len(header)) >>> rtp = RtpPacket.create(0xffffffff, 0xffffffffff, RtpPacket.DYNAMIC_PT, bytearray(1023)) >>> print(rtp) version = 2 errors = [] padding = False extension = False marker = False payload type = 96 sequence = 65535 timestamp = 4294967295 clock rate = 1 time = 4294967295 ssrc = 0 csrc count = 0 payload size = 1023 >>> header = rtp.header_bytes >>> assert len(header) == 12 >>> print(''.join(' %02x' % b for b in header)) 80 60 ff ff ff ff ff ff 00 00 00 00 >>> header += rtp.payload >>> assert rtp == RtpPacket(header, len(header)) """ cc = len(self.csrc) # pylint:disable=invalid-name header = bytearray(self.HEADER_LENGTH + 4 * cc) header[0] = ( ((self.version << self.V_SHIFT) & self.V_MASK) + (self.P_MASK if self.padding else 0) + (self.X_MASK if self.extension else 0) + (cc & self.CC_MASK)) header[1] = ( (self.M_MASK if self.marker else 0) + (self.payload_type & self.PT_MASK)) struct.pack_into(b'!H', header, 2, self.sequence) struct.pack_into(b'!I', header, 4, self.timestamp) struct.pack_into(b'!I', header, 8, self.ssrc) i = 12 for contributor in self.csrc: struct.pack_into(b'!I', header, i, contributor) i += 4 return header @property def bytes(self): """Return the RTP packet header and payload bytes.""" return self.header_bytes + self.payload # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Constructor >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
[docs] def __init__(self, data, length): """ This constructor will parse input bytes array to fill packet's fields. In case of error (e.g. bad version number) the constructor will abort filling fields and un-updated fields are set to their corresponding default value. :param bytes: Input array of bytes to parse as a RTP packet :type bytes: bytearray :param length: Amount of bytes to read from the array of bytes :type length: int **Example usage** Testing invalid headers: >>> rtp = RtpPacket(bytearray(RtpPacket.HEADER_LENGTH-1), RtpPacket.HEADER_LENGTH-1) >>> rtp.valid # Bad length False >>> rtp = RtpPacket(bytearray(RtpPacket.HEADER_LENGTH), RtpPacket.HEADER_LENGTH) >>> rtp.valid # Bad version False >>> bytes = bytearray(RtpPacket.HEADER_LENGTH) >>> bytes[0] = 0xa0 >>> rtp = RtpPacket(bytes, RtpPacket.HEADER_LENGTH) >>> rtp.valid # Padding enabled but not present False Testing header fields value: >>> bytes = bytes.fromhex('80 a1 a4 25 ca fe b5 04 b0 60 5e bb 12 34') >>> rtp = RtpPacket(bytes, len(bytes)) >>> rtp.valid True >>> print(rtp) version = 2 errors = [] padding = False extension = False marker = True payload type = 33 sequence = 42021 timestamp = 3405690116 clock rate = 90000 time = 37841 ssrc = 2959105723 csrc count = 0 payload size = 2 >>> rtp.csrc [] >>> rtp.payload[0] 18 >>> rtp.payload[1] 52 Testing header fields value (with padding, extension and ccrc): >>> bytes = bytes.fromhex('b5a1a401 cafea421 b0605ebb 11111111 22222222 33333333 ' ... '44444444 55555555 00000004 87654321 12340002') >>> rtp = RtpPacket(bytes, len(bytes)) >>> rtp.valid True >>> rtp.version 2 >>> rtp.padding True >>> rtp.extension True >>> rtp.marker True >>> rtp.payload_type 33 >>> rtp.sequence 41985 >>> rtp.timestamp 3405685793 >>> rtp.clock_rate 90000 >>> rtp.ssrc 2959105723 >>> len(rtp.csrc) 5 >>> rtp.csrc [286331153, 572662306, 858993459, 1145324612, 1431655765] >>> rtp.payload bytearray(b'\\x124') """ # Fields default values self.version = 0 self.padding = False self.extension = False self.marker = False self.payload_type = 0 self.sequence = 0 self.timestamp = 0 self.ssrc = 0 self.csrc = [] self.payload = [] self._error = None offset = self.HEADER_LENGTH if length < offset: return self.version = (data[0] & self.V_MASK) >> 6 if self.version != 2: return self.padding = (data[0] & self.P_MASK) == self.P_MASK if self.padding: # Remove padding if present padding_length = data[-1] if padding_length == 0 or length < (offset + padding_length): self._error = self.ER_PADDING_LENGTH return length -= padding_length self.extension = (data[0] & self.X_MASK) == self.X_MASK cc = data[0] & self.CC_MASK # pylint:disable=invalid-name self.csrc = [] # cc > 0 ? new long[cc] : null self.marker = (data[1] & self.M_MASK) == self.M_MASK self.payload_type = data[1] & self.PT_MASK self.sequence = data[2] * 256 + data[3] self.timestamp = ((data[4] * 256 + data[5]) * 256 + data[6]) * 256 + data[7] self.ssrc = ((data[8] * 256 + data[9]) * 256 + data[10]) * 256 + data[11] for _ in range(cc): self.csrc.append( ((data[offset] * 256 + data[offset + 1]) * 256 + data[offset + 2]) * 256 + data[offset + 3]) offset += 4 # FIXME In session.c of VLC they store per-source statistics in a rtp_source_t struct if self.extension: # Extension header (ignored for now) extension_length = data[offset + 2] * 256 + data[offset + 3] offset += 4 + extension_length if length < offset: self._error = self.ER_EXTENSION_LENGTH return # And finally ... The payload ! self.payload = data[offset:length]
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Functions >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> # public int compareTo(RtpPacket pPacket): # BEFORE = -1 # EQUAL = 0 # AFTER = 1 # # This optimization is usually worthwhile, and can always be added # if (this == pPacket) return EQUAL # if (this.sequence < pPacket.sequence) return BEFORE # if (this.sequence > pPacket.sequence) return AFTER # return EQUAL
[docs] @classmethod def create(cls, sequence, timestamp, payload_type, payload): """ Create a valid RTP packet with a given payload. :param sequence: Packet sequence number (16 bits) :type sequence: int :param timestamp: Packet timestamp (32 bits) :type timestamp: int :param payload_type: Packet payload type (7 bits) :type payload_type: int :param payload: Packet payload, can be an array of bytes or a string :type payload: bytearray **Example usage** >>> p = RtpPacket.create(10, 1024, RtpPacket.MP2T_PT, 'The payload string') >>> q = RtpPacket.create(11, 1028, RtpPacket.MP2T_PT, bytearray.fromhex('00 11 22 33')) >>> r = RtpPacket.create(11, 1028, RtpPacket.DYNAMIC_PT, bytearray.fromhex('cc aa ff ee')) >>> assert p.validMP2T and q.validMP2T and r.valid """ rtp = cls(None, 0) rtp.version = 2 rtp.padding = False rtp.extension = False rtp.marker = False rtp.payload_type = payload_type & cls.PT_MASK rtp.sequence = sequence & cls.S_MASK rtp.timestamp = timestamp & cls.TS_MASK rtp.ssrc = 0 rtp.csrc = [] rtp.payload = payload return rtp
def __eq__(self, other): """ Equality test. .. warning:: Test equality only on some fields (not all) ! """ return ( isinstance(other, self.__class__) and self.sequence == other.sequence and self.timestamp == other.timestamp and self.payload_type == other.payload_type and self.payload == other.payload) def __str__(self): """ Returns a string containing a formated representation of the packet fields. **Example usage** >>> rtp = RtpPacket.create(6, 777, RtpPacket.MP2T_PT, 'salut les loulous') >>> print(rtp) version = 2 errors = [] padding = False extension = False marker = False payload type = 33 sequence = 6 timestamp = 777 clock rate = 90000 time = 0 ssrc = 0 csrc count = 0 payload size = 17 """ return f"""version = {self.version} errors = {self.errors} padding = {self.padding} extension = {self.extension} marker = {self.marker} payload type = {self.payload_type} sequence = {self.sequence} timestamp = {self.timestamp} clock rate = {self.clock_rate} time = {self.time:.0f} ssrc = {self.ssrc} csrc count = {len(self.csrc)} payload size = {self.payload_size}"""