# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.
"""This module provides access to the HTB queueing discipline
"""
import re
from typing import List, Optional, TextIO
from ..deps import get_logger
from ..exceptions import TcParsingError, TcBandwidthError
from ..handle import Handle
from ..tcunit import rate2str, bwstr2int, unitstr2int
from ..parsers import QDiscParser, QClassParser
from .qdisc import QDisc, QClass, QStats
# Module-level logger; used below to report unparsable stats lines and
# failed bandwidth/admission checks.
_logger = get_logger("linuxnet.qos.qdiscs.htb")
class HTBQClassStats(QStats):
    """HTB-specific class stats (see :class:`QStats` for inherited stats)
    """

    __LINE4_REGEX_PROG = re.compile(
        r"lended: (\d+) borrowed: (\d+) giants: (\d+)")
    # Both tokens and ctokens may be negative, so each value accepts
    # an optional leading minus sign.
    __LINE5_REGEX_PROG = re.compile(
        r"tokens: (-?\d+) ctokens: (-?\d+)")

    def __init__(self):
        super().__init__()
        self.__lent = 0
        self.__borrowed = 0
        self.__giants = 0
        # tokens/ctokens are measured in packet scheduler ticks;
        # a tick duration is 64nsec.
        # Sending a packet of size P when the rate is R takes T=P/R seconds
        # and consumes T/64 tokens.
        self.__tokens = 0
        # Sending a packet of size P when the ceil is C takes T=P/C seconds
        # and consumes T/64 ctokens.
        self.__ctokens = 0

    @property
    def packets_lent(self) -> int:
        """Number of packets lent.
        """
        return self.__lent

    @property
    def packets_borrowed(self) -> int:
        """Number of packets borrowed.
        """
        return self.__borrowed

    @property
    def overlimits(self) -> int:
        """Number of times a rate limit was exceeded.
        """
        return self.get_overlimits()

    @property
    def giant_packets(self) -> int:
        """Number of packets exceeding interface MTU
        """
        return self.__giants

    @property
    def tokens(self) -> int:
        """Tokens available for transmitting at the guaranteed rate
        (measured in packet scheduler ticks).
        This value is not a statistic; it reflects the class state at
        the time the stats were obtained.
        """
        return self.__tokens

    @property
    def ctokens(self) -> int:
        """Tokens available for transmitting at the maximum (ceil) rate
        (measured in packet scheduler ticks)
        This value is not a statistic; it reflects the class state at
        the time the stats were obtained.
        """
        return self.__ctokens

    def __parse_fourth_line(self, line: str) -> bool:
        """Initialize attributes from ``line``, which is the 4th line
        of ``tc class ls`` output.

        Returns ``True`` if the line was parsed, ``False`` (after logging
        a warning) if it does not have the expected format.
        """
        # The line looks like this:
        #
        #  lended: 12189193 borrowed: 6524195 giants: 0
        #
        match = self.__LINE4_REGEX_PROG.match(line.strip())
        if match is None:
            _logger.warning("4th line not parsable: %s", line)
            return False
        self.__lent = int(match.group(1))
        self.__borrowed = int(match.group(2))
        self.__giants = int(match.group(3))
        return True

    def __parse_fifth_line(self, line: str) -> bool:
        """Initialize attributes from ``line``, which is the 5th line
        of ``tc class ls`` output.

        Returns ``True`` if the line was parsed, ``False`` (after logging
        a warning) if it does not have the expected format.
        """
        # The line looks like this:
        #
        #  tokens: 4953344 ctokens: 309594
        #
        # Either value may carry a leading '-'.
        #
        match = self.__LINE5_REGEX_PROG.match(line.strip())
        if match is None:
            _logger.warning("5th line not parsable: %s", line)
            return False
        # int() handles the optional sign captured by the regex
        self.__tokens = int(match.group(1))
        self.__ctokens = int(match.group(2))
        return True

    def init_from_output(self, line_group_iter: 'LineGroupIter') -> bool:
        """This method is used when parsing the output of ``tc -s class ls``
        to extract statistics information.
        The iterator returns the lines of the output of ``tc -s class ls``
        for a single queuing class.

        Returns ``True`` if all stats lines were successfully parsed,
        ``False`` otherwise.

        :meta private:
        """
        #
        # The first line of the output has already been consumed,
        # and the 2nd line is the next to be returned.
        # Parent class will consume the 2nd and 3rd line
        #
        if not super().init_from_output(line_group_iter):
            return False
        try:
            line = next(line_group_iter)
            if not self.__parse_fourth_line(line):
                return False
            line = next(line_group_iter)
            if not self.__parse_fifth_line(line):
                return False
        except StopIteration:
            # Fewer lines than expected
            return False
        except ValueError as valerr:
            _logger.warning("bad value in stats line: %s (line=%s)",
                            valerr, line)
            return False
        return True

    def dump(self, outfile: TextIO, width: Optional[int] =None) -> None:
        """Dump stats to ``outfile``.

        There is one stat per line output. Each line has the format::

            header: value

        The ``header:`` part occupies at least ``width`` characters.
        """
        super().dump(outfile, width)
        width = width or self.HEADER_WIDTH
        # The overlimits counter is kept by the parent class; it must be
        # read via the accessor because ``self.__overlimits`` would be
        # name-mangled to an attribute that this class never sets.
        print(f"{'Overlimits:':{width}} {self.get_overlimits()}",
              file=outfile)
        print(f"{'Lent:':{width}} {self.__lent}", file=outfile)
        print(f"{'Borrowed:':{width}} {self.__borrowed}", file=outfile)
        print(f"{'Giants:':{width}} {self.__giants}", file=outfile)
        print(f"{'Tokens:':{width}} {self.__tokens}", file=outfile)
        print(f"{'CTokens:':{width}} {self.__ctokens}", file=outfile)
