Source code for nadzoring.dns_lookup.trace

"""DNS trace routing for tracking the full resolution delegation chain."""

import socket
from logging import Logger
from time import time
from typing import Any

import dns.exception
import dns.message
import dns.name
import dns.query
import dns.rdatatype
import dns.resolver
from dns.message import Message, QueryMessage
from dns.name import Name
from dns.resolver import Answer, Resolver

from nadzoring.dns_lookup.utils import create_resolver
from nadzoring.logger import get_logger

logger: Logger = get_logger(__name__)

_ROOT_NAMESERVER = "198.41.0.4"
_MAX_HOPS = 30
_DELEGATION_TIMEOUT = 5


[docs] def _create_hop(nameserver: str) -> dict[str, Any]: """ Initialise a new hop entry for DNS trace tracking. Args: nameserver: IP address of the nameserver for this hop. Returns: Dict with ``nameserver``, ``records``, ``response_time``, ``next``, and ``error`` keys set to their zero/null defaults. """ return { "nameserver": nameserver, "records": [], "response_time": None, "next": None, "error": None, }
[docs] def _query_nameserver( domain: str, nameserver: str, ) -> tuple[Answer | None, float | None, str | None]: """ Query a specific nameserver for A records of *domain*. Args: domain: Domain name to query. nameserver: IP address of the nameserver to query. Returns: Three-tuple of ``(answers, response_time_ms, error_message)``. *response_time_ms* is ``None`` only on timeout; *error_message* is ``None`` on success. """ resolver: Resolver = create_resolver(nameserver, timeout=3, lifetime=5) start_time: float = time() try: answers: Answer = resolver.resolve(domain, "A") return answers, round((time() - start_time) * 1000, 2), None except dns.resolver.NXDOMAIN: return None, round((time() - start_time) * 1000, 2), "Domain does not exist" except dns.resolver.NoAnswer: return None, round((time() - start_time) * 1000, 2), "No answer" except dns.exception.Timeout: return None, None, "Timeout" except Exception as exc: return None, round((time() - start_time) * 1000, 2), str(exc)
[docs] def _get_delegation_info( current_domain: Name, current_ns: str, hop: dict[str, Any], ) -> str | None: """ Resolve the next-hop nameserver IP via NS delegation records. Args: current_domain: Domain to query for NS records. current_ns: IP address of the nameserver to ask. hop: Hop dict updated in-place with delegation record strings or an ``"error"`` key on failure. Returns: IP address of the next nameserver, or ``None`` when delegation cannot be determined. """ try: ns_query: QueryMessage = dns.message.make_query(current_domain, dns.rdatatype.NS) response: Message = dns.query.udp(ns_query, current_ns, timeout=_DELEGATION_TIMEOUT) for rrset in response.authority: if rrset.rdtype != dns.rdatatype.NS: continue for rr in rrset: ns_name = str(rr.target) try: ns_ip_answer: Answer = dns.resolver.resolve(ns_name, "A", lifetime=3) ns_ip = str(ns_ip_answer[0]) hop["records"].append(f"Delegation to {ns_name} ({ns_ip})") except Exception: hop["records"].append(f"Delegation to {ns_name}") try: return socket.gethostbyname(ns_name) except OSError: logger.debug("Failed to resolve delegation NS %s", ns_name) else: return ns_ip except Exception as exc: hop["error"] = f"Delegation error: {exc}" return None
[docs] def trace_dns(domain: str, nameserver: str | None = None) -> dict[str, Any]: """ Trace the complete DNS resolution path for *domain*. Follows the delegation chain from the specified (or root) nameserver to the authoritative answer, similar to ``dig +trace``. Loop detection and a maximum-hop limit prevent infinite recursion. Args: domain: Domain name to trace (e.g. ``"example.com"``). nameserver: Starting nameserver IP. Defaults to ``a.root-servers.net`` (``198.41.0.4``) when ``None``. Returns: Dict with ``domain``, ``hops`` (list of hop dicts), and ``final_answer`` (the authoritative hop, or ``None``). Examples: >>> result = trace_dns("example.com") >>> for hop in result["hops"]: ... print(hop["nameserver"], hop["response_time"]) """ result: dict[str, Any] = { "domain": domain, "hops": [], "final_answer": None, } current_ns: str = nameserver or _ROOT_NAMESERVER current_domain: Name = dns.name.from_text(domain) visited: set[str] = set() for _ in range(_MAX_HOPS): if current_ns in visited: loop_hop: dict[str, Any] = _create_hop(current_ns) loop_hop["error"] = "Loop detected" loop_hop["next"] = "Loop detected" result["hops"].append(loop_hop) break visited.add(current_ns) hop: dict[str, Any] = _create_hop(current_ns) answers, response_time, error = _query_nameserver(domain, current_ns) hop["response_time"] = response_time if answers: hop["records"] = [str(a) for a in answers] hop["next"] = "Complete" result["final_answer"] = hop result["hops"].append(hop) break next_ns: str | None = _get_delegation_info(current_domain, current_ns, hop) if error: hop["error"] = error if next_ns and next_ns != current_ns: hop["next"] = next_ns result["hops"].append(hop) current_ns = next_ns else: hop["next"] = "No further delegation" result["hops"].append(hop) break return result