File: //opt/imh-cwp-dns/zone_cli.py
#!/opt/imh-cwp-dns/venv/bin/python3
'''
A command-line tool to manage records in DNS zone files.
It assumes each zone is stored at /var/named/<domain>.db.
'''
import argparse
import os.path
import json
from zone_interface import ZoneFile
# TTL applied to new records when the caller does not supply one.
_DEFAULT_TTL = 900


def add_record(zone_file: ZoneFile, subdomain, record_type, value, ttl=_DEFAULT_TTL):
    """Insert a new record into ``zone_file``.

    Args:
        zone_file: Zone object that receives the record.
        subdomain: Owner name of the record, passed through unchanged.
        record_type: Record type in any letter case; it is upper-cased
            before being handed to the zone.
        value: Record value. TXT values are first run through
            ``zone_file.encode_txt``.
        ttl: Time to live. ``None`` is treated as "not specified" and
            replaced by the default of 900, so that callers which forward
            an optional TTL (``update_record`` does) never insert a record
            with a ``None`` TTL.

    Returns:
        Always ``True``; errors raised by ``zone_file`` propagate.
    """
    record_type = record_type.upper()
    if ttl is None:
        ttl = _DEFAULT_TTL
    if record_type == "TXT":
        # Split/escape TXT records if needed
        value = zone_file.encode_txt(value)
    zone_file.insert_record(subdomain, record_type, value, ttl)
    return True
def update_record(
    zone_file: ZoneFile, subdomain, record_type, new_value, value_regex=None, ttl=None
):
    """Replace the value of every matching record, or add one if none match.

    Args:
        zone_file: Zone object to search and modify.
        subdomain: Owner name to look up.
        record_type: Record type in any letter case (upper-cased here).
        new_value: Value to store. TXT values are run through
            ``zone_file.encode_txt`` before replacing.
        value_regex: Optional filter forwarded to ``zone_file.find_record``.
        ttl: TTL forwarded to ``replace_record_value`` / ``add_record``.

    Returns:
        The result of ``add_record`` when nothing matched; otherwise
        ``True`` if every replacement succeeded and ``False`` as soon as
        one fails (later matches are then left untouched).
    """
    rtype = record_type.upper()
    existing = zone_file.find_record(subdomain, rtype, value_regex)

    # Nothing to update: fall back to creating the record.
    if not existing:
        return add_record(zone_file, subdomain, rtype, new_value, ttl)

    # TXT payloads may need splitting/escaping before they are stored.
    replacement = zone_file.encode_txt(new_value) if rtype == "TXT" else new_value

    # all() stops at the first failed replacement, like an early return.
    return all(
        zone_file.replace_record_value(entry, replacement, ttl) for entry in existing
    )
def remove_record(zone_file: ZoneFile, subdomain, record_type, value_regex=None):
    """Remove every record that matches the given name, type and filter.

    Args:
        zone_file: Zone object to search and modify.
        subdomain: Owner name to look up.
        record_type: Record type in any letter case (upper-cased here).
        value_regex: Optional filter forwarded to ``zone_file.find_record``.

    Returns:
        ``True`` if every match was removed. ``False`` if nothing matched
        (a notice is printed) or as soon as one removal fails, in which
        case the remaining matches are not attempted.
    """
    targets = zone_file.find_record(subdomain, record_type.upper(), value_regex)
    if targets:
        # all() short-circuits on the first removal that reports failure.
        return all(zone_file.remove_record(entry) for entry in targets)

    print("No matching records found.")
    return False
def get_records(
    zone_file: ZoneFile, subdomain, record_type, value_regex=None, parsed=False
):
    """Print the matching records as a JSON array of strings.

    Each string has the form ``<subdomain> <ttl> IN <type> <value>``.

    Args:
        zone_file: Zone object to search.
        subdomain: Owner name to look up.
        record_type: Record type in any letter case (upper-cased here).
        value_regex: Optional filter forwarded to ``zone_file.find_record``.
        parsed: When true, show each record's ``value_parsed``; otherwise
            its ``value_raw``.

    Returns:
        ``None``; the result is written to standard output only.
    """
    # Choose once which attribute supplies the value column.
    value_attr = "value_parsed" if parsed else "value_raw"
    found = zone_file.find_record(subdomain, record_type.upper(), value_regex)
    lines = [
        f"{entry.subdomain} {entry.ttl} IN {entry.record_type} {getattr(entry, value_attr)}"
        for entry in found
    ]
    print(json.dumps(lines, indent=2))
def main():
parser = argparse.ArgumentParser(description="Zone file management tool")
subparsers = parser.add_subparsers(dest="command", required=True)
# Common required arguments for all commands
for cmd, help_text in [
("add", "Add a new zone"),
("update", "Update a possibly existing zone"),
("remove", "Remove a zone"),
("get", "Get zone information"),
]:
sub = subparsers.add_parser(cmd, help=help_text)
sub.add_argument("domain", help="Domain name")
sub.add_argument("subdomain", help="Subdomain, use @ or '${domain}.' for root")
sub.add_argument("record_type", help="Record type (e.g., A, TXT, CNAME)")
sub.add_argument(
"--warnings", help="Show warnings", action="store_true", default=False
)
if cmd in ("add", "update"):
sub.add_argument("--value", help="Record value", required=True)
sub.add_argument(
"--ttl", type=int, default=900, help="Time to live", required=False
)
if cmd in ("update", "remove", "get"):
sub.add_argument(
"--match", help="Regex to match record value", required=False
)
if cmd == "get":
sub.add_argument("--parsed", action="store_true", help="Get parsed values")
else:
sub.add_argument(
"-s",
"--serial-update",
help="Update SOA serial",
action="store_true",
required=False,
default=False,
)
args = parser.parse_args()
domain = args.domain.lower()
filename = f"/var/named/{domain}.db"
if not os.path.exists(filename):
print(f"Zone file {filename} does not exist.")
exit(1)
try:
zone_file = ZoneFile(filename, domain)
zone_file.parse(args.warnings)
except Exception as e:
print(f"Error loading zone file {filename}: {e}")
exit(1)
match args.command:
case "add":
result = add_record(
zone_file, args.subdomain, args.record_type, args.value, args.ttl
)
case "update":
result = update_record(
zone_file,
args.subdomain,
args.record_type,
args.value,
args.match,
args.ttl,
)
case "remove":
result = remove_record(
zone_file, args.subdomain, args.record_type, args.match
)
case "get":
get_records(
zone_file,
args.subdomain,
args.record_type,
args.match,
parsed=args.parsed,
)
return
if result:
if args.serial_update:
zone_file.bump_soa_serial()
zone_file.save()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()