HEX
Server: Apache/2.4.65 (Unix) OpenSSL/1.1.1k
System: Linux vps109042.inmotionhosting.com 4.18.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User: cisa (1010)
PHP: 8.2.30
Disabled: NONE
Upload Files
File: //opt/imh-cwp-ded/setup_user.py
#!/usr/lib/imh-cwp-ded/venv/bin/python3
"""Create CWP user"""

import base64
import http.client as http_client
import json
import logging
import secrets
import socket
import string
from pathlib import Path
from time import sleep
import yaml
import rads
from provision import run_cmd, cwp_mysql, LOCALDIR

import click
import requests

# When True, the HTTP request helpers enable verbose http.client / urllib3
# debug logging and log each request's URL and payload.
CWP_API_DEBUG = False
# Maximum number of attempts the request helpers make for one HTTP call.
CWP_API_RETRY_COUNT = 10
# Seconds to sleep between failed attempts.
CWP_API_RETRY_DELAY = 10
# Alphabet used by random_password(): ASCII digits and letters only.
PASSWORD_ALLOWED_CHARS = string.digits + string.ascii_letters


def get_logger():
    """Return the shared logger registered under the name 'provision'"""
    provision_logger = logging.getLogger("provision")
    return provision_logger


def cwp_api_request(host: str, endpoint: str, data: dict[str, str]):
    """Make a POST request to CWP API v1, retrying on failure.

    Args:
        host: hostname or IP of the CWP server; the request goes to port 2304.
        endpoint: endpoint name appended to ``/v1/`` in the URL.
        data: form fields to POST.

    Raises:
        requests.RequestException: if the request still fails (transport
            error or HTTP error status) after CWP_API_RETRY_COUNT attempts.
    """
    url = f"https://{host}:2304/v1/{endpoint}"
    logger = get_logger()
    if CWP_API_DEBUG:
        # Turn on wire-level debugging for http.client and urllib3
        http_client.HTTPConnection.debuglevel = 1
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
        logger.info("API Request to %s data %s", url, data)
    else:
        # Requests are sent with verify=False below; silence the warnings
        # urllib3 would otherwise emit for every unverified request.
        if hasattr(requests.packages, 'urllib3'):
            requests.packages.urllib3.disable_warnings()

    for attempt in range(1, CWP_API_RETRY_COUNT + 1):
        try:
            # A fresh session per attempt, closed when the attempt finishes
            with requests.Session() as session:
                session.mount(
                    "https://", requests.adapters.HTTPAdapter(max_retries=10)
                )
                # Certificate verification is disabled (presumably the panel
                # serves a self-signed certificate - confirm).
                result = session.post(url, data=data, verify=False)
                # Throw an exception if the status is invalid
                result.raise_for_status()
        except requests.RequestException:
            if attempt >= CWP_API_RETRY_COUNT:
                # Out of attempts: surface the failure instead of returning
                # as though the call had succeeded.
                raise
            logger.warning(
                "API call to %s failed %s times, sleeping for %s "
                "seconds then trying again.",
                url,
                attempt,
                CWP_API_RETRY_DELAY,
            )
            sleep(CWP_API_RETRY_DELAY)
        else:
            return


def cwp_login_request(host: str, username: str, password: str):
    """Perform a login request against the CWP user panel, with retries.

    POSTs the credentials to ``index.php?acc=validate`` on port 2083. The
    password is sent base64-encoded. The response body is not inspected;
    only the HTTP status is checked.

    Args:
        host: hostname or IP of the CWP server.
        username: account to log in as.
        password: the account's password (must be ASCII-encodable).

    Raises:
        requests.RequestException: if the request still fails (transport
            error or HTTP error status) after CWP_API_RETRY_COUNT attempts.
    """
    url = f"https://{host}:2083/index.php?acc=validate"
    data = {
        "username": username,
        "password": base64.b64encode(bytes(password, "ascii")).decode("utf-8"),
        "sessioning": 0,
        "userlang": "",
    }

    logger = get_logger()
    if CWP_API_DEBUG:
        # Turn on wire-level debugging for http.client and urllib3
        http_client.HTTPConnection.debuglevel = 1
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
        logger.info("API Request to %s data %s", url, data)
    else:
        # Requests are sent with verify=False below; silence the warnings
        # urllib3 would otherwise emit for every unverified request.
        if hasattr(requests.packages, 'urllib3'):
            requests.packages.urllib3.disable_warnings()

    for attempt in range(1, CWP_API_RETRY_COUNT + 1):
        try:
            # A fresh session per attempt, closed when the attempt finishes
            # so that retries do not leak connection pools.
            with requests.Session() as session:
                session.mount(
                    "https://", requests.adapters.HTTPAdapter(max_retries=10)
                )
                result = session.post(url, data=data, verify=False)
                # Throw an exception if the status is invalid
                result.raise_for_status()
        except requests.RequestException:
            if attempt >= CWP_API_RETRY_COUNT:
                # Out of attempts: let the caller see the failure
                raise
            logger.warning(
                "API call to %s failed %s times, sleeping for %s seconds "
                "then trying again.",
                url,
                attempt,
                CWP_API_RETRY_DELAY,
            )
            sleep(CWP_API_RETRY_DELAY)
        else:
            return


