HEX
Server: Apache/2.4.65 (Unix) OpenSSL/1.1.1k
System: Linux vps109042.inmotionhosting.com 4.18.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User: cisa (1010)
PHP: 8.2.30
Disabled: NONE
Upload Files
File: //opt/cwprads/cwp_front_api.py
#!/usr/lib/cwprads/venv/bin/python3
"""
Module used for authenticated cwp frontend actions
"""
import sys
import re
import subprocess
import secrets
import socket
from contextlib import contextmanager
import requests
import urllib3

urllib3.disable_warnings()


def run(cmd):
    """Execute ``cmd`` through the shell and return the finished process.

    stdout and stderr are captured and decoded as UTF-8. A non-zero exit
    status does not raise; callers inspect ``returncode`` on the returned
    ``subprocess.CompletedProcess`` themselves.
    """
    return subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="utf-8",
        check=False,
    )


def get_hash():
    """Return root's password hash as reported by ``getent shadow root``.

    The hash is the second colon-separated field of the shadow entry.
    Prints a diagnostic and exits with status 1 when the lookup command
    fails or its output is not a shadow entry for ``root``.
    """
    result = run("getent shadow root")
    if result.returncode != 0:
        print("Failed to fetch root hash")
        print(result.stderr)
        sys.exit(1)

    # Validate explicitly instead of with ``assert``: assertions are
    # stripped when Python runs with -O, and a malformed entry must never
    # be handed on as if it were the password hash.
    fields = result.stdout.strip().split(":")
    if len(fields) < 2 or fields[0] != "root":
        print("Failed to fetch root hash")
        print("Unexpected output from 'getent shadow root'")
        sys.exit(1)

    return fields[1]


def at_job(cmd):
    """Schedule ``cmd`` to run through ``at`` five minutes from now.

    ``cmd`` is passed to ``at`` on stdin; ``-M`` suppresses the job's mail.

    Returns the job id as an ``int`` on success. On any failure (``at``
    cannot be executed, it exits non-zero, or its output contains no
    parsable job id) a warning is printed and ``None`` is returned, so the
    caller can carry on without the safety net instead of crashing.
    """
    try:
        result = subprocess.run(
            args=["at", "-M", "now", "+", "5", "minutes"],
            input=cmd,
            capture_output=True,
            encoding="utf-8",
            check=False,
        )
    except OSError:
        # e.g. ``at`` is not installed. This must not propagate: the caller
        # may already have changed system state that it still has to undo.
        print("Warning: unable to schedule password restore.")
        return None

    if result.returncode == 0:
        # at reports the new job as "job <id> at <date>" on stderr
        for line in result.stderr.splitlines():
            fields = line.split()
            if len(fields) >= 2 and fields[0] == "job" and fields[1].isdigit():
                return int(fields[1])

    print("Warning: unable to schedule password restore.")
    return None


def temp_root():
    """
    Temporarily change the root password and pause LFD.

    A fallback 'at' job is scheduled so that the original password hash is
    put back and LFD is resumed even if the calling script dies before it
    can clean up.

    Returns a dict with:
      "pass":    the temporary plain-text root password
      "restore": shell command that cancels the fallback job (when one was
                 scheduled), restores the original hash and resumes LFD
      "jid":     id of the fallback 'at' job, or None if scheduling failed

    Exits with status 1 if the temporary password cannot be set.
    """

    backup = get_hash()
    new_pass = secrets.token_urlsafe(24)

    # Feed the credentials to chpasswd on stdin rather than through
    # "sh -c 'echo ... | chpasswd'": a shell command line is visible in the
    # process list, which would expose the temporary root password.
    changed = subprocess.run(
        ["chpasswd", "-c", "SHA512", "-s", "0"],
        input=f"root:{new_pass}\n",
        capture_output=True,
        encoding="utf-8",
        check=False,
    )
    if changed.returncode != 0:
        # Nothing has been modified yet, so bail out before pausing LFD.
        print("Failed to set temporary root password")
        print(changed.stderr)
        sys.exit(1)

    new_hash = get_hash()
    restore_cmd = (
        f"(echo 'root:{backup}' | chpasswd -e); "
        "(pgrep lfd | xargs kill -SIGCONT)"
    )
    # Only restore the old hash if the temporary one is still in place; if
    # the user changed the password in the meantime we don't want to
    # overwrite that. grep needs the shadow entry on its stdin, otherwise
    # it would read the job's own stdin and never see the hash.
    # Shell precedence: "A | B && C; D" always runs D, so LFD is resumed
    # regardless of the outcome of the hash check.
    at_jid = at_job(
        f"getent shadow root | grep -qF '{new_hash}' && {restore_cmd}"
    )

    if isinstance(at_jid, int):
        # A normal restore should also cancel the pending fallback job.
        restore_cmd = f"at -r {at_jid}; {restore_cmd}"

    # pause LFD to disable alerts
    run("(pgrep lfd | xargs kill -SIGSTOP)")
    return {"pass": new_pass, "restore": restore_cmd, "jid": at_jid}


@contextmanager
def login():
    """
    Context manager yielding an authenticated admin session.

    On entry the root password is temporarily changed and LFD is paused
    (see temp_root), then a login is posted to the panel on port 2087 of
    this host. The context value is a tuple (requests.Session, admin_url),
    where admin_url is the session-specific ".../cwp_<token>/admin" prefix
    taken from the URL the login ended up on.

    On exit, whether normal or via an exception, the restore command from
    temp_root is run and the session is closed.

    Raises RuntimeError if the login does not answer with HTTP 200 or does
    not end up on a valid-looking admin URL.
    """
    temproot = temp_root()
    password = temproot["pass"]
    try:
        with requests.Session() as sess:
            # Certificate verification is disabled for this connection to
            # the machine's own hostname.
            sess.verify = False
            url = f"https://{socket.gethostname()}:2087"
            response = sess.post(
                f"{url}/login/index.php",
                data={"username": "root", "password": password, "commit": "Login"},
                # Never wait forever: LFD stays paused and the temporary
                # password stays active until we get to the finally block.
                timeout=60,
            )
            if response.status_code != 200:
                message = f"Failed to login: HTTP {response.status_code} returned"
                print(message)
                # Returning before the yield would make contextlib raise an
                # opaque "generator didn't yield" RuntimeError; raise one
                # that carries the actual reason instead.
                raise RuntimeError(message)

            if "/admin" not in response.url:
                raise RuntimeError("Session login didn't return admin URL")

            cpsess_url = re.search(r"^(.+/cwp_[^/]+/admin)", response.url)
            if not cpsess_url:
                raise RuntimeError("Admin url doesn't look valid")

            yield (sess, cpsess_url.group(1))
    finally:
        run(temproot["restore"])
        print("Restored original password")