Source code for nadzoring.commands.network_commands

"""Network base CLI commands."""

from collections.abc import Callable
from logging import Logger
from typing import Any, Literal, Never

import click
from tqdm import tqdm

from nadzoring.logger import get_logger
from nadzoring.network_base.connections import ConnectionEntry, get_connections
from nadzoring.network_base.domain_info import get_domain_info
from nadzoring.network_base.geolocation_ip import geo_ip
from nadzoring.network_base.http_ping import http_ping
from nadzoring.network_base.ipv4_local_cli import get_local_ipv4
from nadzoring.network_base.network_params import network_param
from nadzoring.network_base.parse_url import parse_url
from nadzoring.network_base.ping_address import ping_addr
from nadzoring.network_base.port_scanner import (
    ScanConfig,
    ScanMode,
    ScanResult,
    get_ports_from_mode,
    scan_ports,
)
from nadzoring.network_base.route_table import RouteEntry, get_route_table
from nadzoring.network_base.router_ip import (
    check_ipv4,
    check_ipv6,
    get_ip_from_host,
    router_ip,
)
from nadzoring.network_base.service_detector import (
    ServiceDetectionResult,
    detect_service_on_host,
)
from nadzoring.network_base.service_on_port import get_service_on_port
from nadzoring.network_base.traceroute import traceroute
from nadzoring.network_base.whois_lookup import whois_lookup
from nadzoring.utils.decorators import common_cli_options
from nadzoring.utils.formatters import format_scan_results
from nadzoring.utils.timeout import TimeoutConfig

logger: Logger = get_logger(__name__)


@click.group(name="network-base")
def network_group() -> None:
    """Network base commands for analysis and diagnostics."""


@network_group.command(name="detect-service")
@common_cli_options(include_quiet=True, include_timeout=True)
@click.argument("target", required=True)
@click.argument("ports", type=int, nargs=-1, required=True)
@click.option(
    "--no-probe",
    is_flag=True,
    help="Don't send protocol-specific probes",
)
def detect_service_command(
    target: str,
    ports: tuple[int, ...],
    *,
    timeout_config: TimeoutConfig,
    no_probe: bool,
    quiet: bool,
) -> list[dict[str, Any]]:
    """
    Detect actual services running on specific ports of a target host.

    Connects to each specified port, grabs a banner if possible,
    and analyzes it to determine the real service.

    Examples:
        nadzoring network-base detect-service example.com 80 443 22
        nadzoring network-base detect-service --timeout 5 192.168.1.1 8080 3306

    """
    results: list[dict[str, Any]] = []
    total: int = len(ports)

    pbar: Any | None = None if quiet else tqdm(total=total, desc="Detecting services", unit="port")

    for port in ports:
        result: ServiceDetectionResult = detect_service_on_host(
            host=target,
            port=port,
            timeout_config=timeout_config,
            send_probe=not no_probe,
        )

        detected: str = result.detected_service or result.guessed_service or "unknown"
        status: str = result.detected_service or "guessed"
        if result.error:
            status = f"error: {result.error}"

        results.append({
            "target": target,
            "port": port,
            "detected_service": detected,
            "status": status,
            "banner": result.banner or "",
            "method": result.method,
        })

        if pbar:
            pbar.set_description(f"Port {port}")
            pbar.update(1)

    if pbar:
        pbar.close()

    return results


