Source code for linuxnet.qos.qdiscs.prio

# Copyright (c) 2022, 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.

"""This module provides access to the PRIO queueing discipline
"""

from typing import List, Optional, Sequence, Tuple

from ..deps import get_logger
from ..exceptions import TcParsingError, TcError
from ..handle import Handle
from ..parsers import QClassParser, QDiscParser

from .qdisc import QDisc, QClass


# Module-level logger, obtained from get_logger() under this module's
# dotted name.
_logger = get_logger("linuxnet.qos.qdiscs.prio")

class PrioQClass(QClass):
    """A traffic class belonging to the ``prio`` queuing discipline
    (see :class:`PrioQDisc`).
    """

    def __str__(self):
        return f"PrioQClass({self.get_handle()})"

    def qclass_creation_args(self) -> List[str]:
        """Always raises :class:`TcError`.

        The traffic classes of the ``prio`` qdisc come into existence
        automatically when the qdisc itself is instantiated, and so do
        the corresponding :class:`PrioQClass` objects; there is never
        a need (or a way) to create one explicitly.

        :meta private:
        """
        raise TcError(
            'classes of the prio qdisc cannot be manually instantiated')

    def get_description(self) -> str:
        """Returns a string describing the class and its attributes
        """
        name = self.get_class_name()
        if name is not None:
            return f'{name}({self.get_handle()}) prio'
        # No class name available: use the plain string form.
        return str(self)

    @classmethod
    def parse(cls, qclass_output) -> 'PrioQClass':
        """Create a :class:`PrioQClass` from the output of **tc(8)**

        :meta private:
        """
        #
        # The tc output looks like this:
        #
        #   class prio 1:1 parent 1:
        #
        # Everything has been consumed by the caller.
        #
        handle = qclass_output.get_handle()
        parent_handle = qclass_output.get_parent_handle()
        return PrioQClass(handle, parent_handle)
class PrioQDisc(QDisc):
    """This class provides access to the Priority queueing discipline
    of Linux (see **tc-prio(8)**).
    """

    # Values used when the caller does not specify bands/priomap.
    DEFAULT_BANDS = 3
    DEFAULT_PRIOMAP = (1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1)

    def __init__(self, qdisc_handle: Handle,
                 parent_handle: Optional[Handle],
                 bands: Optional[int] = None,
                 priomap: Optional[Sequence[int]] = None):
        """
        :param qdisc_handle: handle of this :class:`PrioQDisc`
        :param parent_handle: handle of parent, ``None`` if this is a
            root queuing discipline
        :param bands: as documented in **tc-prio(8)**; defaults to
            :attr:`DEFAULT_BANDS` when ``None``
        :param priomap: as documented in **tc-prio(8)**; defaults to
            :attr:`DEFAULT_PRIOMAP` when ``None``
        :raises TcError: if ``bands`` is not positive, or if ``bands``
            and ``priomap`` are inconsistent

        The ``bands`` and ``priomap`` parameters must be consistent:

        * ``priomap`` must have 16 entries (number of Linux priorities)
        * value of each entry in ``priomap`` must be less than ``bands``
        """
        super().__init__(qdisc_handle, parent_handle)
        if bands is None:
            self.__bands = self.DEFAULT_BANDS
        else:
            if bands <= 0:
                raise TcError(f'invalid number of bands: {bands}')
            self.__bands = bands
        if priomap is not None:
            n_prio = len(self.DEFAULT_PRIOMAP)
            if len(priomap) != n_prio:
                raise TcError(f'priomap must have {n_prio} entries')
            for band in priomap:
                # pylint: disable=superfluous-parens
                if not (0 <= band < self.__bands):
                    raise TcError(f'invalid band number {band} in priomap')
            self.__priomap = tuple(priomap)
        else:
            # The default priomap requires at least 3 bands.
            if self.__bands < self.DEFAULT_BANDS:
                raise TcError(
                    f'need at least {self.DEFAULT_BANDS} bands '
                    'when using default priomap')
            self.__priomap = self.DEFAULT_PRIOMAP

    def __str__(self):
        return f"PrioQDisc({self.get_handle()})"

    def get_description(self) -> str:
        """Returns a string describing the queuing discipline and
        its attributes
        """
        priomap_str = ' '.join(str(i) for i in self.__priomap)
        return (super().get_description() +
                f' bands {self.__bands} priomap {priomap_str}')

    def get_bands(self) -> int:
        """Returns the number of bands
        """
        return self.__bands

    def get_priomap(self) -> Tuple[int, ...]:
        """Returns the priomap
        """
        return self.__priomap

    def qdisc_creation_args(self) -> List[str]:
        """Returns the arguments expected by the **tc(8)** command
        to create a prio qdisc
        """
        args = ['prio', 'bands', str(self.__bands), 'priomap']
        args.extend(str(val) for val in self.__priomap)
        return args

    def _instantiate_qdisc(self, config) -> None:
        """Instantiate this qdisc inside the kernel.
        """
        super()._instantiate_qdisc(config)
        #
        # Create the child PrioQClass objects since the corresponding
        # kernel traffic classes are created automatically.
        #
        qdisc_handle = self.get_handle()
        major = qdisc_handle.major
        qclass_map = config.get_qclass_map()
        # __bands is an integer count, so iterate over range(); band N
        # (0-based) is given the class handle <major>:<N+1>.
        for band in range(self.__bands):
            class_handle = Handle(major, band + 1)
            prioclass = PrioQClass(class_handle, qdisc_handle)
            prioclass._set_config(config)
            self._add_child_class(prioclass)
            qclass_map[class_handle] = prioclass
            _logger.info("%s: registered automatically created class %s",
                         self._instantiate_qdisc.__qualname__, prioclass)

    @classmethod
    def parse(cls, qdisc_output) -> 'PrioQDisc':
        """Create a :class:`PrioQDisc` object from the output of
        the **tc(8)** command.

        :raises TcParsingError: if the ``bands`` value is missing or
            not an integer, if a ``priomap`` entry is not an integer,
            or if an unknown field is encountered

        :meta private:
        """
        field_iter = qdisc_output.get_field_iter()
        #
        # The fields are generated from a split of a line like this:
        #
        # qdisc prio 2: root refcnt 2 bands 3 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
        #
        # The next field to be returned from field_iter is 'bands'
        #
        bands = None
        priomap = None
        for field in field_iter:
            if field == 'bands':
                try:
                    bands = int(next(field_iter))
                except StopIteration as stopiter:
                    # 'bands' was the last field; report it as a parsing
                    # problem instead of leaking StopIteration.
                    raise TcParsingError(
                        'missing number of bands') from stopiter
                except ValueError as valerr:
                    raise TcParsingError('bad number of bands') from valerr
            elif field == 'priomap':
                # The priomap consumes all remaining fields.
                try:
                    priomap = [int(v) for v in field_iter]
                except ValueError as valerr:
                    raise TcParsingError('bad priomap value') from valerr
            else:
                raise TcParsingError(f"unknown argument '{field}'")
        return PrioQDisc(qdisc_output.get_handle(),
                         qdisc_output.get_parent_handle(),
                         bands, priomap)
# Register both classes with their parsers under the 'prio' keyword
# (presumably the qdisc/class kind reported in tc(8) output -- confirm
# against the parsers module).
QDiscParser.register_qdisc('prio', PrioQDisc)
QClassParser.register_qclass('prio', PrioQClass)