Source code for linuxnet.qos.qdiscs.ingress

# Copyright (c) 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-qos.
#
# linuxnet-qos is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-qos is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-qos. If not, see
# <https://www.gnu.org/licenses/>.

"""This module provides access to the ingress queueing discipline
"""

from typing import List

from ..deps import get_logger
from ..handle import Handle
from ..parsers import QDiscParser

from .qdisc import QDisc

_logger = get_logger('linuxnet.qos.qdiscs.ingress')


[docs]class IngressQDisc(QDisc): """This class provides access to the ``ingress`` queueing discipline. """ __HANDLE = Handle(0xffff, 0) __PARENT_HANDLE = Handle(0xffff, 0xfff1) def __init__(self, handle=None, parent_handle=None): """ :meta private: """ super().__init__(handle or self.__HANDLE, parent_handle or self.__PARENT_HANDLE) def __str__(self): return f"Ingress({self.get_handle()})"
[docs] @staticmethod def is_ingress(): """Always returns ``True`` """ return True
[docs] def qdisc_creation_args(self) -> List[str]: """Returns the arguments expected by the **tc(8)** command to create an ``ingress`` qdisc """ return []
@classmethod def parse(cls, qdisc_output) -> 'IngressQDisc': """Create a :class:`IngressQDisc` object from the output of the **tc(8)** command. :meta private: """ # # The fields are generated from a split of a line like this: # # qdisc ingress ffff: parent ffff:fff1 ---------------- # # The next field to be returned from field_iter is '-----------' # return IngressQDisc(qdisc_output.get_handle(), qdisc_output.get_parent_handle())
QDiscParser.register_qdisc('ingress', IngressQDisc)