File: //opt/cwprads/cwp_front_api.py
#!/usr/lib/cwprads/venv/bin/python3
"""
Helpers for performing authenticated actions against the CWP admin frontend.
"""
import sys
import re
import subprocess
import secrets
import socket
from contextlib import contextmanager
import requests
import urllib3
# Silence urllib3 warnings for the whole process.  login() in this module
# sends its requests with certificate verification switched off
# (sess.verify = False) -- presumably this is here to hide the resulting
# insecure-request warnings; confirm before removing.
urllib3.disable_warnings()
def run(cmd):
    """Execute *cmd* through the shell and return the completed process.

    stdout and stderr are both captured and decoded as UTF-8.  A non-zero
    exit status does not raise; callers inspect ``returncode`` themselves.
    """
    return subprocess.run(
        cmd,
        shell=True,
        check=False,
        encoding="utf-8",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
def get_hash():
    """Return the password hash field of root's shadow entry.

    Runs ``getent shadow root`` and returns the second colon-separated
    field of its output.

    Exits the process with status 1 (after printing a message) when the
    lookup command fails or its output is not a ``root:`` entry.
    """
    # Named ``lookup`` rather than ``get_hash`` so the function's own name is
    # not shadowed inside its body.
    lookup = run("getent shadow root")
    if lookup.returncode != 0:
        print("Failed to fetch root hash")
        print(lookup.stderr)
        sys.exit(1)
    entry = lookup.stdout.strip()
    # Explicit check instead of ``assert``: assertions vanish under
    # ``python -O``, and a substring test would accept output in which the
    # second field is not root's hash.
    if not entry.startswith("root:"):
        print("Failed to fetch root hash")
        print(f"Unexpected getent output: {entry!r}")
        sys.exit(1)
    return entry.split(":")[1]
def at_job(cmd):
    """Schedule *cmd* with ``at`` to run five minutes from now.

    Args:
        cmd: Shell command text, passed to ``at`` on standard input.

    Returns:
        The job id (int) parsed from the ``job <id> ...`` line that ``at``
        writes to stderr, or ``None`` when the job could not be scheduled or
        no job id could be parsed.  A warning is printed when scheduling
        itself fails.
    """
    try:
        result = subprocess.run(
            args=["at", "-M", "now", "+", "5", "minutes"],
            input=cmd,
            capture_output=True,
            encoding="utf-8",
            check=False,
        )
    except OSError:
        # ``at`` is missing or not executable.  Treat it like any other
        # scheduling failure rather than crashing the caller mid-way.
        print("Warning: unable to schedule password restore.")
        return None

    if result.returncode != 0:
        print("Warning: unable to schedule password restore.")
        return None

    # The job id is read from the "job <id> ..." line on stderr.
    for line in result.stderr.splitlines():
        if not line.startswith("job"):
            continue
        try:
            return int(line.split()[1])
        except (IndexError, ValueError):
            # Line starts with "job" but carries no numeric id; keep looking.
            continue
    return None
def temp_root():
    """
    Temporarily change root password.
    Pause LFD.
    Configure 'at' job to resume in case of script failure

    Returns a dict with three keys:
        "pass":    the temporary root password that was just set
        "restore": shell command that puts the original hash back and
                   resumes LFD (and first removes the fallback 'at' job
                   when one was scheduled)
        "jid":     id of the fallback 'at' job, or None if none was scheduled
    """
    backup = get_hash()
    new_pass = secrets.token_urlsafe(24)
    run(f"echo 'root:{new_pass}' | chpasswd -c SHA512 -s 0")
    new_hash = get_hash()
    restore_cmd = (
        f"(echo 'root:{backup}' | chpasswd -e); "
        "(pgrep lfd | xargs kill -SIGCONT)"
    )
    # Only restore from the fallback job if root still has the temporary
    # hash; if the user changed the password in the meantime we don't want
    # to overwrite that.  grep must be given the current shadow entry as
    # input -- without it grep reads the job's stdin and never sees the
    # entry, so the guarded restore would not run as intended.
    still_temporary = f"getent shadow root | grep -qF -- '{new_hash}'"
    at_jid = at_job(f"{still_temporary} && {restore_cmd}")
    if isinstance(at_jid, int):
        # A normal restore should also cancel the pending fallback job.
        restore_cmd = f"at -r {at_jid}; {restore_cmd}"
    # pause LFD to disable alerts
    run("(pgrep lfd | xargs kill -SIGSTOP)")
    return {"pass": new_pass, "restore": restore_cmd, "jid": at_jid}
@contextmanager
def login():
    """
    Context manager giving an authenticated admin session.

    On entry: temporarily changes the root password and pauses LFD (via
    temp_root(), which also tries to schedule a fallback 'at' job), then
    logs in as root on port 2087 of this host.

    Yields a tuple ``(session, admin_url)`` where ``session`` is the
    logged-in requests.Session and ``admin_url`` is the part of the
    post-login URL up to and including ``/cwp_<token>/admin``.

    On exit (normal or via exception): runs the restore command from
    temp_root() and closes the session.

    Raises RuntimeError when the login request does not return HTTP 200 or
    the resulting URL is not a recognisable admin URL.
    """
    temproot = temp_root()
    try:
        with requests.Session() as sess:
            # Certificate verification is disabled for this session.
            sess.verify = False
            url = f"https://{socket.gethostname()}:2087"
            response = sess.post(
                f"{url}/login/index.php",
                data={
                    "username": "root",
                    "password": temproot["pass"],
                    "commit": "Login",
                },
                # Never hang indefinitely while the root password is swapped
                # and LFD is paused.
                timeout=60,
            )
            if response.status_code != 200:
                # Raise instead of returning: a @contextmanager generator
                # that returns before yielding makes ``with login()`` fail
                # with an unhelpful "generator didn't yield" error.
                raise RuntimeError(
                    f"Failed to login: HTTP {response.status_code} returned"
                )
            if "/admin" not in response.url:
                raise RuntimeError("Session login didn't return admin URL")
            cpsess_url = re.search(r"^(.+/cwp_[^/]+/admin)", response.url)
            if not cpsess_url:
                raise RuntimeError("Admin url doesn't look valid")
            yield (sess, cpsess_url.group(1))
    finally:
        run(temproot["restore"])
        print("Restored original password")