Source code for linuxnet.qos.filters.filter

# Copyright (c) 2021, 2022, 2023, 2025, Panagiotis Tsirigotis

# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.

"""This module provides traffic filter classes
"""

from typing import List, Optional

from ..deps import get_logger
from ..exceptions import TcError
from ..handle import Handle

_logger = get_logger("linuxnet.qos.filters.filter")


[docs]class TrafficFilter: # pylint: disable=too-many-instance-attributes """Generic filter class. It cannot be instantiated. It is subclassed based on filter type. """ def __init__(self, # pylint: disable=too-many-arguments protocol: str, prio: Optional[int], filter_type: str, *, dest_class_handle: Optional[Handle] =None, filter_name: Optional[str] =None): """ :param protocol: protocol :param prio: optional filter priority (a number); defaults to -1 :param filter_type: a string indicating one of the currently supported filter types: ``u32``, ``fw`` :param dest_class_handle: :class:`Handle` of class where this filter will direct traffic :param filter_name: user-friendly filter name """ self.__protocol = protocol self.__prio = prio if prio is not None else -1 self.__filter_type = filter_type self.__instantiated = False self.__dest_class_handle = dest_class_handle self.__filter_name = filter_name self.__action_list = [] def __str__(self): prio = 'NOPRIO' if self.__prio < 0 else f'0x{self.__prio:x}' return f'Filter({self.__protocol}/{prio}/{self.__filter_type})'
[docs] def filter_creation_args(self) -> List[str]: """Returns a list of **tc(8)** arguments to create this filter """ raise NotImplementedError
[docs] def is_instantiated(self) -> bool: """Returns ``True`` if the filter is in the kernel """ return self.__instantiated
def _mark_as_instantiated(self) -> None: """This method is intended to be used by the code that parses an existing qdisc configuration. """ self.__instantiated = True
[docs] def get_description(self) -> str: """Returns a string with detail info about the filter """ return str(self)
[docs] def get_match_name(self) -> Optional[str]: """Returns a string with the name that describes the traffic matched by the filter, or ``None``. This method should be defined in derived classes. """ _ = self
[docs] def get_filter_type(self) -> str: """Returns the filter type """ return self.__filter_type
[docs] def get_prio(self) -> int: """Returns the filter priority """ return self.__prio
[docs] def set_prio(self, prio: int): """Set the filter priority. Raises :exc:`TcError` if the filter is instantiated. """ if self.__instantiated: _logger.error( "%s: %s: attempt to change priority of instantiated filter", self.set_prio.__qualname__, self) raise TcError(f"attempt to change filter {self} priority") self.__prio = prio
[docs] def get_filter_name(self) -> Optional[str]: """Returns the filter name """ return self.__filter_name
[docs] def set_filter_name(self, filter_name: str): """Set the filter name """ self.__filter_name = filter_name
[docs] def get_dest_handle(self) -> Optional[Handle]: """Returns the :class:`Handle` of the :class:`QClass` where this filter will send traffic """ return self.__dest_class_handle
[docs] def set_dest_handle(self, handle: Handle): """Sets the handle of the class where this filter will send traffic. Raises :exc:`TcError` if the filter is instantiated. """ if self.__instantiated: _logger.error( "%s: %s: attempt to change destination of instantiated filter", self.set_dest_handle.__qualname__, self) raise TcError(f"attempt to change filter {self} destination") self.__dest_class_handle = handle
[docs] def get_actions(self) -> List['TrafficAction']: """Returns the action list for this filter """ return self.__action_list
[docs] def add_action(self, action: 'TrafficAction') -> None: """Add a filter action. Raises :exc:`TcError` if the filter is instantiated. """ if self.__instantiated: _logger.error( "%s: %s: attempt to add action to an instantiated filter", self.add_action.__qualname__, self) raise TcError(f"attempt to add action to filter {self}") self.__action_list.append(action)
def _instantiate(self, owner) -> None: """Instantiate a filter by invoking the **tc(8)** command. :param owner: :class:`QClass`/:class:`QDisc` that is the parent of this filter """ if self.__instantiated: raise TcError('filter is already instantiated') if self.__dest_class_handle is None: raise TcError('filter has no destination class handle') config = owner.get_config() if config is None: raise TcError(f'filter owner {owner} not instantiated') cmd = ['tc', 'filter', 'add', 'dev', config.get_interface(), 'parent', str(owner.get_handle()), 'protocol', self.__protocol] if self.__prio >= 0: cmd += ['prio', f'{self.__prio:d}'] else: _logger.warning( "instantiating filter %s with no priority -- " "it will not be possible to delete", self) cmd += self.filter_creation_args() for action in self.__action_list: cmd += action.action_creation_args() config.tc_run(cmd, 'filter creation') self.__instantiated = True def _uninstantiate(self, owner) -> None: """Uninstantiate a filter by invoking the **tc(8)** command. :param owner: :class:`QClass`/:class:`QDisc` that is the parent of this filter """ if not self.__instantiated: raise TcError('filter has not been instantiated') config = owner.get_config() if config is None: # If the owner is no longer instantiated, the filter is gone too. _logger.info("%s: %s: owner not instantiated: %s", self._uninstantiate.__qualname__, self, owner) return if self.__prio < 0: _logger.error("%s: unable to delete filter '%s': missing priority", self._uninstantiate.__qualname__, self) raise TcError( f'unable to delete filter {self} ' 'because of missing priority') cmd = ['tc', 'filter', 'del', 'dev', config.get_interface(), 'parent', str(owner.get_handle()), 'prio', f'{self.__prio:d}'] config.tc_run(cmd, 'filter deletion')