# Copyright (c) 2022, 2023, 2025, Panagiotis Tsirigotis
# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module contains parsers to create Python objects from the output
of the **tc(8)** command.
"""
# pylint: disable=too-many-lines
from collections import deque
from typing import Any, Callable, Iterator, List, Optional
from .deps import get_logger
from .exceptions import TcError, TcParsingError
from .handle import Handle
_logger = get_logger("linuxnet.qos.parsers")
class LookaheadIterator:
"""A LookaheadIterator is an iterator that provides the ability to
put back previously returned tokens.
Conceptual view of the LookaheadIterator::
deque
+---------------+ +---+---+---+---+---+
| back-iterator | | T | T | T |...| T |
+---------------+ +---+---+---+---+---+
^
|
Cursor
* Tokens to the right of the cursor have been consumed.
* Tokens up to, but not including, the cursor are previously consumed
tokens that have been put back.
* New tokens are obtained from the back-iterator.
* The value of the cursor indicates the number of put-back tokens.
* The maximum size of the deque is equal to the lookahead.
"""
def __init__(self, iterable, lookahead: int):
"""
:param iterable: an iterable object from which we create
the back-iterator
:param lookahead: number of tokens of look ahead
"""
self.__iter = iter(iterable)
if lookahead <= 0:
raise ValueError(f'bad lookahead value {lookahead}')
self.__tokens = deque(maxlen=lookahead)
self.__cursor = 0
def __iter__(self):
return self
def __next__(self):
if self.__cursor == 0:
token = next(self.__iter)
self.__tokens.appendleft(token)
else:
self.__cursor -= 1
token = self.__tokens[self.__cursor]
return token
def peek(self) -> Optional[Any]:
"""Returns the next token, but does not consume it.
The value ``None`` is returned if there are no more tokens.
This can be ambiguous if ``None`` is a possible value held
in the underlying iterable object.
"""
try:
token = self.__next__()
self.put_back(token)
return token
except StopIteration:
return None
def put_back(self, token: Any) -> None:
"""Put back the specified token. This must be a token previously
returned by the iterator (identity is checked, not equality)
"""
if self.__cursor == len(self.__tokens):
# Either there are no consumed tokens, or this is an attempt to
# put back one more tokens than those already consumed.
raise ValueError('not a consumed token')
if token is not self.__tokens[self.__cursor]:
raise ValueError('wrong token')
self.__cursor += 1
def rewind(self, step=1) -> 'LookaheadIterator':
"""Put back last ``step`` tokens
A :exc:`ValueError` will be raised if there are not enough
tokens to put back.
"""
avail = len(self.__tokens) - self.__cursor
if step > avail:
raise ValueError(f'unable to rewind {step} token(s)')
self.__cursor += step
return self
def disable(self) -> None:
"""The iterator is disabled by
1) exhausting the underling iterator
2) clearing any stored tokens
"""
self.__iter = iter([])
self.__tokens.clear()
self.__cursor = 0
class LineGroupIterator(LookaheadIterator):
"""The LineGroupIterator is used to parse the output of
``tc filter ls``. It returns lines one-by-one and is capable of
backtracking. This allows the filter-specific
parsing code to return a line back to the iterator if it
does not belong to it.
"""
def __init__(self, tc_output: List[str]):
super().__init__(tc_output, lookahead=1)
self.__current_line = None
self.__field_iter = None
def __iter__(self):
return self
def __next__(self) -> str:
self.__current_line = super().__next__()
self.__field_iter = None
return self.__current_line
def next_field(self) -> str:
"""Returns the next field of the current line
"""
return next(self.get_field_iter())
def get_field_iter(self) -> Iterator[str]:
"""Returns an iterator over the fields of the current line
"""
if self.__current_line is None:
raise TcError("attempt to access next field before line iteration")
if self.__field_iter is None:
self.__field_iter = LookaheadIterator(
self.__current_line.split(), 1)
return self.__field_iter
def clear_field_iter(self):
"""This method removes the field iterator so that a call
to :meth:`get_field_iter` will create a new one to scan
the line from the beginning.
"""
if self.__field_iter is not None:
self.__field_iter.disable()
self.__field_iter = None
def get_last_line(self) -> Optional[str]:
"""Returns the last line returned by :meth:`__next__`
"""
return self.__current_line
def put_back(self, token: str) -> None:
"""Put back the specified token which is a line (string).
This invalidates self.__current_line, and clears the field iterator.
"""
super().put_back(token)
self.__current_line = None
self.clear_field_iter()
def rewind(self, step=1) -> 'LineGroupIterator':
"""Put back last ``step`` lines
A :exc:`ValueError` will be raised if there are not enough
lines to put back.
"""
super().rewind(step)
self.__current_line = None
self.clear_field_iter()
return self
[docs]class FilterOutputLine:
"""A class that holds a line of filter output.
One can iterate over the fields of the line::
def parse_fields(fline: FilterOutputLine):
for field in fline:
if field == 'xxx':
...
The entire line can be returned by using the :func:`str` builtin function.
"""
def __init__(self, line: str, fields: List[str]):
"""
:param line: the complete **tc(8)** filter line
:param fields: list of fields of the line **after** the filter type
"""
self.__line = line
self.__fields = fields
def __iter__(self):
return iter(self.__fields)
def __str__(self):
return self.__line
[docs]class FilterOutput:
"""An instance of this class contains the **tc(8)** output for a
single filter. This includes the filter description lines, followed
by action description lines. The lines are grouped, with groups
separated by empty lines. The first line group contains the filter
lines. Each successive line group describes an action.
"""
def __init__(self, proto: str, prio: int, filter_type: str, owner: 'QNode'):
"""
:param proto: filter protocol
:param prio: filter priority
:param filter_type: filter type
:param owner: :class:`QDisc`/:class:`QClass` that owns the filter
"""
self.__proto = proto
self.__prio = prio
self.__filter_type = filter_type
self.__owner = owner
# The __filter_lines are the lines starting with the word 'filter'
# The __nonfilter_lines are the rest.
self.__filter_lines = []
self.__nonfilter_lines = []
self.__nonfilter_lines_iter = None
self.__action_fields = None
def matches(self, proto: str, prio: int, filter_type: str) -> bool:
"""Returns ``True`` if the ``proto``, ``prio``, ``filter_type``
parameters match with the corresponding attributes of this object.
:meta private:
"""
return (self.__proto == proto and self.__prio == prio and
self.__filter_type == filter_type)
[docs] def get_prio(self) -> int:
"""Returns the priority value
"""
return self.__prio
[docs] def get_proto(self) -> str:
"""Returns the protocol value
"""
return self.__proto
[docs] def get_filter_type(self) -> str:
"""Returns the filter type
"""
return self.__filter_type
[docs] def get_filter_owner(self) -> 'QNode':
"""Returns the :class:`QDisc`/:class:`QClass` that owns the filter
"""
return self.__owner
[docs] def get_first_line(self) -> str:
"""Returns the first line from the **tc(8)** output for this filter
"""
return str(self.__filter_lines[0])
[docs] def has_nonfilter_lines(self) -> bool:
"""Returns ``True`` if the filter output has any lines not
starting with the word ``filter``
"""
return bool(self.__nonfilter_lines)
def has_actions(self) -> bool:
"""Returns ``True`` if the filter has any actions.
We assume that any remaining non-filter lines are action lines.
:meta private:
"""
return (self.__action_fields is not None or
self.has_nonfilter_lines() and
self.nonfilter_lines_iter().peek() is not None)
def get_action_fields(self) -> Optional[List[str]]:
"""Returns action-related fields that were in the 'filter' line,
if any.
:meta private:
"""
return self.__action_fields
def add_filter_line(self, line: str, fields: List[str]) -> None:
"""Add a line that starts with ``filter ``
:meta private:
"""
self.__filter_lines.append(FilterOutputLine(line, fields))
def add_line(self, line: str) -> None:
"""Add a non-filter prefixed line
:meta private:
"""
self.__nonfilter_lines.append(line)
[docs] def action_in_filter_line(self, action_fields: List[str]) -> None:
"""
This method can be used by a filter parser to indicate that
an action is part of a filter line (happens with the ``fw`` filter).
The rest of the lines, if any, are assumed to be action-related
lines.
"""
self.__action_fields = action_fields
[docs] def filter_lines_iter(self) -> Iterator[FilterOutputLine]:
"""Returns an iterator that returns :class:`FilterOutputLine`
instances.
"""
return iter(self.__filter_lines)
[docs] def nonfilter_lines_iter(self) -> LineGroupIterator:
"""Returns an iterator that returns lines (strings).
The same iterator instance is always returned.
"""
if self.__nonfilter_lines_iter is None:
self.__nonfilter_lines_iter = LineGroupIterator(
self.__nonfilter_lines)
return self.__nonfilter_lines_iter
def dump(self, path) -> None:
"""Append the contents of this object to the file at path.
This is used for debugging.
:meta private:
"""
with open(path, "a", encoding='utf-8') as outf:
for line in self.__filter_lines:
outf.write(str(line) + '\n')
for line in self.__nonfilter_lines:
outf.write(line + '\n')
outf.write("------------------\n")
def field_advance(producer: Callable[[], str], expected_field: str,
has_value=True) -> Optional[str]:
"""We are expecting the next field to be ``expected_field`` optionally
followed by its value.
We return that value, or ``None``.
"""
next_field = None
try:
next_field = producer()
if next_field != expected_field:
_logger.error("%s: expected '%s', found '%s'",
field_advance.__qualname__, expected_field, next_field)
raise TcParsingError(f"expecting '{expected_field}'")
return producer() if has_value else None
except StopIteration as stopit:
if next_field is None:
raise TcParsingError(f'missing {expected_field}') from stopit
raise TcParsingError(f'no value for {expected_field}') from stopit
[docs]class TrafficFilterParser:
"""Helper class that creates :class:`TrafficFilter` objects from the
output of the **tc(8)** command.
"""
#
# Key: the tuple (filter_type, protocol) - both strings
# Value: a TrafficFilter subclass
#
_filter_class_map = {}
#
# Key: the action name (a string)
# Value: a TrafficAction subclass
#
_action_class_map = {}
def __init__(self, allow_parsing_errors: bool):
self.__allow_parsing_errors = allow_parsing_errors
self.__parsing_errors = 0
self.__filter_list = []
# __iter is a LineGroupIterator
self.__iter: LineGroupIterator = None
[docs] @classmethod
def register_filter(cls, *, filter_type: str, protocol: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`TrafficFilter` class).
This method is intended to be used for adding support for new
traffic filter types.
:param filter_type: a **tc(8)** filter type, e.g. ``u32``
:param protocol: a **tc(8)** protocol name, e.g. ``ip``
:param klass: the Python class for this ``(filter_type, protocol)``
"""
cls._filter_class_map[(filter_type, protocol)] = klass
[docs] @classmethod
def register_action(cls, *, action_name: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`TrafficAction` class).
This method is intended to be used for adding support for new
traffic actions.
:param action_name: **tc(8)** action, e.g. ``police``
:param klass: the Python class for this action
"""
cls._action_class_map[action_name] = klass
@classmethod
def __parse_action(cls, fields: List[str],
nfl_iter: LineGroupIterator) -> 'TrafficAction':
"""Parse the **tc(8)** output for a traffic action.
:param fields: fields of the output line identifying the action
and its arguments; ``fields[0]`` is the action type
:param nfl_iter: iterator returning the lines after the lines
that start with 'filter'
:rtype: a :class:`TrafficAction` instance
Raises a :exc:`TcParsingError` if unable to parse the action
:meta private:
"""
action_name = fields[0]
klass = cls._action_class_map.get(action_name)
if klass is None:
raise TcParsingError(f"unknown action {action_name}")
return klass.parse(fields[1:], nfl_iter)
def __parse_actions(self, # pylint: disable=too-many-branches
filt_output: FilterOutput,
traffic_filter: 'TrafficFilter') -> None:
"""Parse the actions in the FilterOutput
"""
nfl_iter: LineGroupIterator = filt_output.nonfilter_lines_iter()
#
# First check for an action that was part of a 'filter' line
#
action_fields = filt_output.get_action_fields()
if action_fields is not None:
action = self.__parse_action(action_fields, nfl_iter)
if action is None:
raise TcParsingError(
f'failed to parse action: {action_fields[0]}',
line=nfl_iter.get_last_line())
action.parse_common_lines(nfl_iter)
traffic_filter.add_action(action)
for nonfilter_line in nfl_iter:
action = None
try:
line = nonfilter_line.strip()
fields = line.split()
if fields[0] == 'action':
#
# This is a new-style action; the line format is:
# action order 1: police 0x1 rate 20Kbit ...
#
if fields[1] != 'order' or len(fields) < 4:
raise TcParsingError(
"unable to parse filter action line",
line=nonfilter_line)
action = self.__parse_action(fields[3:], nfl_iter)
elif fields[0] == 'police':
#
# This is an old-style police action; the line format is:
# police 0x2 rate 40Kbit ...
#
action = self.__parse_action(fields, nfl_iter)
elif fields[0] == 'Sent':
#
# This is encountered when we have an old-style police
# action with stats. The output is:
# police 0x2 rate 40Kbit ...
# <empty-line>
# Sent 0 bytes 0 pkts (dropped 0, overlimits 0)
#
action_list = traffic_filter.get_actions()
if not action_list:
raise TcParsingError(
f'unexpected field in non-filter line: {fields[0]}',
line=nonfilter_line)
if not action_list[-1].parse_single_stats_line(line):
_logger.warning("Unable to parse line: %s",
nonfilter_line)
else:
raise TcParsingError(
f'unknown field in non-filter line: {fields[0]}',
line=nonfilter_line)
if action is not None:
action.parse_common_lines(nfl_iter)
traffic_filter.add_action(action)
else:
# Skip up to and including the next empty line
while True:
line = next(nfl_iter, None)
if not line:
break
except TcParsingError as parserr:
self.__parsing_errors += 1
if not self.__allow_parsing_errors:
raise
_logger.warning("action parsing error, '%s'", parserr)
def __process_filter(self, filt_output: FilterOutput) -> None:
"""Try to create a new :class:`TrafficFilter` from ``filt_output``.
The new filter will be added to self.__filter_list.
"""
#
# Currently we only support 'ip' filters
#
line = filt_output.get_first_line()
protocol = filt_output.get_proto()
if protocol != 'ip':
_logger.error("found protocol '%s', expected 'ip' (owner=%s)",
protocol, filt_output.get_filter_owner())
raise TcParsingError(
f"unable to handle protocol '{protocol}'", line=line)
filter_type = filt_output.get_filter_type()
klass = self._filter_class_map.get((filter_type, protocol))
if klass is None:
_logger.error("unable to handle filter type '%s' (owner=%s)",
filter_type, filt_output.get_filter_owner())
raise TcParsingError(
f"unable to handle filter type '{filter_type}'", line=line)
traffic_filter = klass.parse(filt_output)
if filt_output.has_actions():
self.__parse_actions(filt_output, traffic_filter)
if traffic_filter.get_dest_handle() is None:
_logger.warning("filter %s has no dest handle (owner=%s)",
traffic_filter, filt_output.get_filter_owner())
traffic_filter._mark_as_instantiated()
self.__filter_list.append(traffic_filter)
def __advance(self, field_name: str, has_value=True) -> Optional[str]:
"""We are expecting the next field to be ``field_name``
optionally followed by its value.
We return that value.
"""
# Note that self.__iter.next_field is a callable
return field_advance(self.__iter.next_field, field_name, has_value)
def parse_output(self, tc_output_lines: List[str],
owner: 'QNode') -> None:
"""Parse the **tc(8)** output in ``tc_output_lines`` into a list
of :class:`TrafficFilter` objects; the list can be accessed via
the :meth:`get_filter_list` method.
:meta private:
"""
self.__filter_list = []
self.__iter = LineGroupIterator(tc_output_lines)
#
# Process lines into FilterOutput objects.
# Each FilterOutput object has the lines of one filter.
# Once all lines of a filter are seen, invoke the
# filter's parse method to create the TrafficFilter object.
#
filt_output = None
for line in self.__iter:
if not line.startswith('filter '):
if filt_output is None:
_logger.error("unexpected filter line: '%s' (owner=%s)",
line, owner)
raise TcParsingError('unexpected filter line', line=line)
filt_output.add_line(line)
continue
#
# We expect a filter line to look like this:
# filter protocol <val> pref <int> <type>
#
try:
_ = self.__advance('filter', has_value=False)
protocol = self.__advance('protocol')
priostr = self.__advance('pref')
try:
prio = int(priostr)
except ValueError as valerr:
_logger.error("bad filter priority: %s (owner=%s)",
priostr, owner)
raise TcParsingError(
f'bad filter priority: {priostr}') from valerr
try:
filter_type = self.__iter.next_field()
except StopIteration as stopit:
_logger.error(
"filter line without filter type: '%s' (owner=%s)",
line, owner)
raise TcParsingError("missing filter type") from stopit
if filt_output is None:
filt_output = FilterOutput(protocol, prio,
filter_type, owner)
filt_output.add_filter_line(line,
fields=list(self.__iter.get_field_iter()))
continue
if filt_output.matches(protocol, prio, filter_type):
filt_output.add_filter_line(line,
fields=list(self.__iter.get_field_iter()))
continue
#
# Beginning of output for a new filter.
# Process the one we have.
#
self.__process_filter(filt_output)
except TcParsingError as parserr:
self.__parsing_errors += 1
parserr.set_line(line)
if not self.__allow_parsing_errors:
raise
_logger.warning("allowing filter parsing error: %s (owner=%s)",
parserr, owner)
filt_output = FilterOutput(protocol, prio, filter_type, owner)
filt_output.add_filter_line(line,
fields=list(self.__iter.get_field_iter()))
if filt_output is not None:
try:
self.__process_filter(filt_output)
except TcParsingError as parserr:
self.__parsing_errors += 1
parserr.set_line(filt_output.get_first_line())
if not self.__allow_parsing_errors:
raise
_logger.warning("allowing filter parsing error: %s (owner=%s)",
parserr, owner)
def get_error_count(self) -> int:
"""Returns number of parsing errors encountered
:meta private:
"""
return self.__parsing_errors
def get_filter_list(self) -> List['TrafficFilter']:
"""Returns a list of :class:`TrafficFilter` objects from the
parsed output
:meta private:
"""
return self.__filter_list
def get_filter(self) -> Optional['TrafficFilter']:
"""Returns the first :class:`TrafficFilter` from the parsed output,
or ``None`` if no filter was successfully parsed.
:meta private:
"""
return self.__filter_list[0] if self.__filter_list else None
def _group_split(lines: List[str], marker: str) -> List[List[str]]:
"""Given a list of lines, break them into groups of
consecutive lines, where the first line of each group starts with
the ``marker`` string.
"""
group_list = []
line_group = []
for line in lines:
if not line:
continue
if line.startswith(marker):
# Beginning of new line group
if line_group:
group_list.append(line_group)
line_group = []
line_group.append(line)
else:
if line_group:
line_group.append(line)
else:
raise TcParsingError(
f"first line does not start with '{marker}'", line=line)
if line_group:
group_list.append(line_group)
return group_list
[docs]class QClassOutput:
"""Helper class used for parsing ``tc class ls`` output for a single qclass
"""
def __init__(self, line_group: List[str]):
"""
:param line_group: list of lines, guaranteed not to be empty
"""
self.__line_iter = LineGroupIterator(line_group)
self.__handle = None
self.__parent_handle = None
self.__qclass_line = None
self.__qdisc_handle = None
[docs] def get_handle(self) -> Handle:
"""Returns the (parsed) :class:`Handle` of the queuing class.
"""
return self.__handle
[docs] def get_parent_handle(self) -> Handle:
"""Returns the (parsed) :class:`Handle` of the parent of the
queuing class.
"""
return self.__parent_handle
[docs] def get_qdisc_handle(self) -> Optional[Handle]:
"""Returns the :class:`Handle` of a (leaf) qdisc
"""
return self.__qdisc_handle
def get_linegroup_iter(self) -> LineGroupIterator:
"""Returns the LineGroupIterator for the tc output lines.
:meta private:
"""
return self.__line_iter
[docs] def get_class_line(self) -> str:
"""Returns the **tc(8)** output class line
"""
return self.__line_iter.get_last_line()
[docs] def get_field_iter(self) -> Iterator[str]:
"""Return an iterator for the (remaining) fields of the class line
"""
return self.__line_iter.get_field_iter()
def parse_first_line(self) -> str:
"""Parse the first line and return a string with the qdisc type
(e.g. 'htb')
:meta private:
"""
if self.__qclass_line is not None:
raise TcError('attempt to parse first line twice')
self.__qclass_line = next(self.__line_iter)
field_iter = self.get_field_iter()
try:
#
# All class lines have the form:
#
# class <type> <handle> parent <handle> [leaf <qdisc-handle>] ...
#
# where the ... part is type-specific
#
if next(field_iter) != 'class':
raise TcParsingError("line does not start with 'class'")
qdisc_type = next(field_iter)
# The handle string may not include a major number, e.g.
# class mq :1 root
# or
# class mq :1 parent 10:
#
# We need to parse the parent handle before we can
# parse the class handle. If the parent is root, we assume
# the major number is 0 (so the class handle for the first
# line will be 0:1)
handle_str = next(field_iter)
try:
handle = Handle.parse(handle_str)
except TcParsingError:
handle = None
parent_field = next(field_iter)
if parent_field == 'root':
parent_major = 0 if handle is None else handle.major
self.__parent_handle = Handle.qdisc_handle(parent_major)
elif parent_field == 'parent':
self.__parent_handle = Handle.parse(next(field_iter))
else:
raise TcParsingError(
f"cannot determine class parent from field {parent_field}")
if handle is None:
self.__handle = Handle.parse(handle_str,
default_major=self.__parent_handle.major)
else:
self.__handle = handle
if field_iter.peek() == 'leaf':
_ = next(field_iter)
self.__qdisc_handle = Handle.parse(next(field_iter))
return qdisc_type
except StopIteration as stopit:
raise TcParsingError("not enough fields") from stopit
[docs]class QClassParser:
"""Helper class that creates :class:`QClass` objects from the
output of the **tc(8)** command.
"""
_qclass_map = {}
def __init__(self, allow_parsing_errors: bool):
self.__allow_parsing_errors = allow_parsing_errors
self.__parsing_errors = 0
self.__qclass_list = []
def get_error_count(self) -> int:
"""Returns number of parsing errors encountered
:meta private:
"""
return self.__parsing_errors
[docs] @classmethod
def register_qclass(cls, ident: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`QClass` class).
This method is intended to be used for adding support for new
queuing discipline classes.
:param ident: the queuing class name that appears in the
``tc -s class ls`` output.
:param klass: the Python class for this queuing class
"""
cls._qclass_map[ident] = klass
def parse_output(self, tc_output_lines: List[str]) -> None:
"""Parse the **tc(8)** output in ``tc_output_lines`` into a list
of :class:`QClass` objects; the list can be accessed via
the :meth:`get_qclass_list` method.
:meta private:
"""
self.__qclass_list = []
for line_group in _group_split(tc_output_lines, 'class '):
qclass_output = QClassOutput(line_group)
try:
qdisc_type = qclass_output.parse_first_line()
klass = self._qclass_map.get(qdisc_type)
if klass is None:
if qdisc_type == 'sfq':
# SFQ is classless, so this should never happen;
# yet on CentOS 6.10, I observed the following in
# the output of 'tc class ls':
#
# class sfq 202:2c9 parent 202:
#
# 202: was a SFQ qdisc; the class minor number
# changed for every invocation of 'tc class ls'
_logger.warning("classless SFQ has a class: %s",
qclass_output.get_class_line())
continue
raise TcParsingError(f"unknown qdisc type {qdisc_type}")
qclass = klass.parse(qclass_output)
qclass._parse_stats(qclass_output.get_linegroup_iter())
self.__qclass_list.append(qclass)
except TcParsingError as parserr:
self.__parsing_errors += 1
line = qclass_output.get_class_line()
if not self.__allow_parsing_errors:
parserr.set_line(line)
raise
_logger.warning("%s: parsing error, line='%s'",
self.parse_output.__qualname__, line)
def get_qclass_list(self) -> List['QClass']:
"""Returns a list of :class:`QClass` objects from the
parsed output
:meta private:
"""
return self.__qclass_list
def get_qclass(self) -> Optional['QClass']:
"""Returns the first :class:`QClass` from the parsed output,
or ``None`` if no queuing class was successfully parsed.
:meta private:
"""
return self.__qclass_list[0] if self.__qclass_list else None
[docs]class QDiscOutput:
"""Helper class used for parsing ``tc qdisc ls`` output for a single qdisc
"""
def __init__(self, line_group: List[str]):
"""
:param line_group: list of lines, guaranteed not to be empty
"""
self.__line_iter = LineGroupIterator(line_group)
self.__handle = None
self.__parent_handle = None
self.__refcnt = None
self.__qdisc_line = None
[docs] def get_handle(self) -> Handle:
"""Returns the (parsed) :class:`Handle` of the queuing discipline.
"""
return self.__handle
[docs] def get_parent_handle(self) -> Optional[Handle]:
"""Returns the (parsed) :class:`Handle` of the parent of this
queueing discipline, or ``None`` if this is a root qdisc
"""
return self.__parent_handle
[docs] def get_refcnt(self) -> Optional[int]:
"""Returns reference count of qdisc (``None`` for non-root qdiscs,
and for some root qdiscs like ``mq``)
"""
return self.__refcnt
def get_linegroup_iter(self) -> LineGroupIterator:
"""Returns the LineGroupIterator for the tc output lines.
:meta private:
"""
return self.__line_iter
[docs] def get_qdisc_line(self) -> str:
"""Returns the **tc(8)** output qdisc line
"""
return self.__line_iter.get_last_line()
[docs] def get_field_iter(self) -> Iterator[str]:
"""Return an iterator for the (remaining) fields of the qdisc line
"""
return self.__line_iter.get_field_iter()
def parse_first_line(self) -> str:
"""Parse the first line and return a string with the qdisc type
(e.g. 'htb')
:meta private:
"""
if self.__qdisc_line is not None:
raise TcError('attempt to parse first line twice')
self.__qdisc_line = next(self.__line_iter)
field_iter = self.__line_iter.get_field_iter()
try:
#
# All qdisc lines have the form:
#
# qdisc <type> <handle> (root refcnt <num> |parent <handle>) ...
#
# where the ... part is type-specific
#
if next(field_iter) != 'qdisc':
raise TcParsingError("line does not start with 'qdisc'")
qdisc_type = next(field_iter)
self.__handle = Handle.parse(next(field_iter))
parent_field = next(field_iter)
if parent_field == 'root':
# Some root qdiscs like 'mq' do not provide a refcount
next_field = next(field_iter, None)
if next_field is None:
return qdisc_type
if next_field != 'refcnt':
raise TcParsingError(
f"found '{next_field}' after 'root' "
"instead of 'refcnt'")
self.__refcnt = int(next(field_iter))
elif parent_field == 'parent':
self.__parent_handle = Handle.parse(next(field_iter),
default_major=self.__handle.major)
else:
raise TcParsingError(
f"cannot determine qdisc parent from field {parent_field}")
return qdisc_type
except StopIteration as stopit:
raise TcParsingError("not enough fields") from stopit
[docs]class QDiscParser:
"""Helper class that creates :class:`QDisc` objects from the
output of the **tc(8)** command.
"""
_qdisc_class_map = {}
def __init__(self, allow_parsing_errors: bool):
self.__allow_parsing_errors = allow_parsing_errors
self.__parsing_errors = 0
self.__qdisc_list = []
def get_error_count(self) -> int:
"""Returns number of parsing errors encountered
:meta private:
"""
return self.__parsing_errors
[docs] @classmethod
def register_qdisc(cls, ident: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`QDisc` class).
This method is intended to be used for adding support for new
queuing disciplines.
:param ident: the qdisc name that appears in the
``tc -s qdisc ls`` output.
:param klass: the Python class for this queuing discipline
"""
cls._qdisc_class_map[ident] = klass
def parse_output(self, tc_output_lines: List[str]) -> None:
"""Parse the **tc(8)** output in ``tc_output_lines`` into a list
of :class:`QDisc` objects; the list can be accessed via
the :meth:`get_qdisc_list` method.
:meta private:
"""
#
# High-level logic:
# - parse the output into line groups, one for each qdisc; parsing
# is at the syntactic level only
# - for each group, determine the particular qdisc and let it
# parse the output
#
# Parsing requirements:
# 1. The 1st line needs to be partially parsed to determine the
# specific qdisc
# 2. The next 2 lines (with stats) can be parsed by common code
# because they are the same across qdisc's
# 3. Give the option to the qdisc-specific parsing code to parse
# the whole output
# 4. Give the option to the qdisc-specific code to use the
# common parsing code
#
# Based on the above, the QDiscOutput object contains the common
# parsing code. Consequently, it also holds parsed fields
# (like handles etc.)
#
self.__qdisc_list = []
for line_group in _group_split(tc_output_lines, 'qdisc '):
qdisc_output = QDiscOutput(line_group)
try:
qdisc_type = qdisc_output.parse_first_line()
klass = self._qdisc_class_map.get(qdisc_type)
if klass is None:
raise TcParsingError(f"unknown qdisc {qdisc_type}")
qdisc = klass.parse(qdisc_output)
qdisc._parse_stats(qdisc_output.get_linegroup_iter())
self.__qdisc_list.append(qdisc)
except TcParsingError as parserr:
self.__parsing_errors += 1
line = qdisc_output.get_qdisc_line()
if not self.__allow_parsing_errors:
parserr.set_line(line)
raise
_logger.warning("%s: parsing error, line='%s'",
self.parse_output.__qualname__, line)
def get_qdisc_list(self) -> List['QDisc']:
"""Returns a list of :class:`QDisc` objects from the
parsed output
:meta private:
"""
return self.__qdisc_list
def get_qdisc(self) -> Optional['QDisc']:
"""Returns the first :class:`QDisc` from the parsed output,
or ``None`` if no queuing discipline was successfully parsed.
:meta private:
"""
return self.__qdisc_list[0] if self.__qdisc_list else None