# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.
"""This module provides access to the Fair Queuing (FQ) with
Controlled Delay (fq_codel) queueing discipline
"""
import re
from typing import List, Optional, TextIO
from ..deps import get_logger
from ..exceptions import TcParsingError
from ..handle import Handle
from ..tcunit import datastr2int, timestr2float, unitstr2int
from ..parsers import QDiscParser
from .qdisc import QDisc, QStats
_logger = get_logger("linuxnet.qos.qdiscs.fq_codel")
[docs]class FQCoDelQDiscStats(QStats):
"""FQCoDelQDisc stats
"""
__LINE4_REGEX_PROG = re.compile(
r"maxpacket (\d+) drop_overlimit (\d+) "
r"new_flow_count (\d+) ecn_mark (\d+)")
__LINE5_REGEX_PROG = re.compile(
r"new_flows_len (-)?(\d+) old_flows_len (\d+)")
def __init__(self):
super().__init__()
self.__maxpacket = 0
self.__drop_overlimit = 0
self.__new_flow_count = 0
self.__ecn_mark = 0
self.__new_flows_len = 0
self.__old_flows_len = 0
@property
def maxpacket(self) -> int:
"""Size of largest packet seen
"""
return self.__maxpacket
@property
def drop_overlimit(self) -> int:
"""Number of packets dropped because the queue was full.
"""
return self.__drop_overlimit
@property
def new_flow_count(self) -> int:
"""Returns number of times a new flow was identified.
"""
return self.__new_flow_count
@property
def ecn_packets(self) -> int:
"""Number of packets that were ECN marked (instead of dropped)
"""
return self.__ecn_mark
@property
def new_flows_len(self) -> int:
"""Number of new flows.
This value is not a statistic; it reflects the qdisc state at
the time the stats were obtained.
"""
return self.__new_flows_len
@property
def old_flows_len(self) -> int:
"""Number of old flows.
This value is not a statistic; it reflects the qdisc state at
the time the stats were obtained.
"""
return self.__old_flows_len
def __parse_fourth_line(self, line: str) -> bool:
"""Initialize attributes from ``line``, which is the 4th line
of ``tc qdisc ls`` output
"""
# The line looks like this:
#
# maxpacket 98 drop_overlimit 0 new_flow_count 5 ecn_mark 0
#
match = self.__LINE4_REGEX_PROG.match(line.strip())
if match is None:
_logger.warning("4th line not parsable: %s", line)
return False
self.__maxpacket = int(match.group(1))
self.__drop_overlimit = int(match.group(2))
self.__new_flow_count = int(match.group(3))
self.__ecn_mark = int(match.group(4))
return True
def __parse_fifth_line(self, line: str) -> bool:
"""Initialize attributes from ``line``, which is the 5th line
of ``tc qdisc ls`` output
"""
# The line looks like this:
#
# new_flows_len 0 old_flows_len 0
#
match = self.__LINE5_REGEX_PROG.match(line.strip())
if match is None:
_logger.warning("5th line not parsable: %s", line)
return False
self.__new_flows_len = int(match.group(2))
if match.group(1):
self.__new_flows_len = -self.__new_flows_len
self.__old_flows_len = int(match.group(3))
return True
def init_from_output(self, line_group_iter: 'LineGroupIterator') -> bool:
"""This method is used when parsing the output of ``tc -s qdisc ls``
to extract statistics information.
The iterator returns the lines of the output of ``tc -s qdisc ls``
for a single queuing class.
:meta private:
"""
#
# The first line of the output has already been consumed,
# and the 2nd line is the next to be returned.
# Parent class will consume the 2nd and 3rd line
#
if not super().init_from_output(line_group_iter):
return False
try:
line = next(line_group_iter)
if not self.__parse_fourth_line(line):
return False
line = next(line_group_iter)
if not self.__parse_fifth_line(line):
return False
except StopIteration:
return False
except ValueError as valerr:
_logger.warning("bad value in stats line: %s (line=%s)",
valerr, line)
return False
return True
[docs] def dump(self, outfile: TextIO, width: Optional[int] =None) -> None:
"""Dump stats to ``outfile``.
There is one stat per line output. Each line has the format::
header: value
The ``header:`` part occupies at least ``width`` characters.
"""
super().dump(outfile, width)
width = width or self.HEADER_WIDTH
print(f"{'Maxpacket:':{width}} {self.__maxpacket}", file=outfile)
print(f"{'Drop-overlimits':{width}} {self.__drop_overlimit}",
file=outfile)
print(f"{'New-flows:':{width}} {self.__new_flow_count}", file=outfile)
print(f"{'ECN-mark:':{width}} {self.__ecn_mark}", file=outfile)
print(f"{'New-flows-len:':{width}} {self.__new_flows_len}",
file=outfile)
print(f"{'Old-flows-len:':{width}} {self.__old_flows_len}",
file=outfile)
[docs]class FQCoDelQDisc(QDisc): # pylint: disable=too-many-instance-attributes
"""This class provides access to the Fair Queuing (FQ) with
Controlled Delay (fq_codel) queueing discipline (see **tc-fq_codel(8)**)
"""
def __init__(self, qdisc_handle: Handle, parent_handle: Optional[Handle],
*,
limit: Optional[int], flows: Optional[int],
quantum: Optional[int], target: Optional[float],
interval: Optional[float],
memory_limit: Optional[int], ecn: Optional[bool]):
"""
:param qdisc_handle: handle of this :class:`FQCoDelQDisc`
:param parent_handle: handle of parent, ``None`` if this is a
root queuing discipline
:param limit: as documented in **tc-fq_codel(8)**
:param flows: as documented in **tc-fq_codel(8)**
:param quantum: as documented in **tc-fq_codel(8)**
:param target: as documented in **tc-fq_codel(8)**
:param interval: as documented in **tc-fq_codel(8)**
:param memory_limit: as documented in **tc-fq_codel(8)**
:param ecn: as documented in **tc-fq_codel(8)**
"""
super().__init__(qdisc_handle, parent_handle)
self.__limit = limit
self.__flows = flows
self.__quantum = quantum
self.__target = target # in ms
self.__interval = interval # in ms
self.__memory_limit = memory_limit # in bytes
self.__ecn: bool = ecn
self.__drop_batch = None
self.__stats = None
def __str__(self):
return f"FQCoDelQDisc({self.get_handle()})"
[docs] def get_packet_limit(self) -> Optional[int]:
"""Returns packet limit
"""
return self.__limit
[docs] def get_flows(self) -> Optional[int]:
"""Returns number of flows
"""
return self.__flows
[docs] def get_quantum(self) -> Optional[int]:
"""Returns quantum
"""
return self.__quantum
[docs] def get_memory_limit(self) -> Optional[int]:
"""Returns memory limit in bytes
"""
return self.__memory_limit
[docs] def get_queue_delay_target(self) -> Optional[int]:
"""Returns target for the minimum queue delay (in ms)
"""
return self.__target
[docs] def get_queue_delay_interval(self) -> Optional[int]:
"""Returns time interval in which to measure queue delay (in ms)
"""
return self.__interval
[docs] def get_ecn(self) -> Optional[bool]:
"""Returns the ``ecn`` value
"""
return self.__ecn
[docs] def qdisc_creation_args(self) -> List[str]:
"""Returns the arguments expected by the **tc(8)** command to create
a ``fq_codel`` qdisc
"""
args = ['fq_codel']
if self.__limit is not None:
args.extend(['limit', f'{self.__limit}p'])
if self.__flows is not None:
args.extend(['flows', f'{self.__limit:d}'])
if self.__quantum is not None:
args.extend(['quantum', f'{self.__quantum:d}'])
if self.__target is not None:
args.extend(['target', f'{self.__target:.1f}ms'])
if self.__interval is not None:
args.extend(['interval', f'{self.__interval:.1f}ms'])
if self.__memory_limit is not None:
args.extend(['memory_limit', f'{self.__memory_limit:d}b'])
if self.__ecn is not None:
if self.__ecn:
args.append('ecn')
else:
args.append('noecn')
return args
[docs] def get_description(self) -> str:
"""Returns a string describing the qdisc and its attributes
"""
retval = super().get_description()
if self.__limit is not None:
retval += f' {self.__limit}p'
if self.__flows is not None:
retval += f' {self.__flows}'
if self.__quantum is not None:
retval += f' {self.__quantum}'
if self.__target is not None:
retval += f' {self.__target:.1f}ms'
if self.__interval is not None:
retval += f' {self.__interval:.1f}ms'
if self.__memory_limit is not None:
retval += f' {self.__memory_limit}b'
if self.__ecn is not None:
if self.__ecn:
retval += ' ecn'
else:
retval += ' noecn'
if self.__drop_batch is not None:
retval += f' {self.__drop_batch}'
return retval
[docs] def get_stats(self) -> Optional[FQCoDelQDiscStats]:
"""Returns qdisc stats (an :class:`FQCoDelQDiscStats` instance) or
``None`` if no stats are available.
"""
return self.__stats
def _parse_stats(self, line_group_iter) -> None:
"""Parse queuing stats.
"""
stats = FQCoDelQDiscStats()
if stats.init_from_output(line_group_iter):
self.__stats = stats
def _set_drop_batch(self, drop_batch: int) -> None:
"""Set the drop_batch attribute.
This is a private method used only by the parsing code because
this fq_codel parameter is not documented in tc-fq_codel(8)
"""
self.__drop_batch = drop_batch
# pylint: disable=too-many-branches
@classmethod
def parse(cls, qdisc_output) -> 'FQCoDelQDisc':
"""Create a :class:`FQCoDelQDisc` object from the output of the
**tc(8)** command.
:meta private:
"""
field_iter = qdisc_output.get_field_iter()
# pylint: disable=line-too-long
#
# The fields are generated from a split of a line like this:
#
# qdisc fq_codel 0: parent :2 limit 10240p flows 1024 quantum 1514 target 5.0ms interval 100.0ms memory_limit 32Mb ecn drop_batch 64
#
limit = None
flows = None
quantum = None
target = None
interval = None
memory_limit = None
ecn = None
drop_batch = None
try:
for field in field_iter:
if field == 'limit':
limit = unitstr2int(next(field_iter), 'p')
elif field == 'flows':
flows = int(next(field_iter))
elif field == 'quantum':
quantum = int(next(field_iter))
elif field == 'target':
target = timestr2float(next(field_iter))
elif field == 'interval':
interval = timestr2float(next(field_iter))
elif field == 'memory_limit':
memory_limit = datastr2int(next(field_iter))
elif field == 'ecn':
ecn = True
elif field == 'noecn':
ecn = False
elif field == 'drop_batch':
drop_batch = int(next(field_iter))
else:
raise TcParsingError(f"unknown field '{field}'")
except ValueError as valerr:
raise TcParsingError(f"bad value for field {field}") from valerr
qdisc = FQCoDelQDisc(
qdisc_output.get_handle(),
qdisc_output.get_parent_handle(),
limit=limit, flows=flows, target=target,
quantum=quantum, interval=interval,
memory_limit=memory_limit, ecn=ecn)
if drop_batch is not None:
qdisc._set_drop_batch(drop_batch)
return qdisc
# pylint: enable=too-many-branches
QDiscParser.register_qdisc('fq_codel', FQCoDelQDisc)