Source code for nadzoring.dns_lookup.benchmark

"""DNS server benchmarking for performance testing and comparison."""

import asyncio
import operator
import statistics
from collections.abc import Callable
from logging import Logger
from time import sleep

from nadzoring.dns_lookup.types import BenchmarkResult, DNSResult, RecordType
from nadzoring.dns_lookup.utils import (
    get_public_dns_servers,
    resolve_with_timer,
    resolve_with_timer_async,
)
from nadzoring.logger import get_logger
from nadzoring.utils.timeout import TimeoutConfig

logger: Logger = get_logger(__name__)

_DEFAULT_DOMAIN = "google.com"
_DEFAULT_DELAY = 0.1
_DEFAULT_MAX_WORKERS = 5


[docs] def benchmark_single_server( server: str, domain: str = _DEFAULT_DOMAIN, record_type: RecordType = "A", queries: int = 10, delay: float = _DEFAULT_DELAY, timeout_config: TimeoutConfig | None = None, ) -> BenchmarkResult: """ Benchmark the performance of a single DNS server. Performs *queries* DNS lookups against *server*, measuring response time and calculating average, minimum, maximum, and success rate statistics. Args: server: IP address of the DNS server to benchmark. domain: Domain to query. Defaults to ``"google.com"``. record_type: DNS record type to query. Defaults to ``"A"``. queries: Number of queries to perform. Defaults to ``10``. delay: Seconds to wait between queries to avoid rate limiting. Defaults to ``0.1``. Set to ``0`` to disable. timeout_config: Unified timeout configuration. If None, uses default. Returns: :class:`BenchmarkResult` dict with ``server``, ``avg_response_time``, ``min_response_time``, ``max_response_time``, ``success_rate``, ``total_queries``, ``failed_queries``, and ``responses`` keys. Examples: >>> result = benchmark_single_server("8.8.8.8") >>> print(f"{result['avg_response_time']:.2f}ms avg") """ if timeout_config is None: timeout_config = TimeoutConfig() responses: list[float] = [] failed = 0 for i in range(queries): if i > 0 and delay > 0: sleep(delay) try: result: DNSResult = resolve_with_timer( domain, record_type, server, timeout_config=timeout_config, ) if result["response_time"] is not None and not result["error"]: responses.append(result["response_time"]) else: failed += 1 except Exception as exc: logger.debug("Benchmark query failed for %s: %s", server, exc) failed += 1 success_rate: float = ((queries - failed) / queries) * 100 if queries > 0 else 0.0 return { "server": server, "avg_response_time": statistics.mean(responses) if responses else 0.0, "min_response_time": min(responses) if responses else 0.0, "max_response_time": max(responses) if responses else 0.0, "success_rate": round(success_rate, 2), "total_queries": queries, "failed_queries": failed, "responses": responses, }
[docs] def benchmark_dns_servers( domain: str = _DEFAULT_DOMAIN, servers: list[str] | None = None, record_type: RecordType = "A", queries: int = 10, max_workers: int = _DEFAULT_MAX_WORKERS, progress_callback: Callable[[str, int], None] | None = None, *, parallel: bool = True, timeout_config: TimeoutConfig | None = None, ) -> list[BenchmarkResult]: """ Benchmark multiple DNS servers and compare their performance. This synchronous wrapper runs :func:`benchmark_dns_servers_async` in a fresh event loop. Tests each server either in parallel (with :mod:`asyncio`) or sequentially, then sorts results by average response time. Args: domain: Domain to query. Defaults to ``"google.com"``. servers: Server IPs to benchmark. ``None`` uses :func:`~nadzoring.dns_lookup.utils.get_public_dns_servers`. record_type: DNS record type to query. Defaults to ``"A"``. queries: Queries per server. Defaults to ``10``. max_workers: Maximum number of concurrently benchmarked servers when *parallel* is ``True``. Defaults to ``5``. progress_callback: Called after each server completes with ``(server_ip, 1-based_index)``. parallel: Run benchmarks concurrently when ``True`` (default). timeout_config: Unified timeout configuration. If None, uses default. Returns: List of :class:`BenchmarkResult` dicts sorted by ``avg_response_time`` ascending (fastest first). Examples: >>> results = benchmark_dns_servers(servers=["8.8.8.8", "1.1.1.1"]) >>> fastest = results[0] >>> print(f"{fastest['server']}: {fastest['avg_response_time']:.2f}ms") """ try: asyncio.get_running_loop() except RuntimeError: pass else: raise RuntimeError( "benchmark_dns_servers() cannot run inside an active event loop; " "use 'await benchmark_dns_servers_async(...)' instead." ) return asyncio.run( benchmark_dns_servers_async( domain=domain, servers=servers, record_type=record_type, queries=queries, max_workers=max_workers, progress_callback=progress_callback, parallel=parallel, timeout_config=timeout_config, ) )
[docs] async def benchmark_dns_servers_async( domain: str = _DEFAULT_DOMAIN, servers: list[str] | None = None, record_type: RecordType = "A", queries: int = 10, max_workers: int = _DEFAULT_MAX_WORKERS, progress_callback: Callable[[str, int], None] | None = None, *, parallel: bool = True, timeout_config: TimeoutConfig | None = None, ) -> list[BenchmarkResult]: """ Benchmark multiple DNS servers and compare their performance. Async variant of :func:`benchmark_dns_servers` for environments that already have an active event loop (e.g. async applications, notebooks). Args: domain: Domain to query. Defaults to ``"google.com"``. servers: Server IPs to benchmark. ``None`` uses :func:`~nadzoring.dns_lookup.utils.get_public_dns_servers`. record_type: DNS record type to query. Defaults to ``"A"``. queries: Queries per server. Defaults to ``10``. max_workers: Maximum number of concurrently benchmarked servers when *parallel* is ``True``. Defaults to ``5``. progress_callback: Called after each server completes with ``(server_ip, 1-based_index)``. parallel: Run benchmarks concurrently when ``True`` (default). timeout_config: Unified timeout configuration. If None, uses default. Returns: List of :class:`BenchmarkResult` dicts sorted by ``avg_response_time`` ascending (fastest first). Examples: >>> import asyncio >>> results = asyncio.run(benchmark_dns_servers_async(servers=["8.8.8.8", "1.1.1.1"])) >>> fastest = results[0] >>> print(f"{fastest['server']}: {fastest['avg_response_time']:.2f}ms") """ if timeout_config is None: timeout_config = TimeoutConfig() if servers is None: servers = get_public_dns_servers() if parallel: results_list: list[BenchmarkResult] = await _benchmark_parallel_async( servers, domain, record_type, queries, max_workers, progress_callback, timeout_config, ) else: results_list = await _benchmark_sequential_async( servers, domain, record_type, queries, progress_callback, timeout_config, ) results_list.sort(key=operator.itemgetter("avg_response_time")) return results_list
[docs] async def _benchmark_single_server_async( server: str, domain: str, record_type: RecordType, queries: int, delay: float = _DEFAULT_DELAY, timeout_config: TimeoutConfig | None = None, ) -> BenchmarkResult: """Async variant of :func:`benchmark_single_server`.""" if timeout_config is None: timeout_config = TimeoutConfig() responses: list[float] = [] failed = 0 for i in range(queries): if i > 0 and delay > 0: await asyncio.sleep(delay) try: result: DNSResult = await resolve_with_timer_async( domain, record_type, server, timeout_config=timeout_config, ) if result["response_time"] is not None and not result["error"]: responses.append(result["response_time"]) else: failed += 1 except Exception as exc: logger.debug("Benchmark query failed for %s: %s", server, exc) failed += 1 success_rate: float = ((queries - failed) / queries) * 100 if queries > 0 else 0.0 return { "server": server, "avg_response_time": statistics.mean(responses) if responses else 0.0, "min_response_time": min(responses) if responses else 0.0, "max_response_time": max(responses) if responses else 0.0, "success_rate": round(success_rate, 2), "total_queries": queries, "failed_queries": failed, "responses": responses, }
[docs] async def _benchmark_parallel_async( servers: list[str], domain: str, record_type: RecordType, queries: int, max_workers: int, progress_callback: Callable[[str, int], None] | None, timeout_config: TimeoutConfig, ) -> list[BenchmarkResult]: """ Run server benchmarks concurrently using asyncio tasks. Args: servers: Server IP addresses to benchmark. domain: Domain to query. record_type: DNS record type. queries: Queries per server. max_workers: Maximum concurrently running benchmark tasks. progress_callback: Optional progress callback. timeout_config: Unified timeout configuration. Returns: List of benchmark results (unsorted). """ if max_workers <= 0: raise ValueError("max_workers must be greater than 0") semaphore = asyncio.Semaphore(max_workers) results: list[BenchmarkResult] = [] async def _safe_benchmark(server: str) -> tuple[str, BenchmarkResult]: try: async with semaphore: return ( server, await _benchmark_single_server_async( server, domain, record_type, queries, timeout_config=timeout_config, ), ) except Exception: logger.exception("Benchmark failed for %s", server) return (server, _make_failed_benchmark_result(server, queries)) tasks: list[asyncio.Task[tuple[str, BenchmarkResult]]] = [ asyncio.create_task(_safe_benchmark(server)) for server in servers ] for i, task in enumerate(asyncio.as_completed(tasks)): server, result = await task results.append(result) if progress_callback: progress_callback(server, i + 1) return results
[docs] async def _benchmark_sequential_async( servers: list[str], domain: str, record_type: RecordType, queries: int, progress_callback: Callable[[str, int], None] | None, timeout_config: TimeoutConfig, ) -> list[BenchmarkResult]: """ Run server benchmarks one at a time using async DNS queries. Args: servers: Server IP addresses to benchmark. domain: Domain to query. record_type: DNS record type. queries: Queries per server. progress_callback: Optional progress callback. timeout_config: Unified timeout configuration. Returns: List of benchmark results (unsorted). """ results: list[BenchmarkResult] = [] for i, server in enumerate(servers): try: result: BenchmarkResult = await _benchmark_single_server_async( server, domain, record_type, queries, timeout_config=timeout_config, ) except Exception: logger.exception("Benchmark failed for %s", server) result = _make_failed_benchmark_result(server, queries) results.append(result) if progress_callback: progress_callback(server, i + 1) return results
[docs] def _make_failed_benchmark_result(server: str, queries: int) -> BenchmarkResult: """Create a deterministic fallback result for a failed server benchmark.""" return { "server": server, "avg_response_time": 0.0, "min_response_time": 0.0, "max_response_time": 0.0, "success_rate": 0.0, "total_queries": queries, "failed_queries": queries, "responses": [], }