# Source code for linuxnet.qos.parsers

# Copyright (c) 2022, 2023, 2025, Panagiotis Tsirigotis

# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module contains parsers to create Python objects from the output
of the **tc(8)** command.
"""

# pylint: disable=too-many-lines

from collections import deque
from typing import Any, Callable, Iterator, List, Optional

from .deps import get_logger
from .exceptions import TcError, TcParsingError
from .handle import Handle


_logger = get_logger("linuxnet.qos.parsers")


class LookaheadIterator:
    """A LookaheadIterator is an iterator that provides the ability to
    put back previously returned tokens.

    Conceptual view of the LookaheadIterator::

                               deque
       +---------------+  +---+---+---+---+---+
       | back-iterator |  | T | T | T |...| T |
       +---------------+  +---+---+---+---+---+
                                ^
                                |
                              Cursor

    * Tokens to the right of the cursor have been consumed.
    * Tokens up to, but not including, the cursor are previously consumed
      tokens that have been put back.
    * New tokens are obtained from the back-iterator.
    * The value of the cursor indicates the number of put-back tokens.
    * The maximum size of the deque is equal to the lookahead.

    The most recently obtained token is stored at index 0 of the deque;
    once the deque is full, the oldest token is silently dropped and can
    no longer be put back.
    """
    def __init__(self, iterable, lookahead: int):
        """
        :param iterable: an iterable object from which we create
            the back-iterator
        :param lookahead: number of tokens of look ahead; must be positive

        Raises a :exc:`ValueError` if ``lookahead`` is not positive.
        """
        self.__iter = iter(iterable)
        if lookahead <= 0:
            raise ValueError(f'bad lookahead value {lookahead}')
        self.__tokens = deque(maxlen=lookahead)
        self.__cursor = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.__cursor == 0:
            # Nothing has been put back: fetch a fresh token and remember
            # it so that it can be put back later.
            token = next(self.__iter)
            self.__tokens.appendleft(token)
        else:
            # Replay the most recently put-back token.
            self.__cursor -= 1
            token = self.__tokens[self.__cursor]
        return token

    def peek(self) -> Optional[Any]:
        """Returns the next token, but does not consume it.
        The value ``None`` is returned if there are no more tokens.
        This can be ambiguous if ``None`` is a possible value held
        in the underlying iterable object.

        The implementation consumes a token via :meth:`__next__` and
        returns it via :meth:`put_back`, so any overrides of these
        methods in a subclass are invoked.
        """
        try:
            token = self.__next__()
            self.put_back(token)
            return token
        except StopIteration:
            return None

    def put_back(self, token: Any) -> None:
        """Put back the specified token. This must be a token previously
        returned by the iterator (identity is checked, not equality)

        A :exc:`ValueError` will be raised if there is no consumed token
        left to put back, or if ``token`` is not the expected one.
        """
        if self.__cursor == len(self.__tokens):
            # Either there are no consumed tokens, or this is an attempt to
            # put back one more token than those already consumed.
            raise ValueError('not a consumed token')
        if token is not self.__tokens[self.__cursor]:
            raise ValueError('wrong token')
        self.__cursor += 1

    def rewind(self, step=1) -> 'LookaheadIterator':
        """Put back last ``step`` tokens

        A :exc:`ValueError` will be raised if there are not enough
        tokens to put back, or if ``step`` is negative.
        """
        if step < 0:
            # A negative step would move the cursor below zero and make
            # subsequent calls to __next__ index the deque incorrectly.
            raise ValueError(f'unable to rewind {step} token(s)')
        avail = len(self.__tokens) - self.__cursor
        if step > avail:
            raise ValueError(f'unable to rewind {step} token(s)')
        self.__cursor += step
        return self

    def disable(self) -> None:
        """The iterator is disabled by
            1) exhausting the underlying iterator
            2) clearing any stored tokens
        """
        self.__iter = iter([])
        self.__tokens.clear()
        self.__cursor = 0


class LineGroupIterator(LookaheadIterator):
    """Line-oriented iterator with a one-line lookahead, used to parse
    the output of ``tc filter ls``.

    Lines are handed out one at a time; a line that turns out not to
    belong to the filter-specific parsing code that received it can be
    returned to the iterator. The fields (whitespace-separated words)
    of the most recently returned line can be scanned via a secondary
    field iterator.
    """
    def __init__(self, tc_output: List[str]):
        super().__init__(tc_output, lookahead=1)
        # Field iterator for the current line; created on demand
        self.__field_iter = None
        # Line most recently returned by __next__ (None if put back)
        self.__current_line = None

    def __iter__(self):
        return self

    def __next__(self) -> str:
        line = super().__next__()
        # A new line invalidates the field iterator of the previous one
        self.__current_line = line
        self.__field_iter = None
        return line

    def next_field(self) -> str:
        """Returns the next field of the current line;
        :exc:`StopIteration` is raised when the fields are exhausted
        """
        fields = self.get_field_iter()
        return next(fields)

    def get_field_iter(self) -> Iterator[str]:
        """Returns the iterator over the fields of the current line;
        it is created upon the first call for that line and reused
        afterwards.

        Raises a :exc:`TcError` if there is no current line.
        """
        line = self.__current_line
        if line is None:
            raise TcError("attempt to access next field before line iteration")
        field_iter = self.__field_iter
        if field_iter is None:
            field_iter = LookaheadIterator(line.split(), 1)
            self.__field_iter = field_iter
        return field_iter

    def clear_field_iter(self):
        """Disable and forget the field iterator (if any), so that the
        next call to :meth:`get_field_iter` creates a new one that scans
        the line from the beginning.
        """
        stale_iter = self.__field_iter
        if stale_iter is None:
            return
        stale_iter.disable()
        self.__field_iter = None

    def get_last_line(self) -> Optional[str]:
        """Returns the line most recently returned by :meth:`__next__`
        (``None`` if there is no such line, or if it has been put back)
        """
        return self.__current_line

    def put_back(self, token: str) -> None:
        """Return the line ``token`` to the iterator.
        As a result there is no current line anymore, and the field
        iterator is cleared.
        """
        super().put_back(token)
        self.clear_field_iter()
        self.__current_line = None

    def rewind(self, step=1) -> 'LineGroupIterator':
        """Put back last ``step`` lines

        A :exc:`ValueError` will be raised if there are not enough
        lines to put back.
        """
        super().rewind(step)
        self.clear_field_iter()
        self.__current_line = None
        return self


class FilterOutputLine:
    """Holder for a single line of filter output.

    Iterating over an instance yields the fields of the line::

        def parse_fields(fline: FilterOutputLine):
            for field in fline:
                if field == 'xxx':
                    ...

    Applying the :func:`str` builtin function to an instance returns
    the entire line.
    """

    def __init__(self, line: str, fields: List[str]):
        """
        :param line: the complete **tc(8)** filter line
        :param fields: list of fields of the line **after** the filter type
        """
        self.__fields = fields
        self.__line = line

    def __str__(self):
        # The complete, unparsed line
        return self.__line

    def __iter__(self):
        # A fresh iterator over the fields for every call
        return iter(self.__fields)
class FilterOutput:
    """The **tc(8)** output for a single filter.

    The output consists of the filter description lines, followed by
    action description lines. The lines are grouped, with groups
    separated by empty lines. The first line group contains the filter
    lines. Each successive line group describes an action.
    """

    def __init__(self, proto: str, prio: int, filter_type: str,
                 owner: 'QNode'):
        """
        :param proto: filter protocol
        :param prio: filter priority
        :param filter_type: filter type
        :param owner: :class:`QDisc`/:class:`QClass` that owns the filter
        """
        self.__owner = owner
        self.__proto = proto
        self.__prio = prio
        self.__filter_type = filter_type
        # Lines that start with the word 'filter' are kept (as
        # FilterOutputLine objects) in __filter_lines; all other lines
        # are kept (as strings) in __nonfilter_lines.
        self.__filter_lines = []
        self.__nonfilter_lines = []
        # Created lazily by nonfilter_lines_iter()
        self.__nonfilter_lines_iter = None
        # Set via action_in_filter_line()
        self.__action_fields = None

    def matches(self, proto: str, prio: int, filter_type: str) -> bool:
        """Returns ``True`` if the ``proto``, ``prio``, ``filter_type``
        parameters match with the corresponding attributes of this object.

        :meta private:
        """
        own_key = (self.__proto, self.__prio, self.__filter_type)
        return own_key == (proto, prio, filter_type)

    def get_prio(self) -> int:
        """Returns the priority value
        """
        return self.__prio

    def get_proto(self) -> str:
        """Returns the protocol value
        """
        return self.__proto

    def get_filter_type(self) -> str:
        """Returns the filter type
        """
        return self.__filter_type

    def get_filter_owner(self) -> 'QNode':
        """Returns the :class:`QDisc`/:class:`QClass` that owns the filter
        """
        return self.__owner

    def get_first_line(self) -> str:
        """Returns the first line from the **tc(8)** output for this filter
        """
        first = self.__filter_lines[0]
        return str(first)

    def has_nonfilter_lines(self) -> bool:
        """Returns ``True`` if the filter output has any lines not
        starting with the word ``filter``
        """
        return bool(self.__nonfilter_lines)

    def has_actions(self) -> bool:
        """Returns ``True`` if the filter has any actions.
        We assume that any remaining non-filter lines are action lines.

        :meta private:
        """
        if self.__action_fields is not None:
            # An action was found in a 'filter' line
            return True
        if not self.has_nonfilter_lines():
            return False
        # Are there any non-filter lines that have not been consumed yet?
        return self.nonfilter_lines_iter().peek() is not None

    def get_action_fields(self) -> Optional[List[str]]:
        """Returns action-related fields that were in the 'filter' line,
        if any.

        :meta private:
        """
        return self.__action_fields

    def add_filter_line(self, line: str, fields: List[str]) -> None:
        """Add a line that starts with ``filter ``

        :meta private:
        """
        filter_line = FilterOutputLine(line, fields)
        self.__filter_lines.append(filter_line)

    def add_line(self, line: str) -> None:
        """Add a non-filter prefixed line

        :meta private:
        """
        self.__nonfilter_lines.append(line)

    def action_in_filter_line(self, action_fields: List[str]) -> None:
        """
        This method can be used by a filter parser to indicate that
        an action is part of a filter line (happens with the ``fw`` filter).
        The rest of the lines, if any, are assumed to be action-related
        lines.
        """
        self.__action_fields = action_fields

    def filter_lines_iter(self) -> Iterator[FilterOutputLine]:
        """Returns an iterator that returns :class:`FilterOutputLine`
        instances.
        """
        return iter(self.__filter_lines)

    def nonfilter_lines_iter(self) -> LineGroupIterator:
        """Returns an iterator that returns lines (strings).
        The same iterator instance is always returned.
        """
        line_iter = self.__nonfilter_lines_iter
        if line_iter is None:
            line_iter = LineGroupIterator(self.__nonfilter_lines)
            self.__nonfilter_lines_iter = line_iter
        return line_iter

    def dump(self, path) -> None:
        """Append the contents of this object to the file at path.
        This is used for debugging.

        :meta private:
        """
        with open(path, "a", encoding='utf-8') as outf:
            # Filter lines first, then the rest, then a separator
            for filter_line in self.__filter_lines:
                outf.write(str(filter_line) + '\n')
            for other_line in self.__nonfilter_lines:
                outf.write(other_line + '\n')
            outf.write("------------------\n")
def field_advance(producer: Callable[[], str], expected_field: str,
                  has_value=True) -> Optional[str]:
    """Consume the field named ``expected_field`` and, optionally,
    the value that follows it.

    :param producer: callable returning the next field upon every
        invocation; it raises :exc:`StopIteration` when there are no
        more fields
    :param expected_field: the field that must come next
    :param has_value: if ``True``, the field after ``expected_field``
        is consumed and returned as its value
    :rtype: the value of the field, or ``None`` if ``has_value``
        is ``False``

    Raises a :exc:`TcParsingError` if the next field is not
    ``expected_field``, or if the producer runs out of fields.
    """
    found = None
    try:
        found = producer()
        if found == expected_field:
            if has_value:
                return producer()
            return None
        _logger.error("%s: expected '%s', found '%s'",
                      field_advance.__qualname__, expected_field, found)
        raise TcParsingError(f"expecting '{expected_field}'")
    except StopIteration as stopit:
        # Nothing obtained yet means that the field itself is missing;
        # otherwise it is the value of the field that is missing.
        if found is not None:
            raise TcParsingError(
                        f'no value for {expected_field}') from stopit
        raise TcParsingError(f'missing {expected_field}') from stopit
class TrafficFilterParser:
    """Helper class that creates :class:`TrafficFilter` objects from the
    output of the **tc(8)** command.
    """

    #
    # Key: the tuple (filter_type, protocol) - both strings
    # Value: a TrafficFilter subclass
    #
    _filter_class_map = {}

    #
    # Key: the action name (a string)
    # Value: a TrafficAction subclass
    #
    _action_class_map = {}

    def __init__(self, allow_parsing_errors: bool):
        """
        :param allow_parsing_errors: if ``True``, parsing errors are
            counted and logged instead of being propagated to the caller
        """
        self.__allow_parsing_errors = allow_parsing_errors
        self.__parsing_errors = 0
        self.__filter_list = []
        # __iter is a LineGroupIterator
        self.__iter: LineGroupIterator = None

    @classmethod
    def register_filter(cls, *, filter_type: str, protocol: str,
                        klass) -> None:
        """Register the given class (which should be a subclass of the
        :class:`TrafficFilter` class).

        This method is intended to be used for adding support for
        new traffic filter types.

        :param filter_type: a **tc(8)** filter type, e.g. ``u32``
        :param protocol: a **tc(8)** protocol name, e.g. ``ip``
        :param klass: the Python class for this ``(filter_type, protocol)``
        """
        cls._filter_class_map[(filter_type, protocol)] = klass

    @classmethod
    def register_action(cls, *, action_name: str, klass) -> None:
        """Register the given class (which should be a subclass of the
        :class:`TrafficAction` class).

        This method is intended to be used for adding support for
        new traffic actions.

        :param action_name: **tc(8)** action, e.g. ``police``
        :param klass: the Python class for this action
        """
        cls._action_class_map[action_name] = klass

    @classmethod
    def __parse_action(cls, fields: List[str],
                       nfl_iter: LineGroupIterator) -> 'TrafficAction':
        """Parse the **tc(8)** output for a traffic action.

        :param fields: fields of the output line identifying the
            action and its arguments; ``fields[0]`` is the action type
        :param nfl_iter: iterator returning the lines after the lines
            that start with 'filter'
        :rtype: a :class:`TrafficAction` instance

        Raises a :exc:`TcParsingError` if unable to parse the action

        :meta private:
        """
        if not fields:
            raise TcParsingError("missing action name")
        action_name = fields[0]
        klass = cls._action_class_map.get(action_name)
        if klass is None:
            raise TcParsingError(f"unknown action {action_name}")
        return klass.parse(fields[1:], nfl_iter)

    def __parse_actions(self,       # pylint: disable=too-many-branches
                        filt_output: FilterOutput,
                        traffic_filter: 'TrafficFilter') -> None:
        """Parse the actions in the FilterOutput and add them to
        ``traffic_filter``.

        Raises a :exc:`TcParsingError` if an action cannot be parsed,
        unless parsing errors are allowed, in which case the error
        is counted and logged.
        """
        nfl_iter: LineGroupIterator = filt_output.nonfilter_lines_iter()
        #
        # First check for an action that was part of a 'filter' line
        #
        action_fields = filt_output.get_action_fields()
        if action_fields is not None:
            action = self.__parse_action(action_fields, nfl_iter)
            if action is None:
                raise TcParsingError(
                        f'failed to parse action: {action_fields[0]}',
                        line=nfl_iter.get_last_line())
            action.parse_common_lines(nfl_iter)
            traffic_filter.add_action(action)
        for nonfilter_line in nfl_iter:
            action = None
            try:
                line = nonfilter_line.strip()
                fields = line.split()
                if not fields:
                    # Blank separator line; nothing to parse
                    continue
                if fields[0] == 'action':
                    #
                    # This is a new-style action; the line format is:
                    #   action order 1:  police 0x1 rate 20Kbit ...
                    #
                    # The length must be checked first, otherwise
                    # a truncated line results in an IndexError.
                    #
                    if len(fields) < 4 or fields[1] != 'order':
                        raise TcParsingError(
                                "unable to parse filter action line",
                                line=nonfilter_line)
                    action = self.__parse_action(fields[3:], nfl_iter)
                elif fields[0] == 'police':
                    #
                    # This is an old-style police action; the line format is:
                    #   police 0x2 rate 40Kbit ...
                    #
                    action = self.__parse_action(fields, nfl_iter)
                elif fields[0] == 'Sent':
                    #
                    # This is encountered when we have an old-style police
                    # action with stats. The output is:
                    #   police 0x2 rate 40Kbit ...
                    #   <empty-line>
                    #   Sent 0 bytes 0 pkts (dropped 0, overlimits 0)
                    #
                    action_list = traffic_filter.get_actions()
                    if not action_list:
                        raise TcParsingError(
                            f'unexpected field in non-filter line: {fields[0]}',
                            line=nonfilter_line)
                    if not action_list[-1].parse_single_stats_line(line):
                        _logger.warning("Unable to parse line: %s",
                                        nonfilter_line)
                else:
                    raise TcParsingError(
                        f'unknown field in non-filter line: {fields[0]}',
                        line=nonfilter_line)
                if action is not None:
                    action.parse_common_lines(nfl_iter)
                    traffic_filter.add_action(action)
                else:
                    # Skip up to and including the next empty line
                    while True:
                        line = next(nfl_iter, None)
                        if not line:
                            break
            except TcParsingError as parserr:
                self.__parsing_errors += 1
                if not self.__allow_parsing_errors:
                    raise
                _logger.warning("action parsing error, '%s'", parserr)

    def __process_filter(self, filt_output: FilterOutput) -> None:
        """Try to create a new :class:`TrafficFilter` from ``filt_output``.
        The new filter will be added to self.__filter_list.

        Raises a :exc:`TcParsingError` if the filter protocol or
        the filter type is not supported.
        """
        #
        # Currently we only support 'ip' filters
        #
        line = filt_output.get_first_line()
        protocol = filt_output.get_proto()
        if protocol != 'ip':
            _logger.error("found protocol '%s', expected 'ip' (owner=%s)",
                          protocol, filt_output.get_filter_owner())
            raise TcParsingError(
                    f"unable to handle protocol '{protocol}'", line=line)
        filter_type = filt_output.get_filter_type()
        klass = self._filter_class_map.get((filter_type, protocol))
        if klass is None:
            _logger.error("unable to handle filter type '%s' (owner=%s)",
                          filter_type, filt_output.get_filter_owner())
            raise TcParsingError(
                    f"unable to handle filter type '{filter_type}'",
                    line=line)
        traffic_filter = klass.parse(filt_output)
        if filt_output.has_actions():
            self.__parse_actions(filt_output, traffic_filter)
        if traffic_filter.get_dest_handle() is None:
            _logger.warning("filter %s has no dest handle (owner=%s)",
                            traffic_filter, filt_output.get_filter_owner())
        traffic_filter._mark_as_instantiated()
        self.__filter_list.append(traffic_filter)

    def __finish_filter(self, filt_output: FilterOutput,
                        owner: 'QNode') -> None:
        """Invoke :meth:`__process_filter` for a filter with complete
        output. A parsing error is counted, tagged with the first line
        of the failed filter, and re-raised unless parsing errors
        are allowed.
        """
        try:
            self.__process_filter(filt_output)
        except TcParsingError as parserr:
            self.__parsing_errors += 1
            parserr.set_line(filt_output.get_first_line())
            if not self.__allow_parsing_errors:
                raise
            _logger.warning("allowing filter parsing error: %s (owner=%s)",
                            parserr, owner)

    def __advance(self, field_name: str, has_value=True) -> Optional[str]:
        """We are expecting the next field to be ``field_name`` optionally
        followed by its value. We return that value.
        """
        # Note that self.__iter.next_field is a callable
        return field_advance(self.__iter.next_field, field_name, has_value)

    def __parse_filter_header(self, line: str, owner: 'QNode'):
        """Parse the beginning of the current filter line, which is
        expected to look like this::

            filter protocol <val> pref <int> <type>

        The fields following the filter type are left unconsumed
        in the field iterator of the current line.

        :param line: the filter line (used for logging)
        :param owner: owner of the filter (used for logging)
        :rtype: the tuple ``(protocol, prio, filter_type)``

        Raises a :exc:`TcParsingError` if the line is malformed.
        """
        _ = self.__advance('filter', has_value=False)
        protocol = self.__advance('protocol')
        priostr = self.__advance('pref')
        try:
            prio = int(priostr)
        except ValueError as valerr:
            _logger.error("bad filter priority: %s (owner=%s)",
                          priostr, owner)
            raise TcParsingError(
                        f'bad filter priority: {priostr}') from valerr
        try:
            filter_type = self.__iter.next_field()
        except StopIteration as stopit:
            _logger.error(
                "filter line without filter type: '%s' (owner=%s)",
                line, owner)
            raise TcParsingError("missing filter type") from stopit
        return protocol, prio, filter_type

    def parse_output(self, tc_output_lines: List[str],
                     owner: 'QNode') -> None:
        """Parse the **tc(8)** output in ``tc_output_lines`` into
        a list of :class:`TrafficFilter` objects; the list can be
        accessed via the :meth:`get_filter_list` method.

        :param tc_output_lines: lines of **tc(8)** filter output
        :param owner: :class:`QDisc`/:class:`QClass` that owns the filters

        Raises a :exc:`TcParsingError` if the output cannot be parsed
        and parsing errors are not allowed.

        :meta private:
        """
        self.__filter_list = []
        self.__iter = LineGroupIterator(tc_output_lines)
        #
        # Process lines into FilterOutput objects.
        # Each FilterOutput object has the lines of one filter.
        # Once all lines of a filter are seen, invoke the
        # filter's parse method to create the TrafficFilter object.
        #
        filt_output = None
        # True while discarding the non-filter lines that follow
        # a malformed (and tolerated) filter line
        skip_nonfilter_lines = False
        for line in self.__iter:
            if not line.startswith('filter '):
                if skip_nonfilter_lines:
                    continue
                if filt_output is None:
                    _logger.error("unexpected filter line: '%s' (owner=%s)",
                                  line, owner)
                    raise TcParsingError('unexpected filter line', line=line)
                filt_output.add_line(line)
                continue
            skip_nonfilter_lines = False
            #
            # We expect a filter line to look like this:
            #   filter protocol <val> pref <int> <type>
            #
            try:
                protocol, prio, filter_type = self.__parse_filter_header(
                                                                line, owner)
            except TcParsingError as parserr:
                self.__parsing_errors += 1
                parserr.set_line(line)
                if not self.__allow_parsing_errors:
                    raise
                _logger.warning("allowing filter parsing error: %s (owner=%s)",
                                parserr, owner)
                # There is no usable protocol/priority/type for this line,
                # so it cannot start (or extend) a FilterOutput; drop it
                # together with the non-filter lines that follow it.
                skip_nonfilter_lines = True
                continue
            if filt_output is not None:
                if filt_output.matches(protocol, prio, filter_type):
                    filt_output.add_filter_line(line,
                                fields=list(self.__iter.get_field_iter()))
                    continue
                #
                # Beginning of output for a new filter.
                # Process the one we have.
                #
                self.__finish_filter(filt_output, owner)
            filt_output = FilterOutput(protocol, prio, filter_type, owner)
            filt_output.add_filter_line(line,
                                fields=list(self.__iter.get_field_iter()))
        if filt_output is not None:
            self.__finish_filter(filt_output, owner)

    def get_error_count(self) -> int:
        """Returns number of parsing errors encountered

        :meta private:
        """
        return self.__parsing_errors

    def get_filter_list(self) -> List['TrafficFilter']:
        """Returns a list of :class:`TrafficFilter` objects from the
        parsed output

        :meta private:
        """
        return self.__filter_list

    def get_filter(self) -> Optional['TrafficFilter']:
        """Returns the first :class:`TrafficFilter` from the parsed output,
        or ``None`` if no filter was successfully parsed.

        :meta private:
        """
        return self.__filter_list[0] if self.__filter_list else None
def _group_split(lines: List[str], marker: str) -> List[List[str]]: """Given a list of lines, break them into groups of consecutive lines, where the first line of each group starts with the ``marker`` string. """ group_list = [] line_group = [] for line in lines: if not line: continue if line.startswith(marker): # Beginning of new line group if line_group: group_list.append(line_group) line_group = [] line_group.append(line) else: if line_group: line_group.append(line) else: raise TcParsingError( f"first line does not start with '{marker}'", line=line) if line_group: group_list.append(line_group) return group_list
class QClassOutput:
    """Helper class used for parsing ``tc class ls`` output for a single
    qclass
    """

    def __init__(self, line_group: List[str]):
        """
        :param line_group: list of lines, guaranteed not to be empty
        """
        self.__line_iter = LineGroupIterator(line_group)
        self.__handle = None
        self.__parent_handle = None
        # The class line; set by parse_first_line()
        self.__qclass_line = None
        self.__qdisc_handle = None

    def get_handle(self) -> Handle:
        """Returns the (parsed) :class:`Handle` of the queuing class.
        """
        return self.__handle

    def get_parent_handle(self) -> Handle:
        """Returns the (parsed) :class:`Handle` of the parent of the
        queuing class.
        """
        return self.__parent_handle

    def get_qdisc_handle(self) -> Optional[Handle]:
        """Returns the :class:`Handle` of a (leaf) qdisc
        """
        return self.__qdisc_handle

    def get_linegroup_iter(self) -> LineGroupIterator:
        """Returns the LineGroupIterator for the tc output lines.

        :meta private:
        """
        return self.__line_iter

    def get_class_line(self) -> str:
        """Returns the **tc(8)** output class line
        """
        # Once parse_first_line() has run, return the line it stored:
        # the line iterator's notion of 'last line' changes as more
        # lines are consumed, and is reset when a line is put back.
        if self.__qclass_line is not None:
            return self.__qclass_line
        return self.__line_iter.get_last_line()

    def get_field_iter(self) -> Iterator[str]:
        """Return an iterator for the (remaining) fields of the class line
        """
        return self.__line_iter.get_field_iter()

    def parse_first_line(self) -> str:
        """Parse the first line and return a string with the qdisc type
        (e.g. 'htb')

        Raises a :exc:`TcError` if invoked more than once, and
        a :exc:`TcParsingError` if the line cannot be parsed.

        :meta private:
        """
        if self.__qclass_line is not None:
            raise TcError('attempt to parse first line twice')
        self.__qclass_line = next(self.__line_iter)
        field_iter = self.get_field_iter()
        try:
            #
            # All class lines have the form:
            #
            # class <type> <handle> parent <handle> [leaf <qdisc-handle>] ...
            #
            # where the ... part is type-specific
            #
            if next(field_iter) != 'class':
                raise TcParsingError("line does not start with 'class'")
            qdisc_type = next(field_iter)
            # The handle string may not include a major number, e.g.
            #       class mq :1 root
            # or
            #       class mq :1 parent 10:
            #
            # We need to parse the parent handle before we can
            # parse the class handle. If the parent is root, we assume
            # the major number is 0 (so the class handle for the first
            # line will be 0:1)
            handle_str = next(field_iter)
            try:
                handle = Handle.parse(handle_str)
            except TcParsingError:
                handle = None
            parent_field = next(field_iter)
            if parent_field == 'root':
                parent_major = 0 if handle is None else handle.major
                self.__parent_handle = Handle.qdisc_handle(parent_major)
            elif parent_field == 'parent':
                self.__parent_handle = Handle.parse(next(field_iter))
            else:
                raise TcParsingError(
                    f"cannot determine class parent from field {parent_field}")
            if handle is None:
                self.__handle = Handle.parse(handle_str,
                                default_major=self.__parent_handle.major)
            else:
                self.__handle = handle
            if field_iter.peek() == 'leaf':
                _ = next(field_iter)
                self.__qdisc_handle = Handle.parse(next(field_iter))
            return qdisc_type
        except StopIteration as stopit:
            raise TcParsingError("not enough fields") from stopit
class QClassParser:
    """Factory of :class:`QClass` objects; the objects are created by
    parsing the output of the **tc(8)** command.
    """

    # Key: queuing class name (a string)
    # Value: the Python class registered for it
    _qclass_map = {}

    def __init__(self, allow_parsing_errors: bool):
        self.__qclass_list = []
        self.__parsing_errors = 0
        self.__allow_parsing_errors = allow_parsing_errors

    def get_error_count(self) -> int:
        """Returns number of parsing errors encountered

        :meta private:
        """
        return self.__parsing_errors

    @classmethod
    def register_qclass(cls, ident: str, klass) -> None:
        """Register the given class (which should be a subclass of the
        :class:`QClass` class).

        This method is intended to be used for adding support for
        new queuing discipline classes.

        :param ident: the queuing class name that appears in the
            ``tc -s class ls`` output.
        :param klass: the Python class for this queuing class
        """
        cls._qclass_map[ident] = klass

    def parse_output(self, tc_output_lines: List[str]) -> None:
        """Parse the **tc(8)** output in ``tc_output_lines`` into
        a list of :class:`QClass` objects; the list can be
        accessed via the :meth:`get_qclass_list` method.

        :meta private:
        """
        self.__qclass_list = []
        for line_group in _group_split(tc_output_lines, 'class '):
            qclass_output = QClassOutput(line_group)
            try:
                qdisc_type = qclass_output.parse_first_line()
                klass = self._qclass_map.get(qdisc_type)
                if klass is None and qdisc_type == 'sfq':
                    # SFQ is classless, so this should never happen;
                    # yet on CentOS 6.10, I observed the following in
                    # the output of 'tc class ls':
                    #
                    #   class sfq 202:2c9 parent 202:
                    #
                    # 202: was a SFQ qdisc; the class minor number
                    # changed for every invocation of 'tc class ls'
                    _logger.warning("classless SFQ has a class: %s",
                                    qclass_output.get_class_line())
                    continue
                if klass is None:
                    raise TcParsingError(f"unknown qdisc type {qdisc_type}")
                qclass = klass.parse(qclass_output)
                qclass._parse_stats(qclass_output.get_linegroup_iter())
                self.__qclass_list.append(qclass)
            except TcParsingError as parserr:
                self.__parsing_errors += 1
                line = qclass_output.get_class_line()
                if self.__allow_parsing_errors:
                    _logger.warning("%s: parsing error, line='%s'",
                                    self.parse_output.__qualname__, line)
                    continue
                parserr.set_line(line)
                raise

    def get_qclass_list(self) -> List['QClass']:
        """Returns a list of :class:`QClass` objects from the
        parsed output

        :meta private:
        """
        return self.__qclass_list

    def get_qclass(self) -> Optional['QClass']:
        """Returns the first :class:`QClass` from the parsed output,
        or ``None`` if no queuing class was successfully parsed.

        :meta private:
        """
        if not self.__qclass_list:
            return None
        return self.__qclass_list[0]
class QDiscOutput:
    """Helper class used for parsing ``tc qdisc ls`` output for a single
    qdisc
    """

    def __init__(self, line_group: List[str]):
        """
        :param line_group: list of lines, guaranteed not to be empty
        """
        self.__line_iter = LineGroupIterator(line_group)
        self.__handle = None
        self.__parent_handle = None
        self.__refcnt = None
        # The qdisc line; set by parse_first_line()
        self.__qdisc_line = None

    def get_handle(self) -> Handle:
        """Returns the (parsed) :class:`Handle` of the queuing discipline.
        """
        return self.__handle

    def get_parent_handle(self) -> Optional[Handle]:
        """Returns the (parsed) :class:`Handle` of the parent of this
        queueing discipline, or ``None`` if this is a root qdisc
        """
        return self.__parent_handle

    def get_refcnt(self) -> Optional[int]:
        """Returns reference count of qdisc (``None`` for non-root qdiscs,
        and for some root qdiscs like ``mq``)
        """
        return self.__refcnt

    def get_linegroup_iter(self) -> LineGroupIterator:
        """Returns the LineGroupIterator for the tc output lines.

        :meta private:
        """
        return self.__line_iter

    def get_qdisc_line(self) -> str:
        """Returns the **tc(8)** output qdisc line
        """
        # Once parse_first_line() has run, return the line it stored:
        # the line iterator's notion of 'last line' changes as more
        # lines are consumed, and is reset when a line is put back.
        if self.__qdisc_line is not None:
            return self.__qdisc_line
        return self.__line_iter.get_last_line()

    def get_field_iter(self) -> Iterator[str]:
        """Return an iterator for the (remaining) fields of the qdisc line
        """
        return self.__line_iter.get_field_iter()

    def parse_first_line(self) -> str:
        """Parse the first line and return a string with the qdisc type
        (e.g. 'htb')

        Raises a :exc:`TcError` if invoked more than once, and
        a :exc:`TcParsingError` if the line cannot be parsed.

        :meta private:
        """
        if self.__qdisc_line is not None:
            raise TcError('attempt to parse first line twice')
        self.__qdisc_line = next(self.__line_iter)
        field_iter = self.__line_iter.get_field_iter()
        try:
            #
            # All qdisc lines have the form:
            #
            # qdisc <type> <handle> (root refcnt <num> |parent <handle>) ...
            #
            # where the ... part is type-specific
            #
            if next(field_iter) != 'qdisc':
                raise TcParsingError("line does not start with 'qdisc'")
            qdisc_type = next(field_iter)
            self.__handle = Handle.parse(next(field_iter))
            parent_field = next(field_iter)
            if parent_field == 'root':
                # Some root qdiscs like 'mq' do not provide a refcount
                next_field = next(field_iter, None)
                if next_field is None:
                    return qdisc_type
                if next_field != 'refcnt':
                    raise TcParsingError(
                        f"found '{next_field}' after 'root' "
                        "instead of 'refcnt'")
                refcnt_str = next(field_iter)
                try:
                    self.__refcnt = int(refcnt_str)
                except ValueError as valerr:
                    # Report a malformed count as a parsing error so that
                    # callers handling TcParsingError also cover this case
                    raise TcParsingError(
                        f"bad refcnt value '{refcnt_str}'") from valerr
            elif parent_field == 'parent':
                self.__parent_handle = Handle.parse(next(field_iter),
                                        default_major=self.__handle.major)
            else:
                raise TcParsingError(
                    f"cannot determine qdisc parent from field {parent_field}")
            return qdisc_type
        except StopIteration as stopit:
            raise TcParsingError("not enough fields") from stopit
class QDiscParser:
    """Factory of :class:`QDisc` objects; the objects are created by
    parsing the output of the **tc(8)** command.
    """

    # Key: qdisc name (a string)
    # Value: the Python class registered for it
    _qdisc_class_map = {}

    def __init__(self, allow_parsing_errors: bool):
        self.__qdisc_list = []
        self.__parsing_errors = 0
        self.__allow_parsing_errors = allow_parsing_errors

    def get_error_count(self) -> int:
        """Returns number of parsing errors encountered

        :meta private:
        """
        return self.__parsing_errors

    @classmethod
    def register_qdisc(cls, ident: str, klass) -> None:
        """Register the given class (which should be a subclass of the
        :class:`QDisc` class).

        This method is intended to be used for adding support for
        new queuing disciplines.

        :param ident: the qdisc name that appears in the
            ``tc -s qdisc ls`` output.
        :param klass: the Python class for this queuing discipline
        """
        cls._qdisc_class_map[ident] = klass

    def parse_output(self, tc_output_lines: List[str]) -> None:
        """Parse the **tc(8)** output in ``tc_output_lines`` into
        a list of :class:`QDisc` objects; the list can be
        accessed via the :meth:`get_qdisc_list` method.

        :meta private:
        """
        #
        # High-level logic:
        #   - parse the output into line groups, one for each qdisc; parsing
        #     is at the syntactic level only
        #   - for each group, determine the particular qdisc and let it
        #     parse the output
        #
        # Parsing requirements:
        #   1. The 1st line needs to be partially parsed to determine the
        #      specific qdisc
        #   2. The next 2 lines (with stats) can be parsed by common code
        #      because they are the same across qdisc's
        #   3. Give the option to the qdisc-specific parsing code to parse
        #      the whole output
        #   4. Give the option to the qdisc-specific code to use the
        #      common parsing code
        #
        # Based on the above, the QDiscOutput object contains the common
        # parsing code. Consequently, it also holds parsed fields
        # (like handles etc.)
        #
        self.__qdisc_list = []
        for line_group in _group_split(tc_output_lines, 'qdisc '):
            qdisc_output = QDiscOutput(line_group)
            try:
                qdisc_type = qdisc_output.parse_first_line()
                klass = self._qdisc_class_map.get(qdisc_type)
                if klass is None:
                    raise TcParsingError(f"unknown qdisc {qdisc_type}")
                qdisc = klass.parse(qdisc_output)
                qdisc._parse_stats(qdisc_output.get_linegroup_iter())
                self.__qdisc_list.append(qdisc)
            except TcParsingError as parserr:
                self.__parsing_errors += 1
                line = qdisc_output.get_qdisc_line()
                if self.__allow_parsing_errors:
                    _logger.warning("%s: parsing error, line='%s'",
                                    self.parse_output.__qualname__, line)
                    continue
                parserr.set_line(line)
                raise

    def get_qdisc_list(self) -> List['QDisc']:
        """Returns a list of :class:`QDisc` objects from the
        parsed output

        :meta private:
        """
        return self.__qdisc_list

    def get_qdisc(self) -> Optional['QDisc']:
        """Returns the first :class:`QDisc` from the parsed output,
        or ``None`` if no queuing discipline was successfully parsed.

        :meta private:
        """
        if not self.__qdisc_list:
            return None
        return self.__qdisc_list[0]