You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bazarr/libs/dns/edns.py

517 lines
15 KiB

# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2009-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""EDNS Options"""
import binascii
import math
import socket
import struct
from typing import Any, Dict, Optional, Union
6 years ago
import dns.enum
import dns.inet
import dns.rdata
import dns.wire
6 years ago
class OptionType(dns.enum.IntEnum):
    """Integer codes of the EDNS option types this module names."""

    NSID = 3  #: NSID
    DAU = 5  #: DAU
    DHU = 6  #: DHU
    N3U = 7  #: N3U
    ECS = 8  #: ECS (client-subnet)
    EXPIRE = 9  #: EXPIRE
    COOKIE = 10  #: COOKIE
    KEEPALIVE = 11  #: KEEPALIVE
    PADDING = 12  #: PADDING
    CHAIN = 13  #: CHAIN
    EDE = 15  #: EDE (extended-dns-error)

    @classmethod
    def _maximum(cls):
        # Largest 16-bit value.  NOTE(review): presumably the upper bound
        # dns.enum.IntEnum applies to option codes -- confirm in dns.enum.
        return 0xFFFF
class Option:
    """Base class for all EDNS option types."""

    def __init__(self, otype: Union[OptionType, str]):
        """Initialize an option.

        *otype*, a ``dns.edns.OptionType`` or ``str``, is the option type; it
        is normalized with ``OptionType.make()`` before being stored.
        """
        self.otype = OptionType.make(otype)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Convert an option to wire format.

        Subclasses must override this method.

        Returns a ``bytes`` or ``None``.
        """
        raise NotImplementedError  # pragma: no cover

    def to_text(self) -> str:
        """Return the text form of the option; subclasses must override."""
        raise NotImplementedError  # pragma: no cover

    @classmethod
    def from_wire_parser(cls, otype: OptionType, parser: "dns.wire.Parser") -> "Option":
        """Build an EDNS option object from wire format.

        *otype*, a ``dns.edns.OptionType``, is the option type.

        *parser*, a ``dns.wire.Parser``, the parser, which should be
        restricted to the option length.

        Returns a ``dns.edns.Option``.
        """
        raise NotImplementedError  # pragma: no cover

    def _cmp(self, other):
        """Compare an EDNS option with another option of the same type.

        The comparison is performed on the wire forms of the two options.

        Returns < 0 if < *other*, 0 if == *other*, and > 0 if > *other*.
        """
        mine = self.to_wire()
        theirs = other.to_wire()
        if mine == theirs:
            return 0
        return 1 if mine > theirs else -1

    def __eq__(self, other):
        # Anything that is not an Option of the same type is simply unequal.
        if isinstance(other, Option) and not self.otype != other.otype:
            return self._cmp(other) == 0
        return False

    def __ne__(self, other):
        if isinstance(other, Option) and not self.otype != other.otype:
            return self._cmp(other) != 0
        return True

    # Ordering is only defined between options of the same type; for anything
    # else NotImplemented is returned so Python can try the reflected
    # operation or raise TypeError.

    def __lt__(self, other):
        if isinstance(other, Option) and not self.otype != other.otype:
            return self._cmp(other) < 0
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, Option) and not self.otype != other.otype:
            return self._cmp(other) <= 0
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, Option) and not self.otype != other.otype:
            return self._cmp(other) >= 0
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, Option) and not self.otype != other.otype:
            return self._cmp(other) > 0
        return NotImplemented

    def __str__(self):
        return self.to_text()
6 years ago
class GenericOption(Option):  # lgtm[py/missing-equals]
    """Generic Option Class

    Fallback used for EDNS option types that have no dedicated
    implementation; the option payload is kept as opaque bytes.
    """

    def __init__(self, otype: Union[OptionType, str], data: Union[bytes, str]):
        """*otype* is the option type and *data* the raw option payload.

        *data* is converted with ``dns.rdata.Rdata._as_bytes()`` before it is
        stored on ``self.data``.
        """
        super().__init__(otype)
        self.data = dns.rdata.Rdata._as_bytes(data, True)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Write the payload to *file* if one is given, else return it."""
        if not file:
            return self.data
        file.write(self.data)
        return None

    def to_text(self) -> str:
        """Return ``"Generic <code>"`` where *code* is the numeric type."""
        return "Generic %d" % self.otype

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build the option from every byte remaining in *parser*."""
        payload = parser.get_remaining()
        return cls(otype, payload)
class ECSOption(Option):  # lgtm[py/missing-equals]
    """EDNS Client Subnet (ECS, RFC7871)"""

    def __init__(self, address: str, srclen: Optional[int] = None, scopelen: int = 0):
        """*address*, a ``str``, is the client address information.

        *srclen*, an ``int``, the source prefix length, which is the
        leftmost number of bits of the address to be used for the
        lookup.  The default is 24 for IPv4 and 56 for IPv6.

        *scopelen*, an ``int``, the scope prefix length.  This value
        must be 0 in queries, and should be set in responses.
        """
        super().__init__(OptionType.ECS)
        af = dns.inet.af_for_address(address)

        if af == socket.AF_INET6:
            self.family = 2
            if srclen is None:
                srclen = 56
            address = dns.rdata.Rdata._as_ipv6_address(address)
            srclen = dns.rdata.Rdata._as_int(srclen, 0, 128)
            scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 128)
        elif af == socket.AF_INET:
            self.family = 1
            if srclen is None:
                srclen = 24
            address = dns.rdata.Rdata._as_ipv4_address(address)
            srclen = dns.rdata.Rdata._as_int(srclen, 0, 32)
            scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 32)
        else:  # pragma: no cover   (this will never happen)
            raise ValueError("Bad address family")

        assert srclen is not None
        self.address = address
        self.srclen = srclen
        self.scopelen = scopelen

        addrdata = dns.inet.inet_pton(af, address)
        nbytes = int(math.ceil(srclen / 8.0))

        # Truncate to srclen and pad to the end of the last octet needed
        # See RFC section 6
        self.addrdata = addrdata[:nbytes]
        nbits = srclen % 8
        if nbits != 0:
            # Zero the bits of the final octet that lie beyond the prefix.
            last = struct.pack("B", ord(self.addrdata[-1:]) & (0xFF << (8 - nbits)))
            self.addrdata = self.addrdata[:-1] + last

    def to_text(self) -> str:
        """Return ``"ECS <address>/<srclen> scope/<scopelen>"``."""
        return "ECS {}/{} scope/{}".format(self.address, self.srclen, self.scopelen)

    @staticmethod
    def from_text(text: str) -> Option:
        """Convert a string into a `dns.edns.ECSOption`

        *text*, a `str`, the text form of the option.

        Accepted forms are ``address/srclen``, ``address/srclen/scope``,
        either of those preceded by the token ``ECS``, and the exact output
        of ``to_text()``: ``ECS address/srclen scope/scope``.

        Raises ``ValueError`` if *text* has none of these shapes or if the
        source or scope length is not an integer.

        Returns a `dns.edns.ECSOption`.

        Examples:

        >>> import dns.edns
        >>>
        >>> # basic example
        >>> dns.edns.ECSOption.from_text('1.2.3.4/24')
        >>>
        >>> # also understands scope
        >>> dns.edns.ECSOption.from_text('1.2.3.4/24/32')
        >>>
        >>> # IPv6
        >>> dns.edns.ECSOption.from_text('2001:4b98::1/64/64')
        >>>
        >>> # optional "ECS" prefix
        >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24/32')
        >>>
        >>> # it understands results from `dns.edns.ECSOption.to_text()`
        >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24 scope/32')
        """
        optional_prefix = "ECS"
        scope_prefix = "scope/"
        malformed = 'could not parse ECS from "{}"'.format(text)

        tokens = text.split()
        tscope: Optional[str] = None
        if len(tokens) == 1:
            ecs_text = tokens[0]
        elif len(tokens) in (2, 3) and tokens[0] == optional_prefix:
            ecs_text = tokens[1]
            if len(tokens) == 3:
                # to_text() renders the scope as a separate "scope/<n>" token.
                if not tokens[2].startswith(scope_prefix):
                    raise ValueError(malformed)
                tscope = tokens[2][len(scope_prefix):]
        else:
            raise ValueError(malformed)

        n_slashes = ecs_text.count("/")
        if n_slashes == 1:
            address, tsrclen = ecs_text.split("/")
            if tscope is None:
                tscope = "0"
        elif n_slashes == 2 and tscope is None:
            # A scope given both inline and as a trailing token is ambiguous,
            # so the inline form is only accepted when no token was supplied.
            address, tsrclen, tscope = ecs_text.split("/")
        else:
            raise ValueError(malformed)

        try:
            scope = int(tscope)
        except ValueError as exc:
            raise ValueError(
                "invalid scope " + '"{}": scope must be an integer'.format(tscope)
            ) from exc
        try:
            srclen = int(tsrclen)
        except ValueError as exc:
            raise ValueError(
                "invalid srclen " + '"{}": srclen must be an integer'.format(tsrclen)
            ) from exc
        return ECSOption(address, srclen, scope)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Emit family, source length, scope length and the address prefix.

        The value is written to *file* if one is given (and ``None`` is
        returned); otherwise the ``bytes`` value is returned.
        """
        value = (
            struct.pack("!HBB", self.family, self.srclen, self.scopelen) + self.addrdata
        )
        if file:
            file.write(value)
            return None
        else:
            return value

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build an ECS option from wire format.

        Reads the family, source and scope lengths, then the address prefix,
        and zero-pads the prefix to a full address.  Family 1 is IPv4 and
        family 2 is IPv6; any other family raises ``ValueError``.
        """
        family, src, scope = parser.get_struct("!HBB")
        # Only the octets needed to hold *src* bits are present on the wire.
        addrlen = int(math.ceil(src / 8.0))
        prefix = parser.get_bytes(addrlen)
        if family == 1:
            pad = 4 - addrlen
            addr = dns.ipv4.inet_ntoa(prefix + b"\x00" * pad)
        elif family == 2:
            pad = 16 - addrlen
            addr = dns.ipv6.inet_ntoa(prefix + b"\x00" * pad)
        else:
            raise ValueError("unsupported family")

        return cls(addr, src, scope)
class EDECode(dns.enum.IntEnum):
    """Info codes carried by the Extended DNS Error (EDE) option."""

    OTHER = 0
    UNSUPPORTED_DNSKEY_ALGORITHM = 1
    UNSUPPORTED_DS_DIGEST_TYPE = 2
    STALE_ANSWER = 3
    FORGED_ANSWER = 4
    DNSSEC_INDETERMINATE = 5
    DNSSEC_BOGUS = 6
    SIGNATURE_EXPIRED = 7
    SIGNATURE_NOT_YET_VALID = 8
    DNSKEY_MISSING = 9
    RRSIGS_MISSING = 10
    NO_ZONE_KEY_BIT_SET = 11
    NSEC_MISSING = 12
    CACHED_ERROR = 13
    NOT_READY = 14
    BLOCKED = 15
    CENSORED = 16
    FILTERED = 17
    PROHIBITED = 18
    STALE_NXDOMAIN_ANSWER = 19
    NOT_AUTHORITATIVE = 20
    NOT_SUPPORTED = 21
    NO_REACHABLE_AUTHORITY = 22
    NETWORK_ERROR = 23
    INVALID_DATA = 24

    @classmethod
    def _maximum(cls):
        # Largest 16-bit value; the code is packed as "!H" by EDEOption.
        return 0xFFFF
class EDEOption(Option):  # lgtm[py/missing-equals]
    """Extended DNS Error (EDE, RFC8914)"""

    # Words whose spelling must be kept as written here when a code name is
    # rendered for humans, instead of being title-cased.
    _preserve_case = {"DNSKEY", "DS", "DNSSEC", "RRSIGs", "NSEC", "NXDOMAIN"}

    def __init__(self, code: Union[EDECode, str], text: Optional[str] = None):
        """*code*, a ``dns.edns.EDECode`` or ``str``, the info code of the
        extended error.

        *text*, a ``str`` or ``None``, specifying additional information about
        the error.

        Raises ``ValueError`` if *text* is neither a ``str`` nor ``None``.
        """
        super().__init__(OptionType.EDE)

        self.code = EDECode.make(code)
        if text is not None and not isinstance(text, str):
            raise ValueError("text must be string or None")
        self.text = text

    def to_text(self) -> str:
        """Return ``"EDE <code>"``, plus a readable name and the extra text.

        For codes that are members of ``EDECode`` the name is appended in
        parentheses, with underscores turned into spaces and each word
        title-cased unless it is listed in ``_preserve_case``.  If extra text
        is present it follows after ``": "``.
        """
        output = f"EDE {self.code}"
        if self.code in EDECode:
            # Match preserved words case-insensitively so that mixed-case
            # entries such as "RRSIGs" are honoured even though the words
            # split out of the code name (e.g. "RRSIGS") differ in case.
            preserved = {word.upper(): word for word in self._preserve_case}
            desc = EDECode.to_text(self.code)
            desc = " ".join(
                preserved.get(word.upper(), word.title()) for word in desc.split("_")
            )
            output += f" ({desc})"
        if self.text is not None:
            output += f": {self.text}"
        return output

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Emit the 16-bit info code followed by the UTF-8 encoded text.

        The value is written to *file* if one is given (and ``None`` is
        returned); otherwise the ``bytes`` value is returned.
        """
        value = struct.pack("!H", self.code)
        if self.text is not None:
            value += self.text.encode("utf8")

        if file:
            file.write(value)
            return None
        else:
            return value

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build an EDE option from wire format.

        Reads the 16-bit info code, then decodes whatever remains as UTF-8
        text; an empty remainder yields ``text=None``.
        """
        code = EDECode.make(parser.get_uint16())
        text = parser.get_remaining()

        if text:
            if text[-1] == 0:  # text MAY be null-terminated
                text = text[:-1]
            btext = text.decode("utf8")
        else:
            btext = None

        return cls(code, btext)
class NSIDOption(Option):
    """NSID option; the identifier is stored as raw bytes on ``self.nsid``."""

    def __init__(self, nsid: bytes):
        super().__init__(OptionType.NSID)
        self.nsid = nsid

    def to_wire(self, file: Any = None) -> Optional[bytes]:
        """Write the identifier to *file* if one is given, else return it."""
        if not file:
            return self.nsid
        file.write(self.nsid)
        return None

    def to_text(self) -> str:
        """Return ``"NSID <value>"``.

        The value is shown as text when every byte is printable ASCII
        (0x20-0x7E) and as lowercase hex otherwise.
        """
        printable = all(0x20 <= octet <= 0x7E for octet in self.nsid)
        # All ASCII printable, so it's probably a string.
        rendered = self.nsid if printable else binascii.hexlify(self.nsid)
        return f"NSID {rendered.decode()}"

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: dns.wire.Parser
    ) -> Option:
        """Build the option from every byte remaining in *parser*."""
        identifier = parser.get_remaining()
        return cls(identifier)
# Registry of option types that have a dedicated implementation class.
# Types absent from this mapping are handled by GenericOption (see
# get_option_class); register_type() adds or replaces entries.
_type_to_class: Dict[OptionType, Any] = {
    OptionType.ECS: ECSOption,
    OptionType.EDE: EDEOption,
    OptionType.NSID: NSIDOption,
}
def get_option_class(otype: OptionType) -> Any:
    """Return the class for the specified option type.

    The lookup goes through the module's type registry; when no class is
    registered for *otype*, the GenericOption class is returned instead.
    """
    registered = _type_to_class.get(otype)
    return GenericOption if registered is None else registered
def option_from_wire_parser(
    otype: Union[OptionType, str], parser: "dns.wire.Parser"
) -> Option:
    """Build an EDNS option object from wire format.

    *otype*, a ``dns.edns.OptionType`` or ``str``, is the option type.

    *parser*, a ``dns.wire.Parser``, the parser, which should be
    restricted to the option length.

    Returns an instance of a subclass of ``dns.edns.Option``.
    """
    # Normalize first so the class lookup and the parser see the same value.
    normalized = OptionType.make(otype)
    implementation = get_option_class(normalized)
    return implementation.from_wire_parser(normalized, parser)
def option_from_wire(
    otype: Union[OptionType, str], wire: bytes, current: int, olen: int
) -> Option:
    """Build an EDNS option object from wire format.

    *otype*, a ``dns.edns.OptionType`` or ``str``, is the option type.

    *wire*, a ``bytes``, is the wire-format message.

    *current*, an ``int``, is the offset in *wire* of the beginning
    of the rdata.

    *olen*, an ``int``, is the length of the wire-format option data.

    Returns an instance of a subclass of ``dns.edns.Option``.
    """
    wire_parser = dns.wire.Parser(wire, current)
    # Confine parsing to the option's own bytes.
    with wire_parser.restrict_to(olen):
        option = option_from_wire_parser(otype, wire_parser)
    return option
def register_type(implementation: Any, otype: OptionType) -> None:
    """Register the implementation of an option type.

    Any class previously registered for *otype* is replaced.

    *implementation*, a ``class``, is a subclass of ``dns.edns.Option``.

    *otype*, a ``dns.edns.OptionType``, is the option type.
    """
    _type_to_class.update({otype: implementation})
# Module-level aliases for every OptionType member, so callers can write
# e.g. ``dns.edns.ECS`` instead of ``dns.edns.OptionType.ECS``.  The markers
# below indicate that the region between them is generated.
### BEGIN generated OptionType constants
NSID = OptionType.NSID
DAU = OptionType.DAU
DHU = OptionType.DHU
N3U = OptionType.N3U
ECS = OptionType.ECS
EXPIRE = OptionType.EXPIRE
COOKIE = OptionType.COOKIE
KEEPALIVE = OptionType.KEEPALIVE
PADDING = OptionType.PADDING
CHAIN = OptionType.CHAIN
EDE = OptionType.EDE
### END generated OptionType constants