File: //opt/imh-cwp-ded/setup_user.py
#!/usr/lib/imh-cwp-ded/venv/bin/python3
"""Create CWP user"""
import base64
import http.client as http_client
import json
import logging
import secrets
import socket
import string
from pathlib import Path
from time import sleep
import yaml
import rads
from provision import run_cmd, cwp_mysql, LOCALDIR
import click
import requests
# When True, the request helpers enable verbose HTTP/urllib3 debug logging.
# main() flips this on when --debug is greater than 1.
CWP_API_DEBUG = False
# Maximum number of attempts the request helpers make for a single call.
CWP_API_RETRY_COUNT = 10
# Seconds the request helpers sleep between failed attempts.
CWP_API_RETRY_DELAY = 10
# Alphabet random_password() draws from: digits and ASCII letters only.
PASSWORD_ALLOWED_CHARS = string.digits + string.ascii_letters
def get_logger():
    """Return the logger registered under the name 'provision'."""
    provision_logger = logging.getLogger("provision")
    return provision_logger
def cwp_api_request(host: str, endpoint: str, data: dict[str, str]):
    """POST ``data`` to the CWP API v1 ``endpoint`` on ``host``, with retries.

    The request is sent to ``https://{host}:2304/v1/{endpoint}`` with TLS
    certificate verification disabled. An attempt that raises
    ``requests.RequestException`` (which includes a 4xx/5xx status via
    ``raise_for_status``) is retried until ``CWP_API_RETRY_COUNT`` attempts
    have been made, sleeping ``CWP_API_RETRY_DELAY`` seconds in between.

    When every attempt fails the failure is logged at error level and the
    function returns normally; it does not raise.

    Args:
        host: Hostname or IP address of the CWP server.
        endpoint: Endpoint name appended to ``/v1/``.
        data: Form fields sent as the POST body.
    """
    url = f"https://{host}:2304/v1/{endpoint}"
    logger = get_logger()
    if CWP_API_DEBUG:
        # Trace raw HTTP traffic and urllib3 internals via the root logger.
        http_client.HTTPConnection.debuglevel = 1
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
        # NOTE(review): ``data`` is logged verbatim and may carry API keys or
        # passwords; only enable debug where that is acceptable.
        logger.info("API Request to %s data %s", url, data)
    elif hasattr(requests.packages, 'urllib3'):
        # Certificate verification is off below, so silence urllib3 warnings.
        requests.packages.urllib3.disable_warnings()
    for attempt in range(1, CWP_API_RETRY_COUNT + 1):
        try:
            # The context manager closes the session (and its connection
            # pool) after every attempt, so failed tries do not pile up
            # open sockets.
            with requests.Session() as session:
                session.mount(
                    "https://", requests.adapters.HTTPAdapter(max_retries=10)
                )
                result = session.post(url, data=data, verify=False)
                # Throw an exception if the status is invalid
                result.raise_for_status()
        except requests.RequestException as exc:
            if attempt < CWP_API_RETRY_COUNT:
                logger.warning(
                    "API call to %s failed %s times, sleeping for %s "
                    "seconds then trying again.",
                    url,
                    attempt,
                    CWP_API_RETRY_DELAY,
                )
                sleep(CWP_API_RETRY_DELAY)
                continue
            # Out of attempts: record the failure instead of dropping it
            # silently.
            # NOTE(review): this helper has never raised on exhaustion
            # (cwp_login_request does); confirm callers before changing that.
            logger.error(
                "API call to %s failed %s times, giving up: %s",
                url,
                attempt,
                exc,
            )
        break
