HEX
Server: Apache/2.4.65 (Unix) OpenSSL/1.1.1k
System: Linux vps109042.inmotionhosting.com 4.18.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User: cisa (1010)
PHP: 8.2.30
Disabled: NONE
Upload Files
File: //opt/imh-cwp-dns/zone_cli.py
#!/opt/imh-cwp-dns/venv/bin/python3
'''
A command line tool to manage records in DNS zone files.
It assumes the zone files are located in /var/named and are named <domain>.db.
'''
import argparse
import os.path
import json
from zone_interface import ZoneFile


def add_record(zone_file: ZoneFile, subdomain, record_type, value, ttl=900):
    """Insert a new record into ``zone_file``.

    The record type is upper-cased before use. TXT values are passed through
    ``zone_file.encode_txt`` first; values of every other type are inserted
    exactly as given. The zone is only modified in memory here; nothing is
    saved by this function.

    Args:
        zone_file: Zone object that receives the record.
        subdomain: Record owner name, forwarded to ``insert_record``.
        record_type: Record type in any letter case (e.g. ``"a"``, ``"TXT"``).
        value: Record data.
        ttl: TTL forwarded to ``insert_record``; defaults to 900.

    Returns:
        Always ``True``.
    """
    kind = record_type.upper()
    # TXT payloads may need to be split/escaped before they go into the zone.
    payload = zone_file.encode_txt(value) if kind == "TXT" else value
    zone_file.insert_record(subdomain, kind, payload, ttl)
    return True


def update_record(
    zone_file: ZoneFile, subdomain, record_type, new_value, value_regex=None, ttl=None
):
    """Replace the value of every matching record, or add one if none match.

    Records are looked up with ``zone_file.find_record`` using the upper-cased
    record type and the optional ``value_regex``. When nothing matches, the
    call is delegated to ``add_record``. Otherwise each match is rewritten via
    ``zone_file.replace_record_value``; TXT values are encoded with
    ``zone_file.encode_txt`` beforehand.

    Args:
        zone_file: Zone object to modify (in memory only; not saved here).
        subdomain: Record owner name to look up.
        record_type: Record type in any letter case.
        new_value: Replacement record data.
        value_regex: Optional pattern forwarded to ``find_record``.
        ttl: TTL forwarded to ``replace_record_value`` or ``add_record``.

    Returns:
        The result of ``add_record`` when no record matched; otherwise
        ``True`` if every replacement succeeded and ``False`` as soon as one
        of them fails (later matches are then left untouched).
    """
    kind = record_type.upper()
    existing = zone_file.find_record(subdomain, kind, value_regex)

    if existing:
        # Encode TXT data once and reuse it for every matching record.
        replacement = zone_file.encode_txt(new_value) if kind == "TXT" else new_value
        # all() stops at the first failed replacement, just like an early return.
        return all(
            zone_file.replace_record_value(entry, replacement, ttl)
            for entry in existing
        )

    # Nothing to update: fall back to creating the record.
    # NOTE(review): ttl is forwarded as-is, so the default of None replaces
    # add_record's own default of 900 - confirm insert_record accepts None.
    return add_record(zone_file, subdomain, kind, new_value, ttl)


def remove_record(zone_file: ZoneFile, subdomain, record_type, value_regex=None):
    """Remove every record that matches the given name, type and value pattern.

    Args:
        zone_file: Zone object to modify (in memory only; not saved here).
        subdomain: Record owner name to look up.
        record_type: Record type in any letter case; upper-cased for lookup.
        value_regex: Optional pattern forwarded to ``find_record``.

    Returns:
        ``True`` when every matching record was removed. ``False`` when no
        record matched (a message is printed) or as soon as one removal
        fails, in which case the remaining matches are not attempted.
    """
    targets = zone_file.find_record(subdomain, record_type.upper(), value_regex)

    if targets:
        # all() short-circuits on the first removal that reports failure.
        return all(zone_file.remove_record(entry) for entry in targets)

    print("No matching records found.")
    return False


def get_records(
    zone_file: ZoneFile, subdomain, record_type, value_regex=None, parsed=False
):
    """Print the matching records as a JSON array of zone-file style lines.

    Each entry has the form ``"<subdomain> <ttl> IN <type> <value>"``. The
    array is written to standard output with two-space indentation; nothing
    is returned.

    Args:
        zone_file: Zone object to query.
        subdomain: Record owner name to look up.
        record_type: Record type in any letter case; upper-cased for lookup.
        value_regex: Optional pattern forwarded to ``find_record``.
        parsed: When true, use each record's ``value_parsed`` attribute
            instead of ``value_raw``.
    """

    def render(rec):
        # Only the selected value attribute is read from the record.
        data = rec.value_parsed if parsed else rec.value_raw
        return f"{rec.subdomain} {rec.ttl} IN {rec.record_type} {data}"

    found = zone_file.find_record(subdomain, record_type.upper(), value_regex)
    print(json.dumps([render(rec) for rec in found], indent=2))


def main():
    parser = argparse.ArgumentParser(description="Zone file management tool")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Common required arguments for all commands
    for cmd, help_text in [
        ("add", "Add a new zone"),
        ("update", "Update a possibly existing zone"),
        ("remove", "Remove a zone"),
        ("get", "Get zone information"),
    ]:
        sub = subparsers.add_parser(cmd, help=help_text)
        sub.add_argument("domain", help="Domain name")
        sub.add_argument("subdomain", help="Subdomain, use @ or '${domain}.' for root")
        sub.add_argument("record_type", help="Record type (e.g., A, TXT, CNAME)")
        sub.add_argument(
            "--warnings", help="Show warnings", action="store_true", default=False
        )
        if cmd in ("add", "update"):
            sub.add_argument("--value", help="Record value", required=True)
            sub.add_argument(
                "--ttl", type=int, default=900, help="Time to live", required=False
            )
        if cmd in ("update", "remove", "get"):
            sub.add_argument(
                "--match", help="Regex to match record value", required=False
            )
        if cmd == "get":
            sub.add_argument("--parsed", action="store_true", help="Get parsed values")
        else:
            sub.add_argument(
                "-s",
                "--serial-update",
                help="Update SOA serial",
                action="store_true",
                required=False,
                default=False,
            )

    args = parser.parse_args()
    domain = args.domain.lower()
    filename = f"/var/named/{domain}.db"
    if not os.path.exists(filename):
        print(f"Zone file {filename} does not exist.")
        exit(1)

    try:
        zone_file = ZoneFile(filename, domain)
        zone_file.parse(args.warnings)
    except Exception as e:
        print(f"Error loading zone file {filename}: {e}")
        exit(1)

    match args.command:
        case "add":
            result = add_record(
                zone_file, args.subdomain, args.record_type, args.value, args.ttl
            )
        case "update":
            result = update_record(
                zone_file,
                args.subdomain,
                args.record_type,
                args.value,
                args.match,
                args.ttl,
            )
        case "remove":
            result = remove_record(
                zone_file, args.subdomain, args.record_type, args.match
            )
        case "get":
            get_records(
                zone_file,
                args.subdomain,
                args.record_type,
                args.match,
                parsed=args.parsed,
            )
            return

    if result:
        if args.serial_update:
            zone_file.bump_soa_serial()
        zone_file.save()


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()