@network_group.command(name="port-scan")
@common_cli_options(include_quiet=True, include_timeout=True)
@click.argument("targets", nargs=-1, required=True)
@click.option(
    "--mode",
    type=click.Choice(["fast", "full", "custom"], case_sensitive=False),
    default="fast",
    show_default=True,
    help="Scan mode: fast (common ports), full (1-65535), or custom",
)
@click.option(
    "--ports",
    help="Custom ports or range (e.g., '22,80,443' or '1-1024')",
)
@click.option(
    "--protocol",
    type=click.Choice(["tcp", "udp"], case_sensitive=False),
    default="tcp",
    show_default=True,
    help="Protocol to scan",
)
@click.option(
    "--workers",
    type=int,
    default=50,
    show_default=True,
    help="Maximum number of concurrent workers per target",
)
@click.option(
    "--show-closed",
    is_flag=True,
    help="Show closed ports in results",
)
@click.option(
    "--no-banner",
    is_flag=True,
    help="Disable banner grabbing",
)
def port_scan_command(
    targets: tuple[str, ...],
    mode: str,
    ports: str | None,
    protocol: str,
    timeout_config: TimeoutConfig,
    workers: int,
    *,
    show_closed: bool,
    no_banner: bool,
    quiet: bool,
) -> list[dict[str, Any]]:
    """Scan for open ports on one or more targets."""
    parsed_ports: tuple[list[int] | None, tuple[int, int] | None] = _parse_port_specification(mode, ports)

    mode_literal: ScanMode = "fast" if mode == "fast" else "full" if mode == "full" else "custom"
    protocol_literal: Literal["tcp", "udp"] = "tcp" if protocol == "tcp" else "udp"

    base_config = ScanConfig(
        targets=list(targets),
        mode=mode_literal,
        protocol=protocol_literal,
        custom_ports=parsed_ports[0],
        port_range=parsed_ports[1],
        timeout_config=timeout_config,
        max_workers=workers,
        grab_banner=not no_banner,
    )

    ports_to_scan: list[int] = get_ports_from_mode(base_config)
    if not ports_to_scan:
        click.secho("No ports to scan. Check your configuration.", fg="red", err=True)
        return []

    total_ports_per_target: int = len(ports_to_scan)
    batch_size: int = workers
    num_batches: int = (total_ports_per_target + batch_size - 1) // batch_size

    if not quiet:
        click.echo(
            f"Scanning {len(targets)} target(s) | "
            f"{total_ports_per_target} ports each | "
            f"{num_batches} batches of {batch_size} workers",
            err=True,
        )

    scan_results: list[ScanResult] = []

    for target_idx, target in enumerate(targets, 1):
        target_config = ScanConfig(
            targets=[target],
            mode=mode_literal,
            protocol=protocol_literal,
            custom_ports=parsed_ports[0],
            port_range=parsed_ports[1],
            timeout_config=timeout_config,
            max_workers=workers,
            grab_banner=not no_banner,
        )

        if quiet:
            result: list[ScanResult] = scan_ports(target_config)
            if result:
                scan_results.extend(result)
            continue

        with tqdm(
            total=total_ports_per_target,
            desc=f"[{target_idx}/{len(targets)}] {target}",
            unit="ports",
            dynamic_ncols=True,
            bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
        ) as pbar:

            def _make_callback(
                pbar: tqdm[Never],
                target_idx: int,
                target: str,
                total_targets: int,
            ) -> Callable[[str, int, int], None]:
                def progress_callback(desc: str, completed: int, total: int) -> None:
                    pbar.desc = f"[{target_idx}/{total_targets}] {target} | {desc}"
                    if pbar.total != total:
                        pbar.total = total
                    pbar.n = completed
                    pbar.refresh()

                return progress_callback

            target_config.progress_callback = _make_callback(pbar, target_idx, target, len(targets))
            result_scan: list[ScanResult] = scan_ports(target_config)
            if result_scan:
                scan_results.extend(result_scan)

    return format_scan_results(scan_results, show_closed=show_closed)