class HTBQClass(QClass):    # pylint: disable=too-many-instance-attributes
    """A class of the HTB qdisc
    """

    def __init__(self,      # pylint: disable=too-many-arguments
                 class_handle: Handle, parent_handle: Handle,
                 *,
                 rate: int, ceil: Optional[int] =None,
                 prio: Optional[int] =None,
                 burst: Optional[int] =None,
                 cburst: Optional[int] =None,
                 quantum: Optional[int] = None,
                 class_name: Optional[str] =None):
        """
        :param class_handle: handle of this :class:`HTBQClass`
        :param parent_handle: handle of parent :class:`HTBQClass` or
            :class:`HTBQDisc`
        :param rate: guaranteed rate (unit: bits/sec)
        :param ceil: max rate (unit: bits/sec); defaults to ``rate``
        :param prio: priority
        :param burst: amount of bytes that can be sent at ``ceil`` speed
        :param cburst: amount of bytes that can be sent at interface speed
        :param quantum: transmission quantum (aka interface packet size)
            in bytes; the kernel computes this from the HTB queuing
            discipline ``r2q`` if not explicitly specified at HTB class
            creation time
        :param class_name: name used in descriptions; defaults to
            ``HTBQClass``

        Raises a :exc:`TcBandwidthError` if ``rate`` exceeds ``ceil``.
        """
        if class_name is None:
            # NB: class handle not included because it is automatically
            #     appended to class name in output (see get_description())
            class_name = "HTBQClass"
        super().__init__(class_handle, parent_handle, class_name=class_name)
        self.__rate = rate
        self.__ceil = ceil if ceil is not None else rate
        # We map None to -1
        self.__prio = prio if prio is not None else -1
        self.__burst = burst
        self.__cburst = cburst
        self.__quantum = quantum
        if self.__rate > self.__ceil:
            _logger.error("%s: %s: rate (%d) > ceil (%d)",
                          self.__init__.__qualname__,
                          self,
                          self.__rate,
                          self.__ceil)
            raise TcBandwidthError(
                f'{self} has rate {self.__rate} > ceil {self.__ceil}')
        # Nothing has been handed out to children yet
        self.__residual_rate = rate
        self.__stats = None

    def __str__(self):
        return f"HTBQClass({self.get_handle()})"

    def get_description(self) -> str:
        """Returns a string describing the class and its attributes
        """
        class_name = self.get_class_name()
        if class_name is None:
            retval = str(self)
        else:
            retval = f'{class_name}({self.get_handle()}) HTB'
        # The ceil is always set (it defaults to the rate in __init__)
        retval += ' rate ' + rate2str(self.__rate)
        retval += '/' + rate2str(self.__ceil)
        if self.__prio >= 0:
            retval += f' prio {self.__prio}'
        return retval

    def qclass_creation_args(self) -> List[str]:
        """Returns the tc arguments to create this HTB class
        """
        # The ceil is always set (it defaults to the rate in __init__)
        args = ['htb', 'rate', str(self.__rate), 'ceil', str(self.__ceil)]
        if self.__prio >= 0:
            args.extend(['prio', str(self.__prio)])
        # For the burst/cburst parameters, if an explicit burst/cburst is
        # not specified, tc defaults to the formula
        #       R / HZ + MTU
        # where
        #   - HZ is the system clock frequency (typical 10ms quantum implies
        #     a frequency of 100 Hz)
        #   - MTU defaults to 1600; it can be changed using the undocumented
        #     'mtu' parameter when adding a htb class via tc
        #   - R is the specified rate/ceil in bytes (i.e. it is divided by 8)
        if self.__burst is not None:
            args.extend(['burst', str(self.__burst)])
        if self.__cburst is not None:
            args.extend(['cburst', str(self.__cburst)])
        if self.__quantum is not None:
            args.extend(['quantum', str(self.__quantum)])
        return args

    def get_priority(self) -> int:
        """Returns the class priority, or -1 if the class
        has no priority specified
        """
        return self.__prio

    def get_rate(self) -> int:
        """Returns the bandwidth guaranteed to this class
        """
        return self.__rate

    def get_ceil(self) -> Optional[int]:
        """Returns the maximum bandwidth that may be consumed
        by this class.
        """
        return self.__ceil

    def get_burst(self) -> Optional[int]:
        """Returns the burst (bytes)
        """
        return self.__burst

    def get_cburst(self) -> Optional[int]:
        """Returns the cburst (bytes)
        """
        return self.__cburst

    def get_quantum(self) -> Optional[int]:
        """Returns the quantum (bytes)
        """
        return self.__quantum

    def get_residual_rate(self) -> int:
        """Returns the residual rate, which is the guaranteed
        bandwidth left at this class after all its children
        have been allocated their guaranteed bandwidth.
        """
        return self.__residual_rate

    def get_stats(self) -> Optional[HTBQClassStats]:
        """Returns class stats (an :class:`HTBQClassStats` instance) or
        ``None`` if no stats are available.
        """
        return self.__stats

    def __residual_rate_check(self, qclass):
        """Check that there is enough rate for a new class

        Raises a :exc:`TcBandwidthError` if the rate of ``qclass``
        exceeds the residual rate of this class.
        """
        child_rate = qclass.get_rate()
        if child_rate > self.__residual_rate:
            _logger.error("%s: %s: rate (%d) of %s exceeds residual rate (%d)",
                          self.__residual_rate_check.__qualname__,
                          self,
                          child_rate,
                          qclass,
                          self.__residual_rate)
            raise TcBandwidthError(
                f'rate {child_rate} of {qclass} exceeds residual '
                f'rate {self.__residual_rate} of {self}')

    def _add_child_class(self, qclass: 'HTBQClass'):
        """Add ``qclass`` as a child of this :class:`HTBQClass`.
        The residual rate is updated accordingly.
        """
        super()._add_child_class(qclass)
        self.__residual_rate -= qclass.get_rate()
        # No admission check is performed here, so over-subscription is
        # only reported, not rejected.
        if self.__residual_rate < 0:
            _logger.warning(
                "%s: %s: negative residual rate after addition of %s",
                self._add_child_class.__qualname__,
                self,
                qclass)

    def _remove_child_class(self, qclass: 'HTBQClass'):
        """Remove ``qclass`` from the children of this :class:`HTBQClass`.
        The residual rate is updated accordingly.
        """
        super()._remove_child_class(qclass)
        self.__residual_rate += qclass.get_rate()

    def child_admission_check(self, new_child_class: 'HTBQClass') -> None:
        """Perform sanity tests before adding a child class:

            - child is an :class:`HTBQClass`
            - child ceil <= parent ceil
            - aggr children rate <= parent rate

        Raises a :exc:`TcBandwidthError` if the check fails
        """
        if not isinstance(new_child_class, HTBQClass):
            _logger.error("%s: %s: class-mismatch: expected=HTBQClass found=%s",
                          self.child_admission_check.__qualname__,
                          self,
                          type(new_child_class))
            raise TcBandwidthError(f"not a HTBQClass: {new_child_class}")
        self.__residual_rate_check(new_child_class)
        child_ceil = new_child_class.get_ceil()
        if child_ceil > self.__ceil:
            _logger.error("%s: %s: ceil (%d) of '%s' exceeds our ceil (%d)",
                          self.child_admission_check.__qualname__,
                          self,
                          child_ceil,
                          new_child_class,
                          self.__ceil)
            raise TcBandwidthError(
                f"{new_child_class} ceil {child_ceil} > "
                f"parent {self} ceil {self.__ceil}")

    def _parse_stats(self, line_group_iter) -> None:
        """Parse queuing stats; the stats are only recorded if all
        the stats lines were successfully parsed.
        """
        stats = HTBQClassStats()
        if stats.init_from_output(line_group_iter):
            self.__stats = stats

    @classmethod
    def parse(cls, qclass_output) -> 'HTBQClass':
        """Create a :class:`HTBQClass` object from the output of **tc(8)**.

        Raises :class:`TcParsingError` if unable to parse; this includes
        an unknown field, a field keyword without a value, a value that
        cannot be converted, and a missing ``rate``.

        :meta private:
        """
        field_iter = qclass_output.get_field_iter()
        #
        # The iterator returns the fields of a line like this:
        #
        # class htb 1:11 parent 1:1 prio 0 rate 500bit ceil 500bit burst 1600b cburst 1600b
        #
        # or
        #
        # class htb 1:1 root rate 100000Kbit ceil 100000Kbit burst 1600b cburst 1600b
        #
        # The fields 'class', 'htb', 'root', 'parent', 'leaf' (and their values)
        # have been consumed by the caller.
        #
        # Bound up-front so that the error handlers below can always
        # refer to it.
        field = None
        try:
            prio = None
            rate = None
            ceil = None
            burst = None
            cburst = None
            for field in field_iter:
                if field == 'prio':
                    prio = int(next(field_iter))
                elif field == 'rate':
                    rate = bwstr2int(next(field_iter))
                elif field == 'ceil':
                    ceil = bwstr2int(next(field_iter))
                elif field == 'burst':
                    burst = unitstr2int(next(field_iter), 'b')
                elif field == 'cburst':
                    cburst = unitstr2int(next(field_iter), 'b')
                else:
                    raise TcParsingError(f"unknown field '{field}'")
            if rate is None:
                # Without a rate the constructor would fail with a
                # TypeError when comparing rate against ceil.
                raise TcParsingError("missing rate")
            htb_class = HTBQClass(qclass_output.get_handle(),
                                  qclass_output.get_parent_handle(),
                                  rate=rate,
                                  ceil=ceil, prio=prio,
                                  burst=burst, cburst=cburst)
            return htb_class
        except TcParsingError:
            # Already the right exception type; do not re-wrap it
            raise
        except StopIteration:
            # A field keyword was the last token of the line
            raise TcParsingError(f"missing value for {field}") from None
        except ValueError as valerr:
            raise TcParsingError(f"bad value for {field}") from valerr