def cwp_login_request(host: str, username: str, password: str):
    """Log in to the CWP user panel on ``host`` as ``username``, with retries.

    Posts the username and the base64-encoded password to
    ``https://{host}:2083/index.php?acc=validate`` with TLS certificate
    verification disabled. An attempt that raises
    ``requests.RequestException`` is retried until ``CWP_API_RETRY_COUNT``
    attempts have been made, sleeping ``CWP_API_RETRY_DELAY`` seconds in
    between.

    Args:
        host: Hostname of the CWP server.
        username: Account name to log in with.
        password: Plain-text password; it must be ASCII-encodable.

    Raises:
        requests.RequestException: If the final attempt also fails.
    """
    url = f"https://{host}:2083/index.php?acc=validate"
    data = {
        "username": username,
        "password": base64.b64encode(bytes(password, "ascii")).decode("utf-8"),
        "sessioning": 0,
        "userlang": "",
    }
    logger = get_logger()
    if CWP_API_DEBUG:
        # Trace raw HTTP traffic and urllib3 internals via the root logger.
        http_client.HTTPConnection.debuglevel = 1
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
        # NOTE(review): ``data`` holds the base64-encoded password and is
        # logged verbatim; only enable debug where that is acceptable.
        logger.info("API Request to %s data %s", url, data)
    elif hasattr(requests.packages, 'urllib3'):
        # Certificate verification is off below, so silence urllib3 warnings.
        requests.packages.urllib3.disable_warnings()
    for attempt in range(1, CWP_API_RETRY_COUNT + 1):
        try:
            # The context manager closes the session (and its connection
            # pool) after every attempt, so failed tries do not pile up
            # open sockets.
            with requests.Session() as session:
                session.mount(
                    "https://", requests.adapters.HTTPAdapter(max_retries=10)
                )
                result = session.post(url, data=data, verify=False)
                # Throw an exception if the status is invalid
                result.raise_for_status()
        except requests.RequestException:
            if attempt < CWP_API_RETRY_COUNT:
                logger.warning(
                    "API call to %s failed %s times, sleeping for %s seconds "
                    "then trying again.",
                    url,
                    attempt,
                    CWP_API_RETRY_DELAY,
                )
                sleep(CWP_API_RETRY_DELAY)
                continue
            # Out of attempts: surface the last failure to the caller.
            raise
        break
def write_line_to_file(path: str, line: str, truncate: bool = True):
    """Write ``line`` to ``path``, keeping the file private to its owner.

    Missing parent directories are created. The file ends up with mode 0600
    whether it is new or already existed. ``line`` is written exactly as
    given; no newline is appended.

    Args:
        path: Destination file path.
        line: Text to write.
        truncate: Replace the file's contents when True (the default);
            append to the existing contents when False.
    """
    target = Path(path)
    target.parent.mkdir(exist_ok=True, parents=True)
    # Create the file owner-only from the start; creating it with the default
    # mode and tightening it afterwards leaves a window in which a secrets
    # file is readable by others.
    target.touch(mode=0o600, exist_ok=True)
    # touch() does not change the mode of a file that already exists.
    target.chmod(0o600)
    with target.open("w" if truncate else "a", encoding='utf-8') as file:
        file.write(line)
def random_password(length: int) -> str:
    """Build a random string of ``length`` characters.

    Each character is drawn independently from PASSWORD_ALLOWED_CHARS using
    the ``secrets`` module.
    """
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(PASSWORD_ALLOWED_CHARS))
    return "".join(picked)
def update_mysql_pw() -> str:
    """Set a freshly generated MySQL root password and return it.

    A 16-character password is generated and handed to CWP's
    ``mysql_pwd_reset`` script through run_cmd's ``input_str``.
    """
    reset_script = "/usr/local/cwpsrv/htdocs/resources/scripts/mysql_pwd_reset"
    new_password = random_password(16)
    run_cmd(reset_script, input_str=new_password)
    return new_password
def update_root_pw() -> str:
    """Set a freshly generated password for the root user and return it.

    A 16-character password is generated and handed to
    ``passwd root --stdin`` through run_cmd's ``input_str``.
    """
    new_password = random_password(16)
    passwd_argv = ("/usr/bin/passwd", "root", "--stdin")
    run_cmd(*passwd_argv, input_str=new_password)
    return new_password
