Source code for linuxnet.qos.actions.skbedit

# Copyright (c) 2025, Panagiotis Tsirigotis

# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides access to the ``skbedit`` action.
"""

from typing import List, Optional

from ..deps import get_logger
from ..exceptions import TcParsingError
from ..parsers import TrafficFilterParser

from .action import TrafficAction, ActionDecision

_logger = get_logger("linuxnet.qos.actions.skbedit")


[docs]class SkbeditAction(TrafficAction): """This class supports the **tc(8)** ``skbedit`` action as documented in :manpage:`tc-skbedit(8)`. """ KIND = 'skbedit' def __init__(self, action_index: Optional[int] =None, decision: Optional[ActionDecision] =None, *, queue_mapping: Optional[int] =None, priority: Optional[str] =None, mark: Optional[int] =None, mask: Optional[int] =None, packet_type: Optional[str] =None, inheritdsfield: Optional[bool] =False): """ :param action_index: an integer that effectively names the action :param decision: control action to take :param queue_mapping: specify transmit queue for packet :param priority: priority :param mark: firewall mark :param mask: mask for the ``mark`` :param packet_type: packet type :param inheritdsfield: use the ``DiffServ`` field to override the classification decision """ super().__init__(action_index, decision) self.__queue_mapping = queue_mapping self.__priority = priority self.__mark = mark self.__mask = mask self.__packet_type = packet_type self.__inheritdsfield = inheritdsfield
[docs] @classmethod def get_kind(cls) -> str: """Returns ``skbedit`` """ return cls.KIND
[docs] def get_queue_mapping(self) -> Optional[int]: """Returns the queue mapping, if any """ return self.__queue_mapping
[docs] def get_priority(self) -> Optional[str]: """Returns the priority, if any """ return self.__priority
[docs] def get_mark(self) -> Optional[int]: """Returns the firewall mark, if any """ return self.__mark
[docs] def get_mask(self) -> Optional[int]: """Returns the mask for the firewall mark, if any """ return self.__mask
[docs] def get_packet_type(self) -> Optional[str]: """Returns the packet type, if any """ return self.__packet_type
[docs] def inheritdsfield(self) -> bool: """Returns the **inheritdsfield** value """ return self.__inheritdsfield
def _action_specific_args(self) -> List[str]: """Returns a list of **tc(8)** arguments to create this action """ args = [] if self.__queue_mapping: args.extend(['queue_mapping', str(self.__queue_mapping)]) if self.__priority: args.extend(['priority', self.__priority]) if self.__mark: markstr = f'0x{self.__mark:x}' if self.__mask: markstr += f'/0x{self.__mark:x}' args.extend(['mark', markstr]) if self.__packet_type: args.extend(['ptype', self.__packet_type]) if self.__inheritdsfield: args.append('inheritdsfield') return args @classmethod def parse(cls, # pylint: disable=too-many-locals fields: List[str], _: 'LineGroupIterator') -> TrafficAction: """The ``fields`` list holds the line fields after ``skbedit``. If the fields are successfully parsed, a :class:`SkbeditAction` instance is returned. We expect all skbedit-specific fields to be on the same line, so we do not make use of the line iterator. :meta private: """ # # The expected format of the fields is: # # queue_mapping 3 priority 1:2 mark 3/0xff ptype host inheritdsfield pipe # field_iter = iter(fields) queue_mapping = None priority = None packet_type = None mark = None mask = None inheritdsfield = False common_args = {} try: for field in field_iter: if field == 'queue_mapping': queue_mapping = int(next(field_iter)) elif field == 'priority': priority = next(field_iter) elif field == 'mark': value = next(field_iter) if '/' in value: markstr, maskstr = value.split('/', 1) mark = int(markstr, 0) mask = int(maskstr, 0) else: mark = int(value, 0) elif field == 'ptype': packet_type = next(field_iter) elif field == 'inheritdsfield': inheritdsfield = True elif cls._parse_common_fields(field, field_iter, common_args): pass else: raise TcParsingError(f"unknown skbedit param: {field}") except ValueError as valerr: raise TcParsingError(f"bad skbedit {field} value") from valerr return SkbeditAction( action_index=None, **common_args, queue_mapping=queue_mapping, priority=priority, mark=mark, mask=mask, packet_type=packet_type, inheritdsfield=inheritdsfield)
TrafficFilterParser.register_action(action_name=SkbeditAction.KIND, klass=SkbeditAction)