[docs] def _parse_port_specification( mode: str, ports: str | None, ) -> tuple[list[int] | None, tuple[int, int] | None]: """ Parse a port specification string into custom ports or a range. Args: mode: Scan mode string; only ``"custom"`` triggers parsing. ports: Raw port string such as ``"22,80,443"`` or ``"1-1024"``. Returns: Two-tuple of ``(custom_ports, port_range)`` where exactly one element is non-``None`` when ``mode == "custom"`` and ``ports`` is provided, and both are ``None`` otherwise. Raises: click.BadParameter: When the port string cannot be parsed. """ if mode != "custom" or not ports: return None, None if "-" in ports and "," not in ports: try: start, end = map(int, ports.split("-")) except ValueError as err: raise click.BadParameter("Port range must be in format 'start-end' (e.g., '1-1024')") from err return None, (start, end) try: custom_ports: list[int] = [int(p.strip()) for p in ports.split(",")] except ValueError as err: raise click.BadParameter("Ports must be comma-separated integers") from err return custom_ports, None
@network_group.command(name="ping") @common_cli_options(include_quiet=True) @click.argument("addresses", type=str, nargs=-1, required=True) def ping_command( addresses: tuple[str, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """Ping one or more addresses.""" results: list[dict[str, Any]] = [] total: int = len(addresses) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Pinging addresses", unit="ping") for address in addresses: is_pinged: bool = ping_addr(address) results.append({ "address": address, "is_pinged": "yes" if is_pinged else "no", "status": "up" if is_pinged else "down", }) if pbar: pbar.set_description(f"Pinging {address}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="parse-url") @common_cli_options(include_quiet=True) @click.argument("urls", type=str, nargs=-1, required=True) def parse_url_command( urls: tuple[str, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """Parse one or more URLs into their components.""" results: list[dict[str, Any]] = [] total: int = len(urls) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Parsing URLs", unit="URL") for url in urls: results.append(parse_url(url)) if pbar: pbar.set_description(f"Parsing {url}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="geolocation") @common_cli_options(include_quiet=True) @click.argument("ips", type=str, nargs=-1, required=True) def geolocation_command( ips: tuple[str, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """Get geolocation for one or more IP addresses.""" results: list[dict[str, Any]] = [] total: int = len(ips) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Getting geolocation", unit="ip") for ip in ips: geo: dict[str, str] = geo_ip(ip) results.append({ "ip_address": ip, "latitude": geo.get("lat", "Unknown"), "longitude": geo.get("lon", "Unknown"), "country": geo.get("country", "Unknown"), "city": geo.get("city", "Unknown"), }) if pbar: pbar.set_description(f"Locating {ip}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="params") @common_cli_options() def params_command() -> list[dict[str, Any]]: """Display network configuration parameters for the current system.""" data: dict[str, str | None] = network_param() or {} data["local_ipv4"] = get_local_ipv4() return [data] @network_group.command(name="host-to-ip") @common_cli_options(include_quiet=True) @click.argument("hostnames", type=str, nargs=-1, required=True) def host_to_ip_command( hostnames: tuple[str, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """Resolve one or more hostnames to IP addresses.""" results: list[dict[str, Any]] = [] total: int = len(hostnames) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Resolving hostnames", unit="host") router_ipv4: str | None = router_ip(ipv6=False) router_ipv6: str | None = router_ip(ipv6=True) for hostname in hostnames: results.append({ "hostname": hostname, "ip_address": get_ip_from_host(hostname), "ipv4_check": check_ipv4(hostname), "ipv6_check": check_ipv6(hostname), "router_ipv4": router_ipv4 or "Not found", "router_ipv6": router_ipv6 or "Not found", }) if pbar: pbar.set_description(f"Resolving {hostname}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="port-service") @common_cli_options(include_quiet=True) @click.argument("ports", type=int, nargs=-1, required=True) def port_service_command( ports: tuple[int, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """Get service names for one or more port numbers.""" results: list[dict[str, Any]] = [] total: int = len(ports) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Looking up ports", unit="port") for port in ports: results.append({ "port": port, "service": get_service_on_port(port), "protocol": "tcp/udp", }) if pbar: pbar.set_description(f"Port {port}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="http-ping") @common_cli_options(include_quiet=True, include_timeout=True) @click.argument("urls", nargs=-1, required=True) @click.option( "--no-ssl-verify", is_flag=True, help="Disable SSL certificate verification", ) @click.option( "--no-redirects", is_flag=True, help="Do not follow HTTP redirects", ) @click.option( "--show-headers", is_flag=True, help="Include response headers in output", ) def http_ping_command( urls: tuple[str, ...], timeout_config: TimeoutConfig, *, no_ssl_verify: bool, no_redirects: bool, show_headers: bool, quiet: bool, ) -> list[dict[str, Any]]: """Check HTTP/HTTPS response timing and status for one or more URLs.""" results: list[dict[str, Any]] = [] total: int = len(urls) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Probing URLs", unit="url") for url in urls: result = http_ping( url, timeout_config=timeout_config, verify_ssl=not no_ssl_verify, follow_redirects=not no_redirects, include_headers=show_headers, ) row: dict[str, Any] = { "url": result.url, "status": result.status_code or result.error or "error", "dns_ms": result.dns_ms if result.dns_ms is not None else "n/a", "ttfb_ms": result.ttfb_ms if result.ttfb_ms is not None else "n/a", "total_ms": result.total_ms if result.total_ms is not None else "n/a", "size_bytes": (result.content_length if result.content_length is not None else "n/a"), } if result.final_url: row["redirected_to"] = result.final_url if show_headers and result.headers: for header_key in ( "Content-Type", "Server", "Cache-Control", "X-Powered-By", ): value: str | None = result.headers.get(header_key) or result.headers.get(header_key.lower()) if value: row[f"header_{header_key.lower().replace('-', '_')}"] = value results.append(row) if pbar: pbar.set_description(f"Probing {url}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="whois") @common_cli_options(include_quiet=True) @click.argument("targets", nargs=-1, required=True) def whois_command( targets: tuple[str, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """Look up WHOIS registration info for one or more domains or IPs.""" results: list[dict[str, Any]] = [] total: int = len(targets) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Running WHOIS", unit="target") for target in targets: info: dict[str, str | None] = whois_lookup(target) results.append({k: v for k, v in info.items() if v is not None}) if pbar: pbar.set_description(f"WHOIS {target}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="domain-info") @common_cli_options(include_quiet=True) @click.argument("domains", nargs=-1, required=True) def domain_info_command( domains: tuple[str, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """Get comprehensive info (WHOIS, DNS, geo) for one or more domains.""" results: list[dict[str, Any]] = [] total: int = len(domains) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Getting domain info", unit="domain") for domain in domains: results.append(get_domain_info(domain)) if pbar: pbar.set_description(f"Getting info for {domain}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="connections") @common_cli_options(include_quiet=True) @click.option( "--protocol", "-p", type=click.Choice(["tcp", "udp", "all"], case_sensitive=False), default="all", show_default=True, help="Filter by protocol", ) @click.option( "--state", "-s", "state_filter", default=None, help="Filter by state substring, e.g. LISTEN or ESTABLISHED", ) @click.option( "--no-process", is_flag=True, help="Skip process/PID info (avoids permission errors)", ) def connections_command( protocol: str, state_filter: str | None, *, no_process: bool, quiet: bool, ) -> list[dict[str, Any]]: """List active TCP/UDP network connections on the current system.""" entries: list[ConnectionEntry] = get_connections( protocol=protocol, state_filter=state_filter, include_process=not no_process, ) results: list[dict[str, Any]] = [] for entry in entries: row: dict[str, Any] = { "protocol": entry.protocol, "local_address": entry.local_address, "remote_address": entry.remote_address, "state": entry.state or "—", } if entry.pid is not None: row["pid"] = entry.pid if entry.process is not None: row["process"] = entry.process results.append(row) if not quiet and not results: click.echo("No connections found matching the given filters.", err=True) return results @network_group.command(name="traceroute") @common_cli_options(include_quiet=True, include_timeout=True) @click.argument("targets", nargs=-1, required=True) @click.option( "--max-hops", type=int, default=30, show_default=True, help="Maximum number of hops", ) @click.option( "--sudo", "use_sudo", is_flag=True, help="Run traceroute with sudo (required on some Linux systems)", ) def traceroute_command( targets: tuple[str, ...], max_hops: int, timeout_config: TimeoutConfig, *, use_sudo: bool, quiet: bool, ) -> list[dict[str, Any]]: """Trace the network path to one or more hosts.""" results: list[dict[str, Any]] = [] total: int = len(targets) pbar: tqdm[Never] | None = None if quiet else tqdm(total=total, desc="Tracing routes", unit="target") for target in targets: if not quiet: click.echo(f"\nTraceroute to {target}:", err=True) hops = traceroute( target, max_hops=max_hops, per_hop_timeout=timeout_config.connect, use_sudo=use_sudo, ) for hop in hops: rtt_values: list[str] = [f"{r:.1f} ms" if r is not None else "*" for r in hop.rtt_ms] results.append({ "target": target, "hop": hop.hop, "host": hop.host or "*", "ip": hop.ip or "*", "rtt": " / ".join(rtt_values), }) if not hops and not quiet: click.echo(f"No hops returned for {target}.", err=True) if pbar: pbar.set_description(f"Tracing {target}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="route") @common_cli_options(include_quiet=True) def route_command(*, quiet: bool = False) -> list[dict[str, Any]]: """Display the system IP routing table.""" entries: list[RouteEntry] = get_route_table() if not entries and not quiet: click.echo( "Could not retrieve routing table. Check that 'ip' (Linux) or 'route' (Windows) is available.", err=True, ) return [] return [ { "destination": entry.destination, "gateway": entry.gateway, "netmask": entry.netmask or "—", "interface": entry.interface or "—", "metric": entry.metric or "—", } for entry in entries ]