Source code for nadzoring.commands.network_commands

"""Network base CLI commands."""

from collections.abc import Callable
from logging import Logger
from typing import Any

import click
from tqdm import tqdm

from nadzoring.logger import get_logger
from nadzoring.network_base.connections import ConnectionEntry, get_connections
from nadzoring.network_base.geolocation_ip import geo_ip
from nadzoring.network_base.http_ping import HttpPingResult, http_ping
from nadzoring.network_base.ipv4_local_cli import get_local_ipv4
from nadzoring.network_base.network_params import network_param
from nadzoring.network_base.ping_address import ping_addr
from nadzoring.network_base.port_scanner import (
    ScanConfig,
    ScanResult,
    get_ports_from_mode,
    scan_ports,
)
from nadzoring.network_base.route_table import RouteEntry, get_route_table
from nadzoring.network_base.router_ip import (
    check_ipv4,
    check_ipv6,
    get_ip_from_host,
    router_ip,
)
from nadzoring.network_base.service_on_port import get_service_on_port
from nadzoring.network_base.traceroute import TraceHop, traceroute
from nadzoring.network_base.whois_lookup import whois_lookup
from nadzoring.utils.decorators import common_cli_options
from nadzoring.utils.formatters import _format_scan_results

logger: Logger = get_logger(__name__)


@click.group(name="network-base")
def network_group() -> None:
    """Network base commands for analysis and diagnostics."""


@network_group.command(name="port-scan")
@common_cli_options(include_quiet=True)
@click.argument("targets", nargs=-1, required=True)
@click.option(
    "--mode",
    type=click.Choice(["fast", "full", "custom"], case_sensitive=False),
    default="fast",
    help="Scan mode: fast (common ports), full (1-65535), or custom",
)
@click.option(
    "--ports",
    help="Custom ports or range (e.g., '22,80,443' or '1-1024')",
)
@click.option(
    "--protocol",
    type=click.Choice(["tcp", "udp"], case_sensitive=False),
    default="tcp",
    help="Protocol to scan",
)
@click.option(
    "--timeout",
    type=float,
    default=2.0,
    help="Socket timeout in seconds",
)
@click.option(
    "--workers",
    type=int,
    default=50,
    help="Maximum number of concurrent workers per target",
)
@click.option(
    "--show-closed",
    is_flag=True,
    help="Show closed ports in results",
)
@click.option(
    "--no-banner",
    is_flag=True,
    help="Disable banner grabbing",
)
def port_scan_command(
    targets: tuple[str, ...],
    mode: str,
    ports: str | None,
    protocol: str,
    timeout: float,
    workers: int,
    *,
    show_closed: bool,
    no_banner: bool,
    quiet: bool,
) -> list[dict[str, Any]]:
    """Scan for open ports on one or more targets."""
    parsed_ports: tuple[list[int] | None, tuple[int, int] | None] = (
        _parse_port_specification(mode, ports)
    )

    base_config = ScanConfig(
        targets=list(targets),
        mode=mode,
        protocol=protocol,
        custom_ports=parsed_ports[0],
        port_range=parsed_ports[1],
        timeout=timeout,
        max_workers=workers,
        grab_banner=not no_banner,
    )

    ports_to_scan: list[int] = get_ports_from_mode(base_config)
    if not ports_to_scan:
        click.secho("No ports to scan. Check your configuration.", fg="red", err=True)
        return []

    total_ports_per_target: int = len(ports_to_scan)
    batch_size: int = workers
    num_batches: int = (total_ports_per_target + batch_size - 1) // batch_size

    if not quiet:
        click.echo(
            f"Scanning {len(targets)} target(s) | "
            f"{total_ports_per_target} ports each | "
            f"{num_batches} batches of {batch_size} workers",
            err=True,
        )

    scan_results: list[ScanResult] = []

    for target_idx, target in enumerate(targets, 1):
        target_config = ScanConfig(
            targets=[target],
            mode=mode,
            protocol=protocol,
            custom_ports=parsed_ports[0],
            port_range=parsed_ports[1],
            timeout=timeout,
            max_workers=workers,
            grab_banner=not no_banner,
        )

        if quiet:
            result = scan_ports(target_config)
            if result:
                scan_results.extend(result)
            continue

        with tqdm(
            total=total_ports_per_target,
            desc=f"[{target_idx}/{len(targets)}] {target}",
            unit="ports",
            dynamic_ncols=True,
            bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} "
            "[{elapsed}<{remaining}, {rate_fmt}]",
        ) as pbar:

            def _make_callback(
                pbar: tqdm,
                target_idx: int,
                target: str,
                total_targets: int,
            ) -> Callable[[str, int, int], None]:
                def progress_callback(desc: str, completed: int, total: int) -> None:
                    pbar.desc = f"[{target_idx}/{total_targets}] {target} | {desc}"
                    if pbar.total != total:
                        pbar.total = total
                    pbar.n = completed
                    pbar.refresh()

                return progress_callback

            target_config.progress_callback = _make_callback(
                pbar, target_idx, target, len(targets)
            )
            result: list[ScanResult] = scan_ports(target_config)
            if result:
                scan_results.extend(result)

    return _format_scan_results(scan_results, show_closed=show_closed)


