HEX
Server: Apache/2.4.65 (Unix) OpenSSL/1.1.1k
System: Linux vps109042.inmotionhosting.com 4.18.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User: cisa (1010)
PHP: 8.2.30
Disabled: NONE
Upload Files
File: //opt/imh-cwp-ded/provision.py
#!/usr/lib/imh-cwp-ded/venv/bin/python3
"""Provision a CWP server"""

import platform
import re
import shlex
from subprocess import run, CalledProcessError
from time import sleep
import logging
import socket
from pathlib import Path
import json
import click
import pymysql
import yaml

# pylint:disable=line-too-long

# Subjects of the CWP admin notifications that get stripped after install.
NOTIFICATION_REMOVE_LIST = [
    "CSF/LFD Firewall - SECURITY ISSUE",
    "Firewall - SECURITY ISSUE",
    "Hidden Processes - SECURITY ISSUE",
    "CWP Secure Kernel - SECURITY ISSUE",
]
# JSON file that remove_notifications() rewrites in place.
NOTIFICATION_JSON_PATH = (
    "/usr/local/cwpsrv/htdocs/resources/admin"
    "/include/libs/notifications/notifications.json"
)

LOG_DIR = "/var/log/"
# Joined with pathlib so the trailing slash on LOG_DIR does not produce
# a "/var/log//cwp_provision.log" path.
LOG_FILE = str(Path(LOG_DIR) / "cwp_provision.log")
LOG_FORMAT = "%(asctime)s:%(levelname)s:%(message)s"

# Directory containing this script; helper scripts and csf.yaml are
# looked up relative to it.
LOCALDIR = Path(__file__).parent.absolute()


def setup_logging():
    """Setup the "provision" named logger

    Attaches a file handler (LOG_FILE, formatted with LOG_FORMAT) and a
    stream handler to the logger and sets its level to INFO.

    Calling this more than once is safe: if the logger already has
    handlers it is left untouched, so records are not emitted twice.
    """
    logger = logging.getLogger("provision")
    if logger.handlers:
        # Already configured; adding handlers again would duplicate output.
        return
    loghandler = logging.FileHandler(LOG_FILE)
    loghandler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(loghandler)
    # stderr
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)


def run_cmd(*args, input_str: str | None = None, required: bool = True) -> str:
    """run a subprocess, logging any errors"""
    try:
        ran = run(
            list(map(str, args)),
            capture_output=True,
            input=input_str,
            encoding='utf-8',
            errors='replace',
            check=True,
        )
    except CalledProcessError as exc:
        command = shlex.join(exc.cmd)
        logger = logging.getLogger("provision")
        logger.warning("Process call failed with error code %d", exc.returncode)
        logger.warning("Command: %s", command)
        logger.warning("stdout: %s", exc.stdout)
        logger.warning("stderr: %s", exc.stderr)
        if required:
            raise
        logger.warning("Command %s failed, but we will continue", command)
        return exc.stdout
    return ran.stdout


def configure_firewall(vps: bool, enable: bool):
    """Whitelist InMotion addresses in CSF and restart the firewall

    Reads the ``whitelist`` mapping from csf.yaml next to this script,
    points CSF at the InMotion download server, appends any whitelist
    address not yet mentioned in /etc/csf/csf.allow, clears a leftover
    /etc/csf/csf.error file and restarts CSF.

    Args:
        vps: When False, the ``ded`` whitelist entries are added on top
            of the ``all`` entries.
        enable: When True, ``csf -e`` is run before the restart.
    """
    with (LOCALDIR / "csf.yaml").open(encoding='utf-8') as handle:
        whitelist = yaml.load(handle, Loader=yaml.SafeLoader)['whitelist']
    Path('/etc/csf/downloadservers').write_text(
        "download-csf.imhadmin.net\n", encoding='utf-8'
    )
    ips: dict[str, str] = whitelist['all']
    if not vps:
        ips.update(whitelist['ded'])

    allow_file = Path("/etc/csf/csf.allow")
    current = allow_file.read_text(encoding='utf-8')
    # Only addresses that do not already appear as a whole token are added.
    missing = [
        f"{addr} # inmotion - {comment}\n"
        for addr, comment in ips.items()
        if re.search(r'\b' + re.escape(addr) + r'\b', current) is None
    ]
    if missing:
        with allow_file.open("a", encoding='utf-8') as handle:
            # Leading newline keeps the first new entry on its own line.
            handle.write("\n" + "".join(missing))

    stale_error = Path("/etc/csf/csf.error")
    if stale_error.is_file():
        stale_error.unlink()

    # Both csf calls are best-effort; failures are logged by run_cmd.
    if enable:
        run_cmd("/usr/sbin/csf", "-e", required=False)
    run_cmd("/usr/sbin/csf", "-r", required=False)