def add_api_key() -> str:
    """Generate a root API key, install it for CWP, and return it.

    The 72-character key is written to three places:

    * ``/root/.conf/apikey`` - the CWP_API_TOKEN template filled in with
      the key;
    * ``/usr/local/cwp/.conf/api_key.conf`` - the bare key;
    * ``/opt/cwp/cwpapikeys`` - the key appended as a new line.

    ``/scripts/install_api`` is run after the first two files are written.
    """
    apikey = random_password(72)
    token_entry = CWP_API_TOKEN.format(apikey)

    root_conf_dir = Path("/root/.conf/")
    root_conf_dir.mkdir(exist_ok=True)
    write_line_to_file("/root/.conf/apikey", token_entry)

    cwp_conf_dir = Path("/usr/local/cwp/.conf/")
    cwp_conf_dir.mkdir(exist_ok=True, parents=True)
    write_line_to_file("/usr/local/cwp/.conf/api_key.conf", apikey)

    # Install the API httpd config into cwpsrv
    run_cmd("/scripts/install_api")

    # Append rather than overwrite: write_line_to_file creates the parent
    # directory, forces mode 0600 and opens in append mode here.
    write_line_to_file("/opt/cwp/cwpapikeys", f"{apikey}\n", truncate=False)
    return apikey
def mail_user(email: str, hostname: str, username: str, password: str):
    """Email the new account's login details to ``email``.

    The message contains the user panel URL on port 2083, the username and
    the password. Delivery is best-effort: an OSError from
    ``rads.send_email`` is logged as a warning and not re-raised.
    """
    subject_line = f"CWP Setup on {hostname}"
    message_text = f"""Hello {username},
Your CWP installation has completed.
You can log in to the user panel at https://{hostname}:2083
Your username is: {username}
Your password is: {password}"""
    try:
        rads.send_email(
            to_addr=email,
            subject=subject_line,
            body=message_text,
            errs=True,
        )
    except OSError as send_error:
        get_logger().warning("Failed to send email: %s", send_error)
def update_backups():
    """Apply the default account backup settings.

    Runs the ``update_backups`` query from QUERIES through cwp_mysql.
    """
    backup_query = QUERIES['update_backups']
    cwp_mysql(backup_query)
def create_user(
    hostname: str,
    apikey: str,
    domain: str,
    username: str,
    userpw: str,
    email: str,
    ip: str,
):
    """Create the CWP account and finish setting it up.

    Steps, in order:

    1. Add the account through the CWP API on the "Primary Reseller" package.
    2. Send the same fields again as an update, to force the package to take.
    3. Perform a user panel login as the new user.
    4. Flag the user row with id 1 as a reseller in the CWP database.
    5. Set the user's login shell to /bin/bash.
    """
    # https://docs.control-webpanel.com/docs/developer-tools/api-manager/functions/function-api-account/add
    account_fields = {
        "key": apikey,
        "action": "add",
        "domain": domain,
        "user": username,
        "pass": userpw,
        "email": email,
        "package": "Primary Reseller",
        "inode": "0",
        "limit_nproc": "450",
        "limit_nofile": "150",
        "server_ips": ip,
    }
    cwp_api_request(ip, "account", account_fields)

    # update to force package to take
    update_fields = {
        **account_fields,
        'banckup': 'on',  # the docs say this is the key exactly
        'action': 'udp',  # 'update' action
    }
    cwp_api_request(ip, "account", update_fields)

    cwp_login_request(hostname, username, userpw)
    cwp_mysql("UPDATE user SET reseller='1' WHERE id=1")
    run_cmd("/usr/bin/chsh", "-s", "/bin/bash", username)
def update_email_alerts(email: str, hostname: str):
    """Point CWP and CSF alert emails at ``email``.

    Three things are updated:

    * ``root_email`` in the CWP ``settings`` table;
    * an empty ``LF_ALERT_TO`` in ``/etc/csf/csf.conf`` (a value that is
      already set is left alone, because only ``LF_ALERT_TO = ""`` matches);
    * the CWP notification ``config.ini`` and ``template.tpl``, which are
      overwritten.

    Args:
        email: Address that should receive alerts.
        hostname: Server hostname, used for the notification sender address.
    """
    cwp_mysql("UPDATE settings set root_email=%s", (email,))
    # The address lands in the replacement half of a sed ``s#..#..#``
    # expression. There ``#`` would end the expression early, ``&`` would
    # expand to the matched text and ``\`` starts an escape, so escape all
    # three. Ordinary addresses pass through unchanged.
    sed_safe_email = email.translate(
        str.maketrans({"\\": "\\\\", "#": "\\#", "&": "\\&"})
    )
    run_cmd(
        "sed",
        "-i",
        f's#LF_ALERT_TO = ""#LF_ALERT_TO = "{sed_safe_email}"#',
        "/etc/csf/csf.conf",
    )
    notif_dir = Path(
        "/usr/local/cwpsrv/htdocs/resources/admin/include/libs/notifications/"
    )
    notif_config = notif_dir / "config.ini"
    notif_config.write_text(
        "\n".join(
            [
                f"email={email}",
                f"from_email=notifications@{hostname}",
                "email_info=0",
                "email_warning=0",
                "email_danger=1",
            ]
        )
    )
    notif_template = notif_dir / "template.tpl"
    # NOTE(review): ``%url`` has no closing ``%`` unlike the other
    # placeholders - confirm against the CWP template format.
    notif_template.write_text(
        """You've received a new %level% notification: %subject%
%message%
%url"""
    )
