# Copyright (c) 2022, 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module contains parsers to create Python objects from the output
of the **tc(8)** command.
"""
from collections import deque
from typing import Any, Callable, Iterator, List, Optional
from .deps import get_logger
from .exceptions import TcError, TcParsingError
from .handle import Handle
_logger = get_logger("linuxnet.qos.parsers")
class LookaheadIterator:
"""A LookaheadIterator is an iterator that provides the ability to
put back previously returned tokens.
Conceptual view of the LookaheadIterator::
deque
+---------------+ +---+---+---+---+---+
| back-iterator | | T | T | T |...| T |
+---------------+ +---+---+---+---+---+
^
|
Cursor
* Tokens to the right of the cursor have been consumed.
* Tokens up to, but not including, the cursor are previously consumed
tokens that have been put back.
* New tokens are obtained from the back-iterator.
* The value of the cursor indicates the number of put-back tokens.
* The maximum size of the deque is equal to the lookahead.
"""
def __init__(self, iterable, lookahead: int):
"""
:param iterable: an iterable object from which we create
the back-iterator
:param lookahead: number of tokens of look ahead
"""
self.__iter = iter(iterable)
if lookahead <= 0:
raise ValueError(f'bad lookahead value {lookahead}')
self.__tokens = deque(maxlen=lookahead)
self.__cursor = 0
def __iter__(self):
return self
def __next__(self):
if self.__cursor == 0:
token = next(self.__iter)
self.__tokens.appendleft(token)
else:
self.__cursor -= 1
token = self.__tokens[self.__cursor]
return token
def peek(self) -> Optional[Any]:
"""Returns the next token, but does not consume it
"""
try:
token = self.__next__()
self.put_back(token)
return token
except StopIteration:
return None
def put_back(self, token: str) -> None:
"""Put back the specified token. This must be a token previously
returned by the iterator (identity is checked, not equality)
"""
if self.__cursor == len(self.__tokens):
# Either there are no consumed tokens, or this is an attempt to
# put back one more tokens than those already consumed.
raise ValueError('not a consumed token')
if token is not self.__tokens[self.__cursor]:
raise ValueError('wrong token')
self.__cursor += 1
def rewind(self, step=1) -> 'LookaheadIterator':
"""Put back last ``step`` tokens
A :exc:`ValueError` will be raised if there are not enough
tokens to put back.
"""
avail = len(self.__tokens) - self.__cursor
if step > avail:
raise ValueError(f'unable to rewind {step} token(s)')
self.__cursor += step
return self
class LineGroupIterator:
"""The LineGroupIterator is used to parse the output of
``tc filter ls``. It returns lines one-by-one and is capable of
single-level backtracking. This allows the filter-specific
parsing code to return a line back to the iterator if it
does not belong to it.
"""
def __init__(self, tc_output: List[str]):
self.__line_iter = iter(tc_output)
self.__backtracked_line = None
self.__current_line = None
self.__field_iter = None
def __iter__(self):
return self
def __next__(self) -> str:
"""Returns either the backtracked line or the next line from
the sub-iterator
"""
if self.__backtracked_line is None:
self.__current_line = next(self.__line_iter)
else:
self.__current_line = self.__backtracked_line
self.__backtracked_line = None
self.__field_iter = None
return self.__current_line
def next_field(self) -> str:
"""Returns the next field of the current line
"""
return next(self.get_field_iter())
def get_field_iter(self) -> Iterator[str]:
"""Returns an iterator over the fields of the current line
"""
if self.__current_line is None:
raise TcError("attempt to access next field before line iteration")
if self.__field_iter is None:
self.__field_iter = LookaheadIterator(
self.__current_line.split(), 1)
return self.__field_iter
def clear_field_iter(self):
"""This method removes the field iterator so that a call
to :meth:`get_field_iter` will create a new one to scan
the line from the beginning.
"""
self.__field_iter = None
def get_last_line(self) -> str:
"""Returns the last line returned by :meth:`__next__`
"""
return self.__current_line
def backtrack(self) -> None:
"""Backtrack the current line.
"""
if self.__backtracked_line is not None:
_logger.error("%s: attempt to backtrack twice; current line: %s",
self.backtrack.__qualname__, self.__backtracked_line)
raise TcError('attempt to backtrack twice')
self.__backtracked_line = self.__current_line
self.__current_line = None
self.__field_iter = None
[docs]class FilterOutputLine:
"""A class that holds a line of filter output.
One can iterate over the fields of the line::
def parse_fields(fline: FilterOutputLine):
for field in fline:
if field == 'xxx':
...
The entire line can be returned by using the :func:`str` builtin function.
"""
def __init__(self, line: str, fields: List[str]):
"""
:param line: the complete **tc(8)** filter line
:param fields: list of fields of the line **after** the filter type
"""
self.__line = line
self.__fields = fields
def __iter__(self):
return iter(self.__fields)
def __str__(self):
return self.__line
[docs]class FilterOutput:
"""An instance of this class contains the **tc(8)** output for a
single filter.
"""
def __init__(self, proto: str, prio: int, filter_type: str, owner: 'QNode'):
"""
:param proto: filter protocol
:param prio: filter priority
:param filter_type: filter type
:param owner: :class:`QDisc`/:class:`QClass` that owns the filter
"""
self.__proto = proto
self.__prio = prio
self.__filter_type = filter_type
self.__owner = owner
# The __filter_lines are the lines starting with the word 'filter'
# The __nonfilter_lines are the rest.
self.__filter_lines = []
self.__nonfilter_lines = []
self.__nonfilter_lines_iter = None
def matches(self, proto: str, prio: int, filter_type: str) -> bool:
"""Returns ``True`` if the ``proto``, ``prio``, ``filter_type``
parameters match with the corresponding attributes of this object.
:meta private:
"""
return (self.__proto == proto and self.__prio == prio and
self.__filter_type == filter_type)
[docs] def get_prio(self) -> int:
"""Returns the priority value
"""
return self.__prio
[docs] def get_proto(self) -> str:
"""Returns the protocol value
"""
return self.__proto
[docs] def get_filter_type(self) -> str:
"""Returns the filter type
"""
return self.__filter_type
[docs] def get_filter_owner(self) -> 'QNode':
"""Returns the :class:`QDisc`/:class:`QClass` that owns the filter
"""
return self.__owner
[docs] def get_first_line(self) -> str:
"""Returns the first line from the **tc(8)** output for this filter
"""
return str(self.__filter_lines[0])
[docs] def has_nonfilter_lines(self) -> bool:
"""Returns ``True`` if the filter output has any lines not
starting with the word ``filter``
"""
return bool(self.__nonfilter_lines)
def has_actions(self) -> bool:
"""Returns ``True`` if the filter has any actions.
We assume that any remaining non-filter lines
are action lines.
:meta private:
"""
return (self.has_nonfilter_lines() and
self.nonfilter_lines_iter().peek() is not None)
def add_filter_line(self, line: str, fields: List[str]) -> None:
"""Add a line that starts with ``filter ``
:meta private:
"""
self.__filter_lines.append(FilterOutputLine(line, fields))
def add_line(self, line: str) -> None:
"""Add a non-filter prefixed line
:meta private:
"""
self.__nonfilter_lines.append(line)
[docs] def filter_lines_iter(self) -> Iterator[FilterOutputLine]:
"""Returns an iterator that returns :class:`FilterOutputLine`
instances.
"""
return iter(self.__filter_lines)
[docs] def nonfilter_lines_iter(self) -> Iterator[str]:
"""Returns an iterator that returns lines (strings)
"""
if self.__nonfilter_lines_iter is None:
self.__nonfilter_lines_iter = LookaheadIterator(
self.__nonfilter_lines, 1)
return self.__nonfilter_lines_iter
def dump(self, path) -> None:
"""Append the contents of this object to the file at path.
This is used for debugging.
:meta private:
"""
with open(path, "a", encoding='utf-8') as outf:
for line in self.__filter_lines:
outf.write(str(line) + '\n')
for line in self.__nonfilter_lines:
outf.write(line + '\n')
outf.write("------------------\n")
def field_advance(producer: Callable[[], str], expected_field: str,
has_value=True) -> Optional[str]:
"""We are expecting the next field to be ``expected_field`` optionally
followed by its value.
We return that value, or ``None``.
"""
next_field = None
try:
next_field = producer()
if next_field != expected_field:
_logger.error("%s: expected '%s', found '%s'",
field_advance.__qualname__, expected_field, next_field)
raise TcParsingError(f"expecting '{expected_field}'")
return producer() if has_value else None
except StopIteration as stopit:
if next_field is None:
raise TcParsingError(f'missing {expected_field}') from stopit
raise TcParsingError(f'no value for {expected_field}') from stopit
[docs]class TrafficFilterParser:
"""Helper class that creates :class:`TrafficFilter` objects from the
output of the **tc(8)** command.
"""
#
# Key: the tuple (filter_type, protocol) - both strings
# Value: a TrafficFilter subclass
#
_filter_class_map = {}
#
# Key: the action name (a string)
# Value: a TrafficAction subclass
#
_action_class_map = {}
def __init__(self, allow_parsing_errors: bool):
self.__allow_parsing_errors = allow_parsing_errors
self.__parsing_errors = 0
self.__filter_list = []
# __iter is a LineGroupIterator
self.__iter: LineGroupIterator = None
[docs] @classmethod
def register_filter(cls, *, filter_type: str, protocol: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`TrafficFilter` class).
This method is intended to be used for adding support for new
traffic filter types.
:param filter_type: a **tc(8)** filter type, e.g. ``u32``
:param protocol: a **tc(8)** protocol name, e.g. ``ip``
:param klass: the Python class for this ``(filter_type, protocol)``
"""
cls._filter_class_map[(filter_type, protocol)] = klass
[docs] @classmethod
def register_action(cls, *, action_name: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`TrafficAction` class).
This method is intended to be used for adding support for new
traffic actions.
:param action_name: **tc(8)** action, e.g. ``police``
:param klass: the Python class for this action
"""
cls._action_class_map[action_name] = klass
[docs] @classmethod
def parse_action(cls, fields: List[str],
nfl_iter: Iterator[str]) -> 'TrafficAction':
"""Parse the **tc(8)** output for a traffic action.
:param fields: fields of the output line identifying the action
and its arguments; ``fields[0]`` is the action type
:param nfl_iter: iterator returning the lines after the lines
that start with 'filter'
:rtype: a :class:`TrafficAction` instance
Raises a :exc:`TcParsingError` if unable to parse the action
"""
action_name = fields[0]
klass = cls._action_class_map.get(action_name)
if klass is None:
raise TcParsingError(f"unknown action {action_name}")
return klass.parse(fields[1:], nfl_iter)
def __parse_actions(self, filt_output: FilterOutput,
traffic_filter: 'TrafficFilter') -> None:
"""Parse the actions in the FilterOutput
"""
nfl_iter = filt_output.nonfilter_lines_iter()
for nonfilter_line in nfl_iter:
action = None
try:
line = nonfilter_line.strip()
fields = line.split()
if fields[0] == 'action':
if fields[1] != 'order' or len(fields) < 4:
raise TcParsingError(
"unable to parse filter action line",
line=nonfilter_line)
action = self.parse_action(fields[3:], nfl_iter)
elif fields[0] == 'police':
action = self.parse_action(fields, nfl_iter)
else:
raise TcParsingError(
f'unknown field in non-filter line: {fields[0]}',
line=nonfilter_line)
if action is not None:
traffic_filter.add_action(action)
except TcParsingError as parserr:
self.__parsing_errors += 1
if not self.__allow_parsing_errors:
raise
_logger.warning("action parsing error, '%s'", parserr)
def __process_filter(self, filt_output: FilterOutput) -> None:
"""Try to create a new :class:`TrafficFilter` from ``filt_output``.
"""
#
# Currently we only support 'ip' filters
#
line = filt_output.get_first_line()
protocol = filt_output.get_proto()
if protocol != 'ip':
_logger.error("found protocol '%s', expected 'ip' (owner=%s)",
protocol, filt_output.get_filter_owner())
raise TcParsingError(
f"unable to handle protocol '{protocol}'", line=line)
filter_type = filt_output.get_filter_type()
klass = self._filter_class_map.get((filter_type, protocol))
if klass is None:
_logger.error("unable to handle filter type '%s' (owner=%s)",
filter_type, filt_output.get_filter_owner())
raise TcParsingError(
f"unable to handle filter type '{filter_type}'", line=line)
traffic_filter = klass.parse(filt_output)
if filt_output.has_actions():
self.__parse_actions(filt_output, traffic_filter)
if traffic_filter.get_dest_handle() is None:
_logger.warning("filter %s has no dest handle (owner=%s)",
traffic_filter, filt_output.get_filter_owner())
traffic_filter._mark_as_instantiated()
self.__filter_list.append(traffic_filter)
def __advance(self, field_name: str, has_value=True) -> Optional[str]:
"""We are expecting the next field to be ``field_name``
optionally followed by its value.
We return that value.
"""
# Note that self.__iter.next_field is a callable
return field_advance(self.__iter.next_field, field_name, has_value)
def parse_output(self, tc_output_lines: List[str],
owner: 'QNode') -> None:
"""Parse the **tc(8)** output in ``tc_output_lines`` into a list
of :class:`TrafficFilter` objects; the list can be accessed via
the :meth:`get_filter_list` method.
:meta private:
"""
self.__filter_list = []
self.__iter = LineGroupIterator(tc_output_lines)
#
# Process lines into FilterOutput objects.
# Each FilterOutput object has the lines of one filter.
# Once all lines of a filter are seen, invoke the
# filter's parse method to create the TrafficFilter object.
#
filt_output = None
for line in self.__iter:
if not line.startswith('filter '):
if filt_output is None:
_logger.error("unexpected filter line: '%s' (owner=%s)",
line, owner)
raise TcParsingError('unexpected filter line', line=line)
filt_output.add_line(line)
continue
#
# We expect a filter line to look like this:
# filter protocol <val> pref <int> <type>
#
try:
_ = self.__advance('filter', has_value=False)
protocol = self.__advance('protocol')
priostr = self.__advance('pref')
try:
prio = int(priostr)
except ValueError as valerr:
_logger.error("bad filter priority: %s (owner=%s)",
priostr, owner)
raise TcParsingError(
f'bad filter priority: {priostr}') from valerr
try:
filter_type = self.__iter.next_field()
except StopIteration as stopit:
_logger.error(
"filter line without filter type: '%s' (owner=%s)",
line, owner)
raise TcParsingError("missing filter type") from stopit
if filt_output is None:
filt_output = FilterOutput(protocol, prio,
filter_type, owner)
filt_output.add_filter_line(line,
fields=list(self.__iter.get_field_iter()))
continue
if filt_output.matches(protocol, prio, filter_type):
filt_output.add_filter_line(line,
fields=list(self.__iter.get_field_iter()))
continue
#
# Beginning of output for a new filter.
# Process the one we have.
#
self.__process_filter(filt_output)
except TcParsingError as parserr:
self.__parsing_errors += 1
parserr.set_line(line)
if not self.__allow_parsing_errors:
raise
_logger.warning("allowing filter parsing error: %s (owner=%s)",
parserr, owner)
filt_output = FilterOutput(protocol, prio, filter_type, owner)
filt_output.add_filter_line(line,
fields=list(self.__iter.get_field_iter()))
if filt_output is not None:
try:
self.__process_filter(filt_output)
except TcParsingError as parserr:
self.__parsing_errors += 1
parserr.set_line(filt_output.get_first_line())
if not self.__allow_parsing_errors:
raise
_logger.warning("allowing filter parsing error: %s (owner=%s)",
parserr, owner)
def get_error_count(self) -> int:
"""Returns number of parsing errors encountered
:meta private:
"""
return self.__parsing_errors
def get_filter_list(self) -> List['TrafficFilter']:
"""Returns a list of :class:`TrafficFilter` objects from the
parsed output
:meta private:
"""
return self.__filter_list
def get_filter(self) -> Optional['TrafficFilter']:
"""Returns the first :class:`TrafficFilter` from the parsed output,
or ``None`` if no filter was successfully parsed.
:meta private:
"""
return self.__filter_list[0] if self.__filter_list else None
def _group_split(lines: List[str], marker: str) -> List[List[str]]:
"""Given a list of lines, break them into groups of
consecutive lines, where the first line of each group starts with
the ``marker`` string.
"""
group_list = []
line_group = []
for line in lines:
if not line:
continue
if line.startswith(marker):
# Beginning of new line group
if line_group:
group_list.append(line_group)
line_group = []
line_group.append(line)
else:
if line_group:
line_group.append(line)
else:
raise TcParsingError(
f"first line does not start with '{marker}'", line=line)
if line_group:
group_list.append(line_group)
return group_list
[docs]class QClassOutput:
"""Helper class used for parsing ``tc class ls`` output for a single qclass
"""
def __init__(self, line_group: List[str]):
"""
:param line_group: list of lines, guaranteed not to be empty
"""
self.__line_iter = LineGroupIterator(line_group)
self.__handle = None
self.__parent_handle = None
self.__qclass_line = None
self.__qdisc_handle = None
[docs] def get_handle(self) -> Handle:
"""Returns the (parsed) :class:`Handle` of the queuing class.
"""
return self.__handle
[docs] def get_parent_handle(self) -> Handle:
"""Returns the (parsed) :class:`Handle` of the parent of the
queuing class.
"""
return self.__parent_handle
[docs] def get_qdisc_handle(self) -> Optional[Handle]:
"""Returns the :class:`Handle` of a (leaf) qdisc
"""
return self.__qdisc_handle
def get_linegroup_iter(self) -> LineGroupIterator:
"""Returns the LineGroupIterator for the tc output lines.
:meta private:
"""
return self.__line_iter
[docs] def get_class_line(self) -> str:
"""Returns the **tc(8)** output class line
"""
return self.__line_iter.get_last_line()
[docs] def get_field_iter(self) -> Iterator[str]:
"""Return an iterator for the (remaining) fields of the class line
"""
return self.__line_iter.get_field_iter()
def parse_first_line(self) -> str:
"""Parse the first line and return a string with the qdisc type
(e.g. 'htb')
:meta private:
"""
if self.__qclass_line is not None:
raise TcError('attempt to parse first line twice')
self.__qclass_line = next(self.__line_iter)
field_iter = self.get_field_iter()
try:
#
# All class lines have the form:
#
# class <type> <handle> parent <handle> [leaf <qdisc-handle>] ...
#
# where the ... part is type-specific
#
if next(field_iter) != 'class':
raise TcParsingError("line does not start with 'class'")
qdisc_type = next(field_iter)
# The handle string may not include a major number, e.g.
# class mq :1 root
# or
# class mq :1 parent 10:
#
# We need to parse the parent handle before we can
# parse the class handle. If the parent is root, we assume
# the major number is 0 (so the class handle for the first
# line will be 0:1)
handle_str = next(field_iter)
try:
handle = Handle.parse(handle_str)
except TcParsingError:
handle = None
parent_field = next(field_iter)
if parent_field == 'root':
parent_major = 0 if handle is None else handle.major
self.__parent_handle = Handle.qdisc_handle(parent_major)
elif parent_field == 'parent':
self.__parent_handle = Handle.parse(next(field_iter))
else:
raise TcParsingError(
f"cannot determine class parent from field {parent_field}")
if handle is None:
self.__handle = Handle.parse(handle_str,
default_major=self.__parent_handle.major)
else:
self.__handle = handle
if field_iter.peek() == 'leaf':
_ = next(field_iter)
self.__qdisc_handle = Handle.parse(next(field_iter))
return qdisc_type
except StopIteration as stopit:
raise TcParsingError("not enough fields") from stopit
[docs]class QClassParser:
"""Helper class that creates :class:`QClass` objects from the
output of the **tc(8)** command.
"""
_qclass_map = {}
def __init__(self, allow_parsing_errors: bool):
self.__allow_parsing_errors = allow_parsing_errors
self.__parsing_errors = 0
self.__qclass_list = []
def get_error_count(self) -> int:
"""Returns number of parsing errors encountered
:meta private:
"""
return self.__parsing_errors
[docs] @classmethod
def register_qclass(cls, ident: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`QClass` class).
This method is intended to be used for adding support for new
queuing discipline classes.
:param ident: the queuing class name that appears in the
``tc -s class ls`` output.
:param klass: the Python class for this queuing class
"""
cls._qclass_map[ident] = klass
def parse_output(self, tc_output_lines: List[str]) -> None:
"""Parse the **tc(8)** output in ``tc_output_lines`` into a list
of :class:`QClass` objects; the list can be accessed via
the :meth:`get_qclass_list` method.
:meta private:
"""
self.__qclass_list = []
for line_group in _group_split(tc_output_lines, 'class '):
qclass_output = QClassOutput(line_group)
try:
qdisc_type = qclass_output.parse_first_line()
klass = self._qclass_map.get(qdisc_type)
if klass is None:
if qdisc_type == 'sfq':
# SFQ is classless, so this should never happen;
# yet on CentOS 6.10, I observed the following in
# the output of 'tc class ls':
#
# class sfq 202:2c9 parent 202:
#
# 202: was a SFQ qdisc; the class minor number
# changed for every invocation of 'tc class ls'
_logger.warning("classless SFQ has a class: %s",
qclass_output.get_class_line())
continue
raise TcParsingError(f"unknown qdisc type {qdisc_type}")
qclass = klass.parse(qclass_output)
qclass._parse_stats(qclass_output.get_linegroup_iter())
self.__qclass_list.append(qclass)
except TcParsingError as parserr:
self.__parsing_errors += 1
line = qclass_output.get_class_line()
if not self.__allow_parsing_errors:
parserr.set_line(line)
raise
_logger.warning("%s: parsing error, line='%s'",
self.parse_output.__qualname__, line)
def get_qclass_list(self) -> List['QClass']:
"""Returns a list of :class:`QClass` objects from the
parsed output
:meta private:
"""
return self.__qclass_list
def get_qclass(self) -> Optional['QClass']:
"""Returns the first :class:`QClass` from the parsed output,
or ``None`` if no queuing class was successfully parsed.
:meta private:
"""
return self.__qclass_list[0] if self.__qclass_list else None
[docs]class QDiscOutput:
"""Helper class used for parsing ``tc qdisc ls`` output for a single qdisc
"""
def __init__(self, line_group: List[str]):
"""
:param line_group: list of lines, guaranteed not to be empty
"""
self.__line_iter = LineGroupIterator(line_group)
self.__handle = None
self.__parent_handle = None
self.__refcnt = None
self.__qdisc_line = None
[docs] def get_handle(self) -> Handle:
"""Returns the (parsed) :class:`Handle` of the queuing discipline.
"""
return self.__handle
[docs] def get_parent_handle(self) -> Optional[Handle]:
"""Returns the (parsed) :class:`Handle` of the parent of this
queueing discipline, or ``None`` if this is a root qdisc
"""
return self.__parent_handle
[docs] def get_refcnt(self) -> Optional[int]:
"""Returns reference count of qdisc (``None`` for non-root qdiscs,
and for some root qdiscs like ``mq``)
"""
return self.__refcnt
def get_linegroup_iter(self) -> LineGroupIterator:
"""Returns the LineGroupIterator for the tc output lines.
:meta private:
"""
return self.__line_iter
[docs] def get_qdisc_line(self) -> str:
"""Returns the **tc(8)** output qdisc line
"""
return self.__line_iter.get_last_line()
[docs] def get_field_iter(self) -> Iterator[str]:
"""Return an iterator for the (remaining) fields of the qdisc line
"""
return self.__line_iter.get_field_iter()
def parse_first_line(self) -> str:
"""Parse the first line and return a string with the qdisc type
(e.g. 'htb')
:meta private:
"""
if self.__qdisc_line is not None:
raise TcError('attempt to parse first line twice')
self.__qdisc_line = next(self.__line_iter)
field_iter = self.__line_iter.get_field_iter()
try:
#
# All qdisc lines have the form:
#
# qdisc <type> <handle> (root refcnt <num> |parent <handle>) ...
#
# where the ... part is type-specific
#
if next(field_iter) != 'qdisc':
raise TcParsingError("line does not start with 'qdisc'")
qdisc_type = next(field_iter)
self.__handle = Handle.parse(next(field_iter))
parent_field = next(field_iter)
if parent_field == 'root':
# Some root qdiscs like 'mq' do not provide a refcount
next_field = next(field_iter, None)
if next_field is None:
return qdisc_type
if next_field != 'refcnt':
raise TcParsingError(
f"found '{next_field}' after 'root' "
"instead of 'refcnt'")
self.__refcnt = int(next(field_iter))
elif parent_field == 'parent':
self.__parent_handle = Handle.parse(next(field_iter),
default_major=self.__handle.major)
else:
raise TcParsingError(
f"cannot determine qdisc parent from field {parent_field}")
return qdisc_type
except StopIteration as stopit:
raise TcParsingError("not enough fields") from stopit
[docs]class QDiscParser:
"""Helper class that creates :class:`QDisc` objects from the
output of the **tc(8)** command.
"""
_qdisc_class_map = {}
def __init__(self, allow_parsing_errors: bool):
self.__allow_parsing_errors = allow_parsing_errors
self.__parsing_errors = 0
self.__qdisc_list = []
def get_error_count(self) -> int:
"""Returns number of parsing errors encountered
:meta private:
"""
return self.__parsing_errors
[docs] @classmethod
def register_qdisc(cls, ident: str, klass) -> None:
"""Register the given class (which should be a subclass of
the :class:`QDisc` class).
This method is intended to be used for adding support for new
queuing disciplines.
:param ident: the qdisc name that appears in the
``tc -s qdisc ls`` output.
:param klass: the Python class for this queuing discipline
"""
cls._qdisc_class_map[ident] = klass
def parse_output(self, tc_output_lines: List[str]) -> None:
"""Parse the **tc(8)** output in ``tc_output_lines`` into a list
of :class:`QDisc` objects; the list can be accessed via
the :meth:`get_qdisc_list` method.
:meta private:
"""
#
# High-level logic:
# - parse the output into line groups, one for each qdisc; parsing
# is at the syntactic level only
# - for each group, determine the particular qdisc and let it
# parse the output
#
# Parsing requirements:
# 1. The 1st line needs to be partially parsed to determine the
# specific qdisc
# 2. The next 2 lines (with stats) can be parsed by common code
# because they are the same across qdisc's
# 3. Give the option to the qdisc-specific parsing code to parse
# the whole output
# 4. Give the option to the qdisc-specific code to use the
# common parsing code
#
# Based on the above, the QDiscOutput object contains the common
# parsing code. Consequently, it also holds parsed fields
# (like handles etc.)
#
self.__qdisc_list = []
for line_group in _group_split(tc_output_lines, 'qdisc '):
qdisc_output = QDiscOutput(line_group)
try:
qdisc_type = qdisc_output.parse_first_line()
klass = self._qdisc_class_map.get(qdisc_type)
if klass is None:
raise TcParsingError(f"unknown qdisc {qdisc_type}")
qdisc = klass.parse(qdisc_output)
qdisc._parse_stats(qdisc_output.get_linegroup_iter())
self.__qdisc_list.append(qdisc)
except TcParsingError as parserr:
self.__parsing_errors += 1
line = qdisc_output.get_qdisc_line()
if not self.__allow_parsing_errors:
parserr.set_line(line)
raise
_logger.warning("%s: parsing error, line='%s'",
self.parse_output.__qualname__, line)
def get_qdisc_list(self) -> List['QDisc']:
"""Returns a list of :class:`QDisc` objects from the
parsed output
:meta private:
"""
return self.__qdisc_list
def get_qdisc(self) -> Optional['QDisc']:
"""Returns the first :class:`QDisc` from the parsed output,
or ``None`` if no queuing discipline was successfully parsed.
:meta private:
"""
return self.__qdisc_list[0] if self.__qdisc_list else None