def write_line_to_file(path: str, line: str, truncate: bool = True):
    """Write text to a file, either replacing its contents or appending.

    Missing parent directories are created, and the file's mode is set to
    0600 before the text is written.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.touch(exist_ok=True)
    target.chmod(0o600)
    # "w" discards whatever was there; "a" keeps it and adds to the end
    open_mode = "w" if truncate else "a"
    with target.open(open_mode, encoding='utf-8') as handle:
        handle.write(line)


def random_password(length: int) -> str:
    """Build a random password of `length` characters.

    Each character is picked independently with `secrets.choice` from
    PASSWORD_ALLOWED_CHARS.
    """
    picked = [secrets.choice(PASSWORD_ALLOWED_CHARS) for _ in range(length)]
    return "".join(picked)


def update_mysql_pw() -> str:
    """Set a new random MySQL root password and return it.

    A 16-character password is generated and passed on stdin to CWP's
    mysql_pwd_reset script.
    """
    new_password = random_password(16)
    reset_script = (
        "/usr/local/cwpsrv/htdocs/resources/scripts/mysql_pwd_reset"
    )
    run_cmd(reset_script, input_str=new_password)
    return new_password


def update_root_pw() -> str:
    """Set a new random password for the root account and return it.

    A 16-character password is generated and passed on stdin to
    `passwd root --stdin`.
    """
    new_password = random_password(16)
    passwd_cmd = ("/usr/bin/passwd", "root", "--stdin")
    run_cmd(*passwd_cmd, input_str=new_password)
    return new_password


def add_api_key() -> str:
    """Generate a root API key, install it, and return it.

    The 72-character key is written in three places: formatted through
    CWP_API_TOKEN into /root/.conf/apikey, raw into
    /usr/local/cwp/.conf/api_key.conf, and appended as its own line to
    /opt/cwp/cwpapikeys. /scripts/install_api runs after the first two
    files are written.
    """
    key = random_password(72)
    token_line = CWP_API_TOKEN.format(key)

    Path("/root/.conf/").mkdir(exist_ok=True)
    write_line_to_file("/root/.conf/apikey", token_line)

    Path("/usr/local/cwp/.conf/").mkdir(exist_ok=True, parents=True)
    write_line_to_file("/usr/local/cwp/.conf/api_key.conf", key)

    # Install the API httpd config into cwpsrv
    run_cmd("/scripts/install_api")

    # Append, never truncate: keys already listed in this file are kept.
    # The helper creates the parent directory and sets mode 0600.
    write_line_to_file("/opt/cwp/cwpapikeys", f"{key}\n", truncate=False)
    return key


def mail_user(email: str, hostname: str, username: str, password: str):
    """Email the new user their panel URL, username and password.

    Delivery is best-effort: an OSError from sending is logged as a
    warning and not re-raised.
    """
    subject = f"CWP Setup on {hostname}"
    message = f"""Hello {username},
    Your CWP installation has completed.

You can log in to the user panel at https://{hostname}:2083