@click.command()
@click.option(
    "--vps", default=False, is_flag=True, help="True if provisioning on VPS"
)
@click.option(
    "--debug",
    default=0,
    type=int,
    help="Debug level; values above 1 enable API request debugging",
)
@click.option("--username", required=True, help="Username to provision")
@click.option("--domain", required=True, help="Domain to provision")
@click.option("--email", required=True, help="Email address of the user")
def main(debug: int, username: str, domain: str, email: str, vps: bool):
    """
    CWP provisioning script to generate first user
    """
    # Only debug levels above 1 turn on the verbose request logging.
    global CWP_API_DEBUG  # pylint:disable=global-statement
    if debug > 1:
        CWP_API_DEBUG = True
    domain = domain.lower()
    username = username.lower()
    # NOTE(review): the API key and the MySQL/root passwords are rotated
    # before the reseller-package check below. If that check raises, the new
    # credentials are never printed - consider validating first.
    apikey = add_api_key()
    userpw = random_password(16)
    mysqlpw = update_mysql_pw()
    rootpw = update_root_pw()
    hostname = socket.gethostname()
    # NOTE(review): `hostname -I` can list several addresses; this presumably
    # expects exactly one non-loopback address - confirm.
    ip = run_cmd("/usr/bin/hostname", "-I").replace("127.0.0.1", "").strip()
    reseller_pkg_ct = cwp_mysql(QUERIES['check_reseller_package'])[0][0]
    if reseller_pkg_ct < 1:
        raise RuntimeError(
            "The Primary Reseller package wasn't found. "
            "/opt/cwprads/post-provision-settings.py normally sets this up. "
            "This user will have a bad time if that hasn't run"
        )
    update_backups()
    create_user(
        hostname,
        apikey,
        domain,
        username,
        userpw,
        email,
        ip,
    )
    update_email_alerts(email, hostname)
    # The credentials email is only sent when --vps was not given.
    if not vps:
        mail_user(email, hostname, username, userpw)
    # Print the generated credentials and endpoints as JSON on stdout.
    ret = {
        "username": username,
        "userpw": userpw,
        "rootpw": rootpw,
        "mysqlpw": mysqlpw,
        "ip": ip,
        "hostname": hostname,
        "panelurl": f"https://{hostname}:2087/",
    }
    print(json.dumps(ret, indent=2))
def read_constants() -> tuple[str, dict[str, str]]:
    """Load constants.yaml from LOCALDIR.

    Returns a two-item tuple:

    * the API token template, with the ``api_perms`` list joined by commas
      and a ``{}`` placeholder left for the key;
    * the ``queries`` mapping from the file.
    """
    constants_path = LOCALDIR / "constants.yaml"
    with open(constants_path, encoding='utf-8') as handle:
        parsed = yaml.load(handle, Loader=yaml.SafeLoader)
    assert isinstance(parsed, dict)
    perms_csv = ",".join(parsed["api_perms"])
    # The doubled braces leave a literal "{}" for str.format() to fill later.
    token_template = (
        f"rootaccess|@|%|@|{{}}|@|{perms_csv}|@|JSON|@|1|@|root"
    )
    return token_template, parsed['queries']
# Loaded once at import time from constants.yaml: the API token template
# (filled in with the key by add_api_key) and the named SQL queries.
CWP_API_TOKEN, QUERIES = read_constants()
if __name__ == "__main__":
    # @click handles args
    main()  # pylint:disable=no-value-for-parameter