class HTBQDisc(QDisc):
    """This class provides access to the Hierarchy Token Bucket
    queueing discipline of Linux (see **tc-htb(8)**).
    """

    def __init__(self, qdisc_handle: Handle, parent_handle: Optional[Handle],
                 *,
                 default_class_minor: Optional[int] =None,
                 r2q: Optional[int] =None):
        """
        :param qdisc_handle: handle of this queuing discipline
        :param parent_handle: handle of parent, ``None`` if this is a
            root queuing discipline
        :param default_class_minor: minor number of default class
        :param r2q: when a queuing class of this queuing discipline has
            not an explicitly specified quantum, its quantum is
            computed as ``rate/r2q``
        """
        super().__init__(qdisc_handle, parent_handle)
        if default_class_minor is not None:
            # The default class shares the major number of the qdisc
            self.__default_class_handle = Handle(qdisc_handle.major,
                                                 default_class_minor)
        else:
            self.__default_class_handle = None
        self.__r2q = r2q

    def __str__(self):
        return f"HTBQDisc({self.get_handle()})"

    def get_default_class_handle(self) -> Optional[Handle]:
        """Get the handle of the default class
        """
        return self.__default_class_handle

    def get_r2q(self) -> Optional[int]:
        """Returns the rate-to-quantum divisor
        """
        return self.__r2q

    def qdisc_creation_args(self) -> List[str]:
        """Returns the arguments expected by tc to create
        a HTB qdisc
        """
        args = ['htb']
        if self.__default_class_handle is not None:
            # The minor number is emitted in hex (parse() reads it
            # back as base 16)
            args.extend(['default', f'{self.__default_class_handle.minor:x}'])
        if self.__r2q is not None:
            args.extend(['r2q', f'{self.__r2q:d}'])
        return args

    def get_description(self) -> str:
        """Returns a string describing the queuing discipline and
        its attributes
        """
        retval = super().get_description()
        if self.__default_class_handle:
            retval += f' default {self.__default_class_handle}'
        if self.__r2q:
            retval += f' r2q {self.__r2q}'
        return retval

    @classmethod
    def parse(cls, qdisc_output) -> 'HTBQDisc':
        """Create a HTBQDisc object from the output of the **tc(8)** command.

        Raises :class:`TcParsingError` if unable to parse; this includes
        an unknown argument, an argument without a value, and a
        non-numeric ``r2q`` or ``default`` value.

        :meta private:
        """
        field_iter = qdisc_output.get_field_iter()

        def field_value(name: str) -> str:
            """Return the value following argument ``name``
            """
            try:
                return next(field_iter)
            except StopIteration:
                raise TcParsingError(
                    f"missing value for htb argument '{name}'") from None

        #
        # The fields are generated from a split of a line like this:
        #
        # qdisc htb 1: root refcnt 2 r2q 10 default 10 direct_packets_stat 0
        #
        # The next field to be returned from field_iter is 'r2q'
        #
        default_class_minor = None
        r2q = None
        for field in field_iter:
            if field == 'r2q':
                value = field_value(field)
                try:
                    r2q = int(value)
                except ValueError as valerr:
                    raise TcParsingError("bad r2q value") from valerr
            elif field in ('direct_packets_stat', 'direct_qlen'):
                # Value is consumed but not used
                _ = field_value(field)
            elif field == 'default':
                value = field_value(field)
                try:
                    # The minor number is in hex
                    default_class_minor = int(value, 16)
                except ValueError as valerr:
                    raise TcParsingError(
                        "bad default class minor number") from valerr
            else:
                raise TcParsingError(f"unknown htb argument '{field}'")
        htb = HTBQDisc(qdisc_output.get_handle(),
                       qdisc_output.get_parent_handle(),
                       default_class_minor=default_class_minor, r2q=r2q)
        return htb
# Register the HTB implementations with the tc output parsers under the
# 'htb' name (presumably the qdisc kind as it appears in tc output, so that
# matching lines are handed to the parse() classmethods above -- confirm
# against the parsers module).
QDiscParser.register_qdisc('htb', HTBQDisc)
QClassParser.register_qclass('htb', HTBQClass)