Your username is: {username}
Your password is: {password}"""
    try:
        rads.send_email(
            to_addr=email, subject=subject, body=message, errs=True
        )
    except OSError as err:
        get_logger().warning("Failed to send email: %s", err)


def update_backups():
    """Apply the default account backup settings.

    Runs the 'update_backups' query from QUERIES against the CWP database.
    """
    backup_query = QUERIES['update_backups']
    cwp_mysql(backup_query)


def create_user(
    hostname: str,
    apikey: str,
    domain: str,
    username: str,
    userpw: str,
    email: str,
    ip: str,
):
    """Create the CWP account and finish setting it up.

    Steps, in order: add the account through the API, send an update for
    the same account, perform a panel login as the new user, set the
    reseller flag on the user row with id 1, and switch the account's
    shell to /bin/bash.
    """
    # https://docs.control-webpanel.com/docs/developer-tools/api-manager/functions/function-api-account/add
    account = {
        "key": apikey,
        "action": "add",
        "domain": domain,
        "user": username,
        "pass": userpw,
        "email": email,
        "package": "Primary Reseller",
        "inode": "0",
        "limit_nproc": "450",
        "limit_nofile": "150",
        "server_ips": ip,
    }
    cwp_api_request(ip, "account", account)

    # update to force package to take
    account_update = {
        **account,
        "banckup": "on",  # the docs say this is the key exactly
        "action": "udp",  # 'update' action
    }
    cwp_api_request(ip, "account", account_update)

    cwp_login_request(hostname, username, userpw)
    cwp_mysql("UPDATE user SET reseller='1' WHERE id=1")
    chsh_cmd = ("/usr/bin/chsh", "-s", "/bin/bash", username)
    run_cmd(*chsh_cmd)


def update_email_alerts(email: str, hostname: str):
    """Point admin alerts at `email`.

    Updates root_email in the CWP settings table, fills in an empty
    LF_ALERT_TO in /etc/csf/csf.conf, and rewrites the notification
    library's config.ini and template.tpl.
    """
    cwp_mysql("UPDATE settings set root_email=%s", (email,))

    # Only matches while LF_ALERT_TO is still the empty string
    sed_script = f's#LF_ALERT_TO = ""#LF_ALERT_TO = "{email}"#'
    run_cmd("sed", "-i", sed_script, "/etc/csf/csf.conf")

    notif_dir = Path(
        "/usr/local/cwpsrv/htdocs/resources/admin/include/libs/notifications/"
    )

    config_lines = (
        f"email={email}",
        f"from_email=notifications@{hostname}",
        "email_info=0",
        "email_warning=0",
        "email_danger=1",
    )
    (notif_dir / "config.ini").write_text("\n".join(config_lines))

    # NOTE(review): %url has no closing '%' unlike the other placeholders;
    # reproduced as-is - confirm against what CWP expects.
    template_body = """You've received a new %level% notification: %subject%


                                            %message%


                                            %url"""
    (notif_dir / "template.tpl").write_text(template_body)


@click.command()
@click.option(
    "--vps", default=False, is_flag=True, help="True if provisioning on VPS"
)
@click.option("--debug", default=0, type=int, help="Enable debug")
@click.option("--username", required=True, help="Username to provision")
@click.option("--domain", required=True, help="Domain to provision")
@click.option("--email", required=True, help="Email address of the user")
def main(debug: int, username: str, domain: str, email: str, vps: bool):
    """
    CWP provisioning script to generate first user
    """
    # API request debugging is only switched on for debug levels above 1
    if debug > 1:
        global CWP_API_DEBUG  # pylint:disable=global-statement
        CWP_API_DEBUG = True
    domain = domain.lower()
    username = username.lower()

    # Generate and install all credentials before touching the account
    apikey = add_api_key()
    userpw = random_password(16)
    mysqlpw = update_mysql_pw()
    rootpw = update_root_pw()
    hostname = socket.gethostname()
    # NOTE(review): `hostname -I` can print several addresses separated by
    # spaces; the value is used below as a single host - confirm the server
    # only ever has one address at this stage.
    ip = run_cmd("/usr/bin/hostname", "-I").replace("127.0.0.1", "").strip()

    # create_user() assigns the "Primary Reseller" package, so refuse to
    # continue if the check query reports it is missing.
    reseller_pkg_ct = cwp_mysql(QUERIES['check_reseller_package'])[0][0]
    if reseller_pkg_ct < 1:
        raise RuntimeError(
            "The Primary Reseller package wasn't found. "
            "/opt/cwprads/post-provision-settings.py normally sets this up. "
            "This user will have a bad time if that hasn't run"
        )

    update_backups()
    create_user(
        hostname,
        apikey,
        domain,
        username,
        userpw,
        email,
        ip,
    )
    update_email_alerts(email, hostname)
    # The credentials email is skipped when --vps is given
    if not vps:
        mail_user(email, hostname, username, userpw)

    # Report the generated credentials as JSON on stdout
    ret = {
        "username": username,
        "userpw": userpw,
        "rootpw": rootpw,
        "mysqlpw": mysqlpw,
        "ip": ip,
        "hostname": hostname,
        "panelurl": f"https://{hostname}:2087/",
    }
    print(json.dumps(ret, indent=2))


def read_constants() -> tuple[str, dict[str, str]]:
    """Load constants.yaml from LOCALDIR.

    Returns:
        A 2-tuple: the API token template, which keeps a single ``{}``
        placeholder for the key and embeds the comma-joined ``api_perms``
        list, and the ``queries`` mapping from the file.
    """
    constants_path = LOCALDIR / "constants.yaml"
    with open(constants_path, encoding='utf-8') as handle:
        constants = yaml.safe_load(handle)
    assert isinstance(constants, dict)
    token_perms = ",".join(constants["api_perms"])
    # The doubled braces leave a literal "{}" for a later str.format() call
    token = f"rootaccess|@|%|@|{{}}|@|{token_perms}|@|JSON|@|1|@|root"
    queries = constants['queries']
    return token, queries


# Loaded at import time: importing this module reads constants.yaml.
# CWP_API_TOKEN is a format template with one "{}" slot for the API key;
# QUERIES maps query names to the SQL passed to cwp_mysql().
CWP_API_TOKEN, QUERIES = read_constants()


if __name__ == "__main__":
    # @click handles args: the decorated main() parses the command line
    # itself, so it is called here without parameters
    main()  # pylint:disable=no-value-for-parameter