def install_cwp(version: str):
    """Execute install_cwp.sh from this script's directory

    Args:
        version: Passed to the installer script as its only argument.
    """
    installer = f"{LOCALDIR}/install_cwp.sh"
    run_cmd(installer, version, required=True)


def cwp_mysql(query: str, args: tuple | None = None):
    """Runs a MySQL query on the root_cwp database

    Connection settings are read from /root/.my.cnf.

    Args:
        query: SQL statement, using ``%s`` placeholders for parameters.
        args: Values bound to the placeholders, or None.

    Returns:
        All rows produced by the statement, as returned by ``fetchall()``.
    """
    with pymysql.connect(
        read_default_file='/root/.my.cnf', database='root_cwp'
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(query, args)
            rows = cur.fetchall()
        # PyMySQL connects with autocommit disabled and leaving the
        # connection's ``with`` block only closes it, so without an
        # explicit commit any write would be discarded on transactional
        # tables.
        conn.commit()
    return rows


def set_ns(ns1: str, ns2: str):
    """Store both nameservers and their resolved IPs in root_cwp

    Each name is resolved with ``socket.gethostbyname`` and the
    ``nameserver`` table is updated with the names and addresses.

    Args:
        ns1: Hostname of the first nameserver.
        ns2: Hostname of the second nameserver.
    """
    query = "UPDATE nameserver SET ns1_name=%s, ns1_ip=%s, ns2_name=%s, ns2_ip=%s"
    # Tuple elements are evaluated left to right, so ns1 resolves first.
    params = (
        ns1,
        socket.gethostbyname(ns1),
        ns2,
        socket.gethostbyname(ns2),
    )
    cwp_mysql(query, params)


def brand_install(ns1: str, ns2: str):
    """Setup branding

    Stores the nameservers, installs the imh-cwp-dns package and appends
    a CSS rule to CWP's custom.css that hides ``li.dropdown`` elements.

    Args:
        ns1: Hostname of the first nameserver.
        ns2: Hostname of the second nameserver.
    """
    set_ns(ns1, ns2)
    # install dnsadmin
    run_cmd("/usr/bin/yum", "install", "imh-cwp-dns", "-y")

    # disable terminal
    css_path = Path("/usr/local/cwpsrv/htdocs/admin/design/css/custom.css")
    rule = "li.dropdown { display: none !important; }"
    if css_path.is_file():
        existing = css_path.read_text(encoding='utf-8', errors='replace')
    else:
        existing = ""
    # Skip the append when the rule is already there so repeated runs do
    # not keep growing the stylesheet.
    if rule not in existing:
        with css_path.open("a", encoding='utf-8') as file:
            # The previous text carried shell-redirect residue and a
            # '#' comment, which is not valid CSS; write a proper CSS
            # comment followed by the rule on its own line.
            file.write(f"\n/* Disable 'Terminal' option in cwp */\n{rule}\n")


def remove_notifications():
    """Remove items in NOTIFICATION_REMOVE_LIST FROM NOTIFICATION_JSON_PATH

    The JSON file is loaded, every entry whose ``subject`` is listed in
    NOTIFICATION_REMOVE_LIST is dropped, and the remaining entries are
    written back over the same file.
    """
    # utf-8 is a superset of ascii, so this reads everything the old
    # 'ascii' mode could, without failing on a non-ASCII byte.
    with open(NOTIFICATION_JSON_PATH, "r+", encoding='utf-8') as file:
        notifications = json.load(file)
        kept = [
            notification
            for notification in notifications
            # .get(): an entry without a subject is kept, not a KeyError.
            if notification.get("subject") not in NOTIFICATION_REMOVE_LIST
        ]
        # Rewrite from the start, then cut off whatever is left of the
        # old (longer) content.
        file.seek(0)
        json.dump(kept, file)
        file.truncate()


def fix_sendmail_symlink():
    """Symlink /usr/sbin/sendmail to /usr/sbin/sendmail.postfix

    Nothing is done when /usr/sbin/sendmail already resolves to something.
    """
    if Path("/usr/sbin/sendmail").exists():
        return
    # exists() follows symlinks, so at this point the path is either
    # absent or a dangling symlink. A plain `ln -s` fails with "File
    # exists" on a dangling link; -f replaces it and -n stops ln from
    # dereferencing the existing link.
    run_cmd("ln", "-sfn", "/usr/sbin/sendmail.postfix", "/usr/sbin/sendmail")


def renew_service_ssl(hostname: str):
    """Issue the service SSL certificate for ``hostname``

    Runs, in order: /scripts/install_acme, /scripts/generate_hostname_ssl,
    an ``acme.sh --issue`` call for ``hostname`` and a restart of cwpsrv.
    The first step that exits non-zero stops the sequence; the failure is
    logged as a warning and not re-raised.

    Args:
        hostname: Domain passed to acme.sh via ``-d``.
    """
    # fmt: off
    issue_cert = (
        "/usr/bin/bash", "/root/.acme.sh/acme.sh",
        "--issue",
        "--cert-home", "/root/.acme.sh/cwp_certs",
        "-d", hostname,
        "-w", "/usr/local/apache/autossl_tmp/",
        "--certpath", "/etc/pki/tls/certs/hostname.cert",
        "--keypath", "/etc/pki/tls/private/hostname.key",
        "--fullchainpath", "/etc/pki/tls/certs/hostname.bundle",
        "--keylength", "2048",
        "--force",
        "--renew-hook", "/scripts/hostname_ssl_restart_services",
        "--log",
    )
    # fmt: on
    steps = (
        ("/scripts/install_acme",),
        ("/scripts/generate_hostname_ssl",),
        issue_cert,
        ("/usr/bin/systemctl", "restart", "cwpsrv"),
    )
    try:
        for step in steps:
            run_cmd(*step)
    except CalledProcessError:
        logging.getLogger("provision").warning(
            "Setting Service SSL did not immediately succeed."
        )


def enable_monit():
    """Execute /opt/cwprads/setup_monit.sh with bash"""
    monit_setup = ("/usr/bin/bash", "/opt/cwprads/setup_monit.sh")
    run_cmd(*monit_setup)


def tune_config(vps: bool, version: str):
    """Execute tune-config.sh from this script's directory

    Args:
        vps: Selects the platform argument: "vps" when True, otherwise
            "dedicated".
        version: Passed to the script as its second argument.
    """
    if vps:
        plat = "vps"
    else:
        plat = "dedicated"
    script = str(LOCALDIR / "tune-config.sh")
    # final system config adjustments before the server is handed off
    run_cmd("/usr/bin/bash", script, plat, version)


def install_account_hooks_package():
    """Install the imh-cwp-account-hooks package through yum"""
    yum_args = ["/usr/bin/yum", "install", "imh-cwp-account-hooks", "-y"]
    run_cmd(*yum_args)


@click.group()
def cli():
    """Provision a CWP server and manage its service SSL and nameservers"""


# Full provisioning entry point. The steps in the body run strictly in the
# order written.
# NOTE(review): click presumably shows the docstring as this command's help
# text - keep it short and user-facing; confirm before rewording.
@cli.command()
@click.option(
    "--vps", default=False, is_flag=True, help="True if provisioning on VPS"
)
@click.option("--version", required=True)
@click.option("--ns1", default="ns1.inmotionhosting.com", help="nameserver 1")
@click.option("--ns2", default="ns2.inmotionhosting.com", help="nameserver 2")
@click.option(
    "do_service_ssl",
    "--service-ssl/--no-service-ssl",
    default=True,
    is_flag=True,
    help="Renew service SSL (default)",
)
def cwp(vps: bool, version: str, ns1: str, ns2: str, do_service_ssl: bool):
    """Setup CWP"""
    # Logging first, so failures in the steps below reach the log file.
    setup_logging()
    # Run the installer script for the requested version.
    install_cwp(version)
    # Whitelist addresses and enable/restart CSF.
    configure_firewall(vps=vps, enable=True)
    sleep(10)  # Wiggle room for CWP installed services to come up
    # Nameservers, DNS package and CSS tweak.
    brand_install(ns1, ns2)
    install_account_hooks_package()
    enable_monit()
    fix_sendmail_symlink()
    remove_notifications()
    # Certificate issuance is optional (--no-service-ssl skips it) and uses
    # this machine's node name as the hostname.
    if do_service_ssl:
        renew_service_ssl(platform.node())

    # Last step: final tuning script for the chosen platform.
    tune_config(vps, version)


@cli.command(help="Renew service SSL")
def service_ssl():
    """Issue the service SSL certificate for this machine's node name"""
    hostname = platform.node()
    renew_service_ssl(hostname)


@cli.command(help="Set default nameservers")
@click.option("--ns1", default="ns1.inmotionhosting.com", help="nameserver 1")
@click.option("--ns2", default="ns2.inmotionhosting.com", help="nameserver 2")
def ns(ns1: str, ns2: str):
    """Write the given nameserver pair to the root_cwp database"""
    set_ns(ns1=ns1, ns2=ns2)


# Run the click command group only when executed as a script, not on import.
if __name__ == "__main__":
    cli()