[docs] def _parse_port_specification( mode: str, ports: str | None ) -> tuple[list[int] | None, tuple[int, int] | None]: """Parse port specification from CLI arguments.""" if mode != "custom" or not ports: return None, None if "-" in ports and "," not in ports: try: start, end = map(int, ports.split("-")) except ValueError as err: raise click.BadParameter( "Port range must be in format 'start-end' (e.g., '1-1024')" ) from err return None, (start, end) try: custom_ports: list[int] = [int(p.strip()) for p in ports.split(",")] except ValueError as err: raise click.BadParameter("Ports must be comma-separated integers") from err return custom_ports, None
@network_group.command(name="ping") @common_cli_options(include_quiet=True) @click.argument("addresses", type=str, nargs=-1, required=True) def ping_command( addresses: tuple[str, ...], *, quiet: bool, ) -> list[dict]: """Ping one or more addresses.""" results = [] total = len(addresses) pbar = None if quiet else tqdm(total=total, desc="Pinging addresses", unit="ping") for address in addresses: is_pinged = ping_addr(address) results.append( { "address": address, "is_pinged": "yes" if is_pinged else "no", "status": "up" if is_pinged else "down", } ) if pbar: pbar.set_description(f"Pinging {address}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="geolocation") @common_cli_options(include_quiet=True) @click.argument("ips", type=str, nargs=-1, required=True) def geolocation_command( ips: tuple[str, ...], *, quiet: bool, ) -> list[dict]: """Get geolocation for one or more IPs.""" results = [] total = len(ips) pbar = None if quiet else tqdm(total=total, desc="Getting geolocation", unit="ip") for ip in ips: geolocation = geo_ip(ip) results.append( { "ip_address": ip, "latitude": geolocation.get("lat", "Unknown"), "longitude": geolocation.get("lon", "Unknown"), "country": geolocation.get("country", "Unknown"), "city": geolocation.get("city", "Unknown"), } ) if pbar: pbar.set_description(f"Locating {ip}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="params") @common_cli_options(include_quiet=True) def params_command(*, quiet: bool = False) -> list[dict]: """Get network parameters for the current system.""" data = network_param() or {} data["local_ipv4"] = get_local_ipv4() return [data] @network_group.command(name="host-to-ip") @common_cli_options(include_quiet=True) @click.argument("hostnames", type=str, nargs=-1, required=True) def host_to_ip_command( hostnames: tuple[str, ...], *, quiet: bool, ) -> list[dict]: """Get IPs for one or more hostname addresses.""" results = [] total = len(hostnames) pbar = None if quiet else tqdm(total=total, desc="Resolving hostnames", unit="host") router_ipv4 = router_ip(ipv6=False) router_ipv6 = router_ip(ipv6=True) for hostname in hostnames: ip = get_ip_from_host(hostname) ipv4_check = check_ipv4(hostname) ipv6_check = check_ipv6(hostname) results.append( { "hostname": hostname, "ip_address": ip, "ipv4_check": ipv4_check, "ipv6_check": ipv6_check, "router_ipv4": router_ipv4 or "Not found", "router_ipv6": router_ipv6 or "Not found", } ) if pbar: pbar.set_description(f"Resolving {hostname}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="port-service") @common_cli_options(include_quiet=True) @click.argument("ports", type=int, nargs=-1, required=True) def port_service_command( ports: tuple[int, ...], *, quiet: bool, ) -> list[dict]: """Get service names for one or more ports.""" results = [] total = len(ports) pbar = None if quiet else tqdm(total=total, desc="Looking up ports", unit="port") for port in ports: service = get_service_on_port(port) results.append( { "port": port, "service": service, "protocol": "tcp/udp", } ) if pbar: pbar.set_description(f"Port {port}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="http-ping") @common_cli_options(include_quiet=True) @click.argument("urls", nargs=-1, required=True) @click.option( "--timeout", type=float, default=10.0, help="Request timeout in seconds", ) @click.option( "--no-ssl-verify", is_flag=True, help="Disable SSL certificate verification", ) @click.option( "--no-redirects", is_flag=True, help="Do not follow HTTP redirects", ) @click.option( "--show-headers", is_flag=True, help="Include response headers in output", ) def http_ping_command( urls: tuple[str, ...], timeout: float, *, no_ssl_verify: bool, no_redirects: bool, show_headers: bool, quiet: bool, ) -> list[dict[str, Any]]: """ Check HTTP/HTTPS response timing and headers for one or more URLs. Measures DNS resolution time, time-to-first-byte (TTFB), total download time, HTTP status code and optional response headers. """ results: list[dict[str, Any]] = [] total = len(urls) pbar = None if quiet else tqdm(total=total, desc="Probing URLs", unit="url") for url in urls: result: HttpPingResult = http_ping( url, timeout=timeout, verify_ssl=not no_ssl_verify, follow_redirects=not no_redirects, include_headers=show_headers, ) row: dict[str, Any] = { "url": result.url, "status": result.status_code or result.error or "error", "dns_ms": result.dns_ms if result.dns_ms is not None else "n/a", "ttfb_ms": result.ttfb_ms if result.ttfb_ms is not None else "n/a", "total_ms": result.total_ms if result.total_ms is not None else "n/a", "size_bytes": ( result.content_length if result.content_length is not None else "n/a" ), } if result.final_url: row["redirected_to"] = result.final_url if show_headers and result.headers: for header_key in ( "Content-Type", "Server", "Cache-Control", "X-Powered-By", ): value = result.headers.get(header_key) or result.headers.get( header_key.lower() ) if value: row[f"header_{header_key.lower().replace('-', '_')}"] = value results.append(row) if pbar: pbar.set_description(f"Probing {url}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="whois") @common_cli_options(include_quiet=True) @click.argument("targets", nargs=-1, required=True) def whois_command( targets: tuple[str, ...], *, quiet: bool, ) -> list[dict[str, Any]]: """ Look up WHOIS registration info for one or more domains or IPs. Requires the system 'whois' utility to be installed (apt install whois / brew install whois). """ results: list[dict[str, Any]] = [] total = len(targets) pbar = None if quiet else tqdm(total=total, desc="Running WHOIS", unit="target") for target in targets: info = whois_lookup(target) # Flatten to flat dict, drop None values for cleaner table output row: dict[str, Any] = {k: v for k, v in info.items() if v is not None} results.append(row) if pbar: pbar.set_description(f"WHOIS {target}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="connections") @common_cli_options(include_quiet=True) @click.option( "--protocol", "-p", type=click.Choice(["tcp", "udp", "all"], case_sensitive=False), default="all", help="Filter by protocol", ) @click.option( "--state", "-s", "state_filter", default=None, help="Filter by state substring, e.g. LISTEN or ESTABLISHED", ) @click.option( "--no-process", is_flag=True, help="Skip process/PID info (avoids permission errors)", ) def connections_command( protocol: str, state_filter: str | None, *, no_process: bool, quiet: bool, ) -> list[dict[str, Any]]: """ List active network connections (TCP/UDP). Shows local/remote addresses, connection state and, where available, the PID and process name. Analogous to 'ss' or 'netstat'. """ entries: list[ConnectionEntry] = get_connections( protocol=protocol, state_filter=state_filter, include_process=not no_process, ) results: list[dict[str, Any]] = [] for entry in entries: row: dict[str, Any] = { "protocol": entry.protocol, "local_address": entry.local_address, "remote_address": entry.remote_address, "state": entry.state or "—", } if entry.pid is not None: row["pid"] = entry.pid if entry.process is not None: row["process"] = entry.process results.append(row) if not quiet and not results: click.echo("No connections found matching the given filters.", err=True) return results @network_group.command(name="traceroute") @common_cli_options(include_quiet=True) @click.argument("targets", nargs=-1, required=True) @click.option( "--max-hops", type=int, default=30, help="Maximum number of hops", ) @click.option( "--timeout", type=float, default=2.0, help="Per-hop timeout in seconds (default: 2)", ) @click.option( "--sudo", "use_sudo", is_flag=True, help="Run traceroute with sudo (required on some Linux systems)", ) def traceroute_command( targets: tuple[str, ...], max_hops: int, timeout: float, *, use_sudo: bool, quiet: bool, ) -> list[dict[str, Any]]: """ Trace the network path to one or more hosts. Uses 'traceroute' (Linux) or 'tracert' (Windows). On Linux, raw-socket access is needed: run with --sudo, as root, or grant the capability with 'sudo setcap cap_net_raw+ep $(which traceroute)'. tracepath is tried automatically as a root-free fallback. """ results: list[dict[str, Any]] = [] total = len(targets) pbar = None if quiet else tqdm(total=total, desc="Tracing routes", unit="target") for target in targets: if not quiet: click.echo(f"\nTraceroute to {target}:", err=True) hops: list[TraceHop] = traceroute( target, max_hops=max_hops, per_hop_timeout=timeout, use_sudo=use_sudo, ) for hop in hops: rtt_values = [f"{r:.1f} ms" if r is not None else "*" for r in hop.rtt_ms] results.append( { "target": target, "hop": hop.hop, "host": hop.host or "*", "ip": hop.ip or "*", "rtt": " / ".join(rtt_values), } ) if not hops and not quiet: click.echo(f"No hops returned for {target}.", err=True) if pbar: pbar.set_description(f"Tracing {target}") pbar.update(1) if pbar: pbar.close() return results @network_group.command(name="route") @common_cli_options(include_quiet=True) def route_command(*, quiet: bool = False) -> list[dict[str, Any]]: """ Display the system IP routing table. Uses 'ip route' on Linux and 'route PRINT' on Windows. """ entries: list[RouteEntry] = get_route_table() if not entries and not quiet: click.echo( "Could not retrieve routing table. " "Check that 'ip' (Linux) or 'route' (Windows) is available.", err=True, ) return [] return [ { "destination": entry.destination, "gateway": entry.gateway, "netmask": entry.netmask or "—", "interface": entry.interface or "—", "metric": entry.metric or "—", } for entry in entries ]