Source code for nadzoring.dns_lookup.validation

"""DNS record validation functions."""

from typing import Any


[docs] def calculate_record_score( rtype: str, record_result: dict[str, Any], result: dict[str, list[str]], ) -> int: """ Calculate a health score for a single DNS record type. Starts at 100 and deducts points for errors, missing records, or type-specific issues. Args: rtype: DNS record type (e.g. ``"A"``, ``"MX"``, ``"TXT"``). record_result: Dict with optional ``"error"`` and ``"records"`` keys. result: Accumulator dict with ``"warnings"`` and ``"issues"`` lists updated in-place. Returns: Score between 0 and 100. """ record_score = 100 if record_result.get("error"): error = record_result["error"] if "No " in error and error.endswith("records"): record_score -= 30 result["warnings"].append(f"No {rtype} records found") else: record_score -= 50 result["issues"].append(f"{rtype} record error: {error}") elif not record_result.get("records"): record_score -= 20 result["warnings"].append(f"Empty {rtype} records") return _apply_rtype_specific_checks(rtype, record_result, record_score, result)
[docs] def _apply_rtype_specific_checks( rtype: str, record_result: dict[str, Any], record_score: int, result: dict[str, list[str]], ) -> int: """ Apply validation rules specific to each DNS record type. Args: rtype: DNS record type to validate. record_result: Dict containing the record data to validate. record_score: Current score before applying type-specific checks. result: Accumulator dict for issues and warnings. Returns: Updated score after applying type-specific validations. """ records: Any | None = record_result.get("records") if not records: return record_score if rtype == "MX": return _check_mx_priorities(records, record_score, result) if rtype == "TXT": return _check_txt_records(records, record_score, result) return record_score
[docs] def _check_mx_priorities( records: list[str], record_score: int, result: dict[str, list[str]], ) -> int: """ Deduct points for duplicate MX priorities or malformed records. Args: records: MX record strings in ``"priority mailserver"`` format. record_score: Current score. result: Accumulator dict for issues. Returns: Updated score; reduced by 20 for each duplicate or malformed entry. Examples: >>> r = {"issues": [], "warnings": []} >>> _check_mx_priorities(["10 mail1.com", "10 mail2.com"], 100, r) 80 """ priorities: list[int] = [] for mx in records: try: priority = int(mx.split()[0]) except (IndexError, ValueError): record_score -= 20 result["issues"].append(f"Invalid MX record format: {mx}") continue if priority in priorities: record_score -= 20 result["issues"].append(f"Duplicate MX priority: {priority}") else: priorities.append(priority) return record_score
[docs] def _check_txt_records( records: list[str], record_score: int, result: dict[str, list[str]], ) -> int: """ Validate TXT records for SPF and DKIM compliance. Args: records: List of TXT record strings. record_score: Current score. result: Accumulator dict for issues and warnings. Returns: Updated score after SPF and DKIM checks. """ for txt in records: if txt.startswith("v=spf1"): record_score = _check_spf_record(txt, record_score, result) elif txt.startswith("v=DKIM1"): record_score = _check_dkim_record(txt, record_score, result) return record_score
[docs] def _check_spf_record( txt: str, record_score: int, result: dict[str, list[str]], ) -> int: """ Deduct points when an SPF record lacks a ``~all`` or ``-all`` mechanism. Args: txt: SPF record string starting with ``"v=spf1"``. record_score: Current score. result: Accumulator dict for warnings. Returns: Updated score; reduced by 10 when termination mechanism is absent. Examples: >>> r = {"issues": [], "warnings": []} >>> _check_spf_record("v=spf1 include:spf.example.com", 100, r) 90 """ if "~all" not in txt and "-all" not in txt: record_score -= 10 result["warnings"].append("SPF record missing softfail/hardfail") return record_score
[docs] def _check_dkim_record( txt: str, record_score: int, result: dict[str, list[str]], ) -> int: """ Deduct points when a DKIM record is missing the ``p=`` public-key tag. Args: txt: DKIM record string starting with ``"v=DKIM1"``. record_score: Current score. result: Accumulator dict for issues. Returns: Updated score; reduced by 20 when the public key is absent. Examples: >>> r = {"issues": [], "warnings": []} >>> _check_dkim_record("v=DKIM1; k=rsa;", 100, r) 80 """ if "p=" not in txt: record_score -= 20 result["issues"].append("DKIM record missing public key") return record_score
[docs] def determine_status(score: int) -> str: """ Map a numeric health score to a status label. Args: score: Numeric score in the range 0-100. Returns: ``"healthy"`` for score ≥ 80, ``"degraded"`` for 50-79, or ``"unhealthy"`` for scores below 50. Examples: >>> determine_status(85) 'healthy' >>> determine_status(65) 'degraded' >>> determine_status(30) 'unhealthy' """ if score >= 80: return "healthy" if score >= 50: return "degraded" return "unhealthy"
[docs] def validate_mx_records(mx_records: list[str]) -> dict[str, bool | list[str]]: """ Validate MX records for duplicate priority values. Args: mx_records: MX record strings in ``"priority mailserver"`` format. Returns: Dict with ``"valid"`` (bool), ``"issues"`` (list), and ``"warnings"`` (list) keys. Examples: >>> validate_mx_records(["10 mail1.com", "10 mail2.com"]) {'valid': False, 'issues': ['Duplicate priority: 10'], 'warnings': []} """ validation: dict[str, bool | list[str]] = { "valid": True, "issues": [], "warnings": [], } priorities: list[int] = [] for mx in mx_records: try: priority = int(mx.split()[0]) except (IndexError, ValueError): validation["valid"] = False validation["issues"].append(f"Invalid MX record format: {mx}") continue if priority in priorities: validation["valid"] = False validation["issues"].append(f"Duplicate priority: {priority}") else: priorities.append(priority) return validation
[docs] def validate_txt_records(txt_records: list[str]) -> dict[str, bool | list[str]]: """ Validate TXT records for SPF and DKIM compliance. Checks for SPF (missing ``~all``/``-all``) and DKIM (missing ``p=`` key). Args: txt_records: List of TXT record strings. Returns: Dict with ``"valid"`` (bool), ``"issues"`` (list), and ``"warnings"`` (list) keys. Examples: >>> result = validate_txt_records(["v=spf1 include:spf.com"]) >>> result["valid"] True >>> result["warnings"] ['SPF missing softfail/hardfail'] """ validation: dict[str, bool | list[str]] = { "valid": True, "issues": [], "warnings": [], } for txt in txt_records: if txt.startswith("v=spf1"): if "~all" not in txt and "-all" not in txt: validation["warnings"].append("SPF missing softfail/hardfail") elif txt.startswith("v=DKIM1") and "p=" not in txt: validation["issues"].append("DKIM missing public key") validation["valid"] = False return validation