Source code for nadzoring.dns_lookup.compare

"""DNS server comparison for analysing differences between resolvers."""

from collections.abc import Callable
from typing import Any, TypedDict

from nadzoring.dns_lookup.types import DNSResult, RecordType
from nadzoring.dns_lookup.utils import resolve_with_timer
from nadzoring.utils.timeout import TimeoutConfig


[docs] class ServerComparisonResult(TypedDict): """ Result of comparing DNS responses from multiple servers. Attributes: domain: The domain name that was queried. servers: Nested dict mapping server IPs to per-record-type results. differences: List of detected differences between servers. """ domain: str servers: dict[str, dict[str, DNSResult]] differences: list[dict[str, Any]]
[docs] class DifferenceDetail(TypedDict): """ Detailed information about a detected difference between DNS servers. Attributes: server: IP address of the server that returned a different response. type: DNS record type where the difference was detected. expected: Records expected (from the baseline server). got: Actual records received from this server. ttl_difference: Absolute TTL difference in seconds, or ``None`` when TTL data is unavailable for either server. """ server: str type: str expected: list[str] got: list[str] ttl_difference: int | None
[docs] def compare_dns_servers( domain: str, servers: list[str], record_types: list[str], progress_callback: Callable[[], None] | None = None, timeout_config: TimeoutConfig | None = None, ) -> ServerComparisonResult: """ Compare DNS responses from multiple servers for the same domain. Uses the first server in *servers* as the baseline. Each subsequent server's records are compared against the baseline; discrepancies are collected in ``differences``. Args: domain: Domain name to query (e.g. ``"example.com"``). servers: DNS server IPs to compare. The first entry is the baseline. record_types: Record types to query on every server. progress_callback: Called after each successful query. Useful for progress bars. timeout_config: Unified timeout configuration. If None, uses default. Returns: :class:`ServerComparisonResult` with ``domain``, ``servers``, and ``differences`` keys. Examples: >>> result = compare_dns_servers( ... "example.com", ... servers=["8.8.8.8", "1.1.1.1"], ... record_types=["A", "MX"], ... ) >>> result["differences"] [] """ if timeout_config is None: timeout_config = TimeoutConfig() result: ServerComparisonResult = { "domain": domain, "servers": {}, "differences": [], } for i, server in enumerate(servers): server_results: dict[str, DNSResult] = {} is_baseline: bool = i == 0 for rtype_str in record_types: rtype: RecordType = rtype_str # type: ignore query_result: DNSResult = resolve_with_timer( domain, rtype, server, include_ttl=True, timeout_config=timeout_config, ) if is_baseline: query_result["differs"] = False # type: ignore else: base: DNSResult = result["servers"][servers[0]].get(rtype_str, {}) differs: bool = query_result.get("records") != base.get("records") query_result["differs"] = differs # type: ignore if differs: result["differences"].append({ "server": server, "type": rtype_str, "expected": base.get("records", []), "got": query_result.get("records", []), "ttl_difference": _calculate_ttl_difference(base.get("ttl"), query_result.get("ttl")), }) server_results[rtype_str] = query_result if progress_callback: progress_callback() result["servers"][server] = server_results return result
[docs] def _calculate_ttl_difference(ttl1: int | None, ttl2: int | None) -> int | None: """ Return the absolute difference between two TTL values. Args: ttl1: First TTL in seconds. ttl2: Second TTL in seconds. Returns: Absolute difference when both values are provided, ``None`` otherwise. """ if ttl1 is None or ttl2 is None: return None return abs(ttl1 - ttl2)