File: //opt/dedrads/check_hacks
#!/usr/lib/rads/venv/bin/python3
from __future__ import annotations
import argparse
import datetime as dt
import re
import shutil
import subprocess
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Iterator, Optional, Sequence
from rads import color
# ----------------------------
# Models / configuration
# ----------------------------
class ServerType(str, Enum):
    """OS + control-panel combinations this script knows how to label.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # cPanel detected on an Ubuntu host.
    UBUNTU_CPANEL = "ubuntu+cpanel"
    # cPanel detected on an AlmaLinux host.
    ALMA_CPANEL = "alma+cpanel"
    # CWP detected.
    ALMA_CWP = "alma+cwp"
    # Anything that does not fit one of the combinations above.
    UNKNOWN = "unknown"
@dataclass(frozen=True)
class LogPaths:
    """Immutable bundle of the log locations used by the checks."""

    # General system log.
    sys_log: Path
    # Mail transfer log.
    mail_log: Path
    # Log scanned for SSH authentication failures.
    ssh_log: Path
    # Control-panel login log.
    panel_login_log: Path
    # Directory holding the Apache per-domain access logs.
    domlogs_root: Path
@dataclass(frozen=True)
class ServerProfile:
    """Immutable description of the detected host: OS, panel and log paths."""

    # Combined OS + panel classification.
    server_type: ServerType
    # Lower-cased ``ID`` from os-release ("unknown" when not found).
    os_id: str
    # Human-readable OS name for display.
    os_pretty: str
    # Detected panel: "cpanel", "cwp" or "none".
    panel: str
    # cPanel version text, or None when cPanel was not detected.
    cpanel_version: Optional[str]
    # CWP version text, or None when CWP was not detected.
    cwp_version: Optional[str]
    # Log locations matching this OS/panel combination.
    paths: LogPaths
# ----------------------------
# Low-level helpers
# ----------------------------
class Shell:
    """Small wrapper around subprocess to keep error-handling consistent.

    Every outcome, including a failure to launch the program, is reported
    through the same ``(returncode, stdout, stderr)`` tuple, so callers only
    ever have to inspect the return code.
    """

    @staticmethod
    def run(cmd: Sequence[str], timeout: int = 25) -> tuple[int, str, str]:
        """Run *cmd* without a shell and capture its output as text.

        Args:
            cmd: Program followed by its arguments.
            timeout: Seconds to wait before giving up on the command.

        Returns:
            ``(returncode, stdout, stderr)``. When the command cannot be run,
            stdout is empty, stderr carries a message and the code follows
            shell conventions: 127 (not found), 126 (not executable),
            124 (timed out) or 1 (any other OS-level launch error).
        """
        argv = list(cmd)
        try:
            proc = subprocess.run(
                argv,
                text=True,
                # Process names and log fragments may hold arbitrary bytes;
                # substitute them instead of raising UnicodeDecodeError.
                errors="replace",
                capture_output=True,
                timeout=timeout,
                check=False,
            )
        except FileNotFoundError:
            return 127, "", color.red(f"Command not found: {argv[0]}")
        except PermissionError:
            return 126, "", f"Permission denied: {argv[0]}"
        except subprocess.TimeoutExpired:
            return 124, "", f"Timed out running: {' '.join(argv)}"
        except OSError as exc:
            # Anything else the OS refuses (bad exec format, resource limits, ...).
            return 1, "", f"Failed to run {argv[0]}: {exc}"
        return proc.returncode, proc.stdout, proc.stderr
class FileIO:
    """Best-effort text readers: unreadable paths behave like missing ones."""

    @staticmethod
    def iter_lines(path: Path) -> Iterator[str]:
        """Yield the lines of *path* without their trailing newline.

        The file is decoded as UTF-8 with undecodable bytes replaced. Nothing
        is yielded when the path is missing or cannot be opened (directory,
        permission denied, removed between check and open).
        """
        try:
            handle = path.open("r", encoding="utf-8", errors="replace")
        except OSError:
            return
        with handle:
            for line in handle:
                yield line.rstrip("\n")

    @staticmethod
    def read_text(path: Path) -> Optional[str]:
        """Return the stripped contents of *path*, or None.

        None is returned when the file is missing, unreadable, or contains
        only whitespace. Decoding is UTF-8 with replacement, matching
        ``iter_lines``.
        """
        try:
            text = path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return None
        return text.strip() or None
class Reporter:
    """Console output helper: separator bars, section headers, ranked tables."""

    def __init__(self, width: int = 89) -> None:
        # The separator is built once from a coloured "=" repeated `width` times.
        self._sep = color.blue("=") * width

    def bar(self) -> None:
        """Print the separator line."""
        print(self._sep)

    def header(self, title: str) -> None:
        """Print a blank line, then *title* framed by two separator lines."""
        print()
        self.bar()
        print(title)
        self.bar()

    def warn(self, msg: str) -> None:
        """Print *msg* with a ``[WARN]`` prefix."""
        print(f"[WARN] {msg}")

    def ok(self, msg: str) -> None:
        """Print *msg* with an ``[OK]`` prefix."""
        print(f"[OK] {msg}")

    @staticmethod
    def top(counter: Counter[str], n: int) -> list[tuple[str, int]]:
        """Return the first *n* ``(key, count)`` pairs, highest count first."""
        ranked = list(counter.items())
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return ranked[:n]

    def print_top(self, title: str, counter: Counter[str], n: int = 5, *, min_count: int = 1) -> None:
        """Print *title*, a separator, then the top *n* entries of *counter*.

        Entries whose count is below *min_count* are dropped after the top-n
        cut. Keys are left-aligned to the longest key shown.
        """
        rows = []
        for key, count in self.top(counter, n):
            if count >= min_count:
                rows.append((key, count))

        print(title)
        self.bar()
        if rows:
            pad = max(len(key) for key, _ in rows)
            for key, count in rows:
                print(f"{key.ljust(pad)} {count}")
            return
        print(color.green("No results."))
# ----------------------------
# Detection
# ----------------------------
class ServerDetector:
    """Work out which OS and control panel this host runs.

    Detect OS via /etc/os-release.
    Detect cPanel via /usr/local/cpanel/version.
    Detect CWP via /usr/local/cwpsrv/htdocs/resources/admin/include/version.php.
    """

    def __init__(self) -> None:
        self._os_release = Path("/etc/os-release")
        self._cpanel_version_file = Path("/usr/local/cpanel/version")
        self._cwp_version_file = Path("/usr/local/cwpsrv/htdocs/resources/admin/include/version.php")
        # First dotted version number (three or four components) in the file.
        self._cwp_version_re = re.compile(r"\b([0-9]+\.[0-9]+\.[0-9]+(?:\.[0-9]+)?)\b")

    def detect(self) -> ServerProfile:
        """Build the ServerProfile for this host.

        CWP takes precedence when both panels' version files are present.

        Raises:
            SystemExit: exit status 1 when neither panel is detected
                (raised by ``_select_paths``).
        """
        os_id, os_pretty = self._detect_os()
        cpanel_version = FileIO.read_text(self._cpanel_version_file)
        cwp_version = self._detect_cwp_version()

        if cwp_version:
            panel = "cwp"
        elif cpanel_version:
            panel = "cpanel"
        else:
            panel = "none"

        paths = self._select_paths(os_id=os_id, panel=panel)
        server_type = self._select_server_type(os_id=os_id, panel=panel)
        return ServerProfile(
            server_type=server_type,
            os_id=os_id or "unknown",
            os_pretty=os_pretty or (os_id or "Unknown OS"),
            panel=panel,
            cpanel_version=cpanel_version,
            cwp_version=cwp_version,
            paths=paths,
        )

    @staticmethod
    def _unquote(value: str) -> str:
        """Strip whitespace and one pair of matching single or double quotes."""
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            return value[1:-1]
        return value

    @staticmethod
    def _parse_os_release(lines: Iterable[str]) -> dict[str, str]:
        """Parse ``KEY=value`` lines in os-release format into a dict.

        Blank lines, ``#`` comments and lines without ``=`` are ignored.
        Values may be bare, double-quoted or single-quoted.
        """
        data: dict[str, str] = {}
        for line in lines:
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            data[key.strip()] = ServerDetector._unquote(value)
        return data

    def _detect_os(self) -> tuple[str, str]:
        """Return ``(os_id, pretty_name)``; both are "" when not available."""
        # iter_lines yields nothing for a missing file, so no exists() check.
        data = self._parse_os_release(FileIO.iter_lines(self._os_release))
        os_id = (data.get("ID") or "").lower()
        os_pretty = data.get("PRETTY_NAME") or data.get("NAME") or ""
        return os_id, os_pretty

    def _detect_cwp_version(self) -> Optional[str]:
        """Return the CWP version found in version.php, or None."""
        text = FileIO.read_text(self._cwp_version_file)
        if not text:
            return None
        match = self._cwp_version_re.search(text)
        return match.group(1) if match else None

    def _select_server_type(self, *, os_id: str, panel: str) -> ServerType:
        """Map the OS id and panel name onto a ServerType member."""
        if panel == "cwp":
            # CWP is labelled Alma+CWP whatever os_id says.
            return ServerType.ALMA_CWP
        if panel == "cpanel":
            if os_id == "ubuntu":
                return ServerType.UBUNTU_CPANEL
            if os_id in {"almalinux", "alma"}:
                return ServerType.ALMA_CPANEL
        return ServerType.UNKNOWN

    def _select_paths(self, *, os_id: str, panel: str) -> LogPaths:
        """Return log paths for the detected OS/panel combination.

        Raises:
            SystemExit: exit status 1, after printing an explanation, when
                *panel* is neither "cwp" nor "cpanel".
        """
        if panel == "cwp":
            return LogPaths(
                sys_log=Path("/var/log/messages"),
                mail_log=Path("/var/log/maillog"),
                ssh_log=Path("/var/log/secure"),
                panel_login_log=Path("/var/log/cwp_client_login.log"),
                domlogs_root=Path("/usr/local/apache/domlogs"),
            )
        if panel == "cpanel":
            if os_id == "ubuntu":
                return LogPaths(
                    sys_log=Path("/var/log/syslog"),
                    mail_log=Path("/var/log/mail.log"),
                    ssh_log=Path("/var/log/auth.log"),
                    panel_login_log=Path("/usr/local/cpanel/logs/login_log"),
                    domlogs_root=Path("/usr/local/apache/domlogs"),
                )
            # Every non-Ubuntu cPanel host gets the messages/maillog/secure layout.
            return LogPaths(
                sys_log=Path("/var/log/messages"),
                mail_log=Path("/var/log/maillog"),
                ssh_log=Path("/var/log/secure"),
                panel_login_log=Path("/usr/local/cpanel/logs/login_log"),
                domlogs_root=Path("/usr/local/apache/domlogs"),
            )
        print(f"Error: No supported panel detected (OS: {os_id}, Panel: {panel}).")
        print("This script supports: cPanel on Ubuntu/AlmaLinux, CWP on AlmaLinux.")
        raise SystemExit(1)
# ----------------------------
# Checks: processes & networking
# ----------------------------
class ProcessInspector:
    """Process and socket checks built on ``ps`` plus ``ss`` or ``netstat``.

    Findings are only printed for review; nothing is terminated or changed.
    """

    # Usernames made of lowercase letters followed by digits.
    # NOTE(review): presumably the hosting-account naming scheme -- confirm.
    _ACCOUNT_USER_RE = re.compile(r"[a-z]+[0-9]+")
    # netstat appends "PID/program" to lines it can attribute to a process.
    _NETSTAT_PID_RE = re.compile(r"\s[0-9]+/.*$")

    def __init__(self, reporter: Reporter) -> None:
        self.r = reporter

    def _ps_rows(self) -> Iterator[tuple[str, str, str, str]]:
        """Yield ``(user, pid, ppid, args)`` for each process ``ps`` reports.

        Emits a warning and yields nothing when ``ps`` fails. ``args`` is ""
        for rows that have no command text.
        """
        rc, out, err = Shell.run(["ps", "-eo", "user:50=,pid=,ppid=,args="])
        if rc != 0:
            self.r.warn(f"ps failed: {err.strip()}")
            return
        for line in out.splitlines():
            parts = line.split(None, 3)
            if len(parts) < 3:
                continue
            args = parts[3] if len(parts) >= 4 else ""
            yield parts[0], parts[1], parts[2], args

    def run_all(self, *, full: bool) -> None:
        """Run every process/network check; *full* adds the raw socket list."""
        self._check_perl_processes()
        self._check_daemonized_user_processes()
        self._check_shell_bound_scripts()
        self._check_network_listeners(full=full)

    @staticmethod
    def _comm_from_stat(stat: str) -> str:
        """Extract the command name from the text of ``/proc/<pid>/stat``.

        The name is the text between the first "(" and the last ")", so names
        containing spaces or parentheses are returned intact. Returns "" when
        no name can be found.
        """
        start = stat.find("(")
        end = stat.rfind(")")
        if start != -1 and end > start:
            return stat[start + 1:end]
        fields = stat.split()
        return fields[1].strip("()") if len(fields) > 1 else ""

    @staticmethod
    def _is_shell_bound(args: str) -> bool:
        """Tell whether a command line looks like a script run through a shell.

        True for ``/bin/sh`` command lines that do not mention mysql, and for
        ``/bin/bash`` command lines outside ``/opt/`` that are not this
        script. Anything mentioning grep is ignored.
        """
        low = args.lower()
        if "grep" in low:
            return False
        if "/bin/sh" in low and "mysql" not in low:
            return True
        return "/bin/bash" in low and "/opt/" not in low and "check_hacks" not in low

    @staticmethod
    def _is_ss_listen_or_estab(line: str) -> bool:
        """Tell whether an ``ss`` output line is in LISTEN or ESTAB state."""
        return line.startswith(("LISTEN", "ESTAB"))

    @staticmethod
    def _is_netstat_tcp_with_pid(line: str) -> bool:
        """Tell whether a ``netstat`` line is tcp and names a PID/program."""
        if not line.strip().startswith("tcp"):
            return False
        return ProcessInspector._NETSTAT_PID_RE.search(line) is not None

    def _check_perl_processes(self) -> None:
        """List perl processes owned by ``nobody`` or an account-style user."""
        self.r.header(color.yellow("Check for masquerading perl processes (review before killing)"))
        pids = [
            pid
            for user, pid, _ppid, args in self._ps_rows()
            if (user == "nobody" or self._ACCOUNT_USER_RE.fullmatch(user)) and "perl" in args.lower()
        ]
        if not pids:
            self.r.ok(color.green("No suspicious perl processes found"))
            return
        for pid in pids:
            proc_dir = Path("/proc") / pid
            try:
                comm = self._comm_from_stat((proc_dir / "stat").read_text(errors="replace"))
                raw = (proc_dir / "cmdline").read_bytes()
            except OSError:
                # The process exited between the ps snapshot and this read.
                continue
            # /proc cmdline separates arguments with NUL bytes.
            cmdline = raw.replace(b"\x00", b" ").decode("utf-8", errors="replace").strip()
            if comm == "perl" or " perl" in f" {cmdline}".lower():
                print(f"PID={pid} COMM={comm} CMD={cmdline}")
        print()

    def _check_daemonized_user_processes(self) -> None:
        """List account-user processes whose parent PID is 1."""
        self.r.header(color.yellow("Daemonized user processes, review before terminating"))
        printed = False
        for user, pid, ppid, args in self._ps_rows():
            if ppid != "1" or not self._ACCOUNT_USER_RE.fullmatch(user):
                continue
            if "gam_server" in args:
                continue
            print(f"USER={user} PID={pid} PPID={ppid} ARGS={args}")
            printed = True
        if not printed:
            self.r.ok(color.green("No daemonized processes found"))
        print()

    def _check_shell_bound_scripts(self) -> None:
        """List processes whose command line runs through /bin/sh or /bin/bash.

        Each process is printed once, even when its command line mentions
        both shells.
        """
        self.r.header(color.yellow("Scripts bound to a shell review with: lsof -p PID"))
        hits = 0
        for _user, pid, _ppid, args in self._ps_rows():
            if self._is_shell_bound(args):
                print(f"PID={pid} ARGS={args}")
                hits += 1
        if hits == 0:
            self.r.ok(color.green("No suspicious shell-bound scripts found"))
        print()

    def _check_network_listeners(self, *, full: bool) -> None:
        """Print socket lines that mention perl; with *full*, list sockets too.

        Uses ``ss -plant`` when ``ss`` is on PATH, otherwise ``netstat -plan``.
        """
        self.r.header(color.yellow("Network listeners"))
        if shutil.which("ss"):
            tool, cmd = "ss", ["ss", "-plant"]
            full_title = "[--full] ss -plant (LISTEN/ESTAB)"
            keep = self._is_ss_listen_or_estab
        else:
            tool, cmd = "netstat", ["netstat", "-plan"]
            full_title = "[--full] netstat -plan (tcp lines with pid/program)"
            keep = self._is_netstat_tcp_with_pid

        rc, out, err = Shell.run(cmd)
        if rc != 0:
            self.r.warn(f"{tool} failed: {err.strip()}")
            return

        lines = out.splitlines()
        perl_lines = [line for line in lines if "perl" in line.lower()]
        for line in perl_lines:
            print(line)
        if not perl_lines:
            self.r.ok(color.green("No suspicious Network connections found"))

        if full:
            print()
            print(full_title)
            self.r.bar()
            for line in lines:
                if keep(line):
                    print(line)
# ----------------------------
# Checks: logs
# ----------------------------
class LogInspector:
    """Summarise suspicious activity found in the server's log files.

    Parses:
    - SSH failures from the auth/secure log
    - cPanel failed logins from login_log (read through ``strings -a``)
    - Apache domlogs: suspicious requests + top client IPs for today's date

    Only IPv4 client addresses are recognised.
    """

    # Domlog files larger than this many bytes are skipped.
    _MAX_DOMLOG_BYTES = 100 * 1024 * 1024

    def __init__(self, profile: ServerProfile, reporter: Reporter) -> None:
        self.p = profile
        self.r = reporter

        ipv4 = r"\d{1,3}(?:\.\d{1,3}){3}"
        self._ipv4_re = re.compile(r"\b" + ipv4 + r"\b")
        # OpenSSH 9.8+ logs authentication from "sshd-session[pid]".
        self._ssh_fail_re_1 = re.compile(
            r"sshd(?:-session)?\[\d+\]: Failed password .* from (?P<ip>" + ipv4 + r")\b"
        )
        self._ssh_fail_re_2 = re.compile(
            r"sshd(?:-session)?\[\d+\]: pam_unix\(sshd:auth\): authentication failure;.*\brhost=(?P<ip>"
            + ipv4
            + r")\b"
        )
        # Client IP, two identity fields (ident and user), then "[dd/Mon/yyyy:".
        # The IP may open the line or follow a whitespace-separated prefix.
        self._domlog_ip_re = re.compile(
            r"(?:^|\s)(?P<ip>" + ipv4 + r")\s+\S+\s+\S+\s+\[(?P<ts>\d{2}/[A-Za-z]{3}/\d{4}):"
        )
        self._http_method_re = re.compile(r"\"(?P<method>[A-Z]+)\s+(?P<path>/\S*)\s+HTTP/[^\" ]+\"")
        # First three-digit number following a closing quote: the status code.
        self._status_re = re.compile(r"\"\s+(?P<status>\d{3})\s+")
        self._suspicious_paths = [
            re.compile(r"/wp-json/wp/v2/posts/\d+/", re.IGNORECASE),
            re.compile(r"/\.env\b", re.IGNORECASE),
            re.compile(r"/login\.asp\b", re.IGNORECASE),
            re.compile(r"(\bunion\b|\bselect\b|\border\b\s*by\b|%20union|%20select|%20order%20by)", re.IGNORECASE),
        ]

    def run_all(self) -> None:
        """Run the panel-login, SSH and domlog reports in that order."""
        self._panel_failed_logins()
        self._ssh_failures()
        self._domlog_summary()

    def _panel_failed_logins(self) -> None:
        """Report cPanel failed logins, or preview the CWP client login log."""
        path = self.p.paths.panel_login_log
        panel = self.p.panel

        if panel == "cpanel":
            self.r.header(color.yellow("cPanel failed authentication attempts"))
            if not path.exists():
                self.r.warn(f"Missing: {path}")
                return
            if not shutil.which("strings"):
                self.r.warn("'strings' not found. Install (binutils) to parse cPanel login_log.")
                return
            counts = self._parse_cpanel_failed_logins(path)
            self.r.print_top(color.yellow("Top 5 IPS that FAILED LOGIN"), counts, n=5)
            return

        if panel == "cwp":
            self.r.header(color.yellow(f"CWP client logins preview from {path}"))
            if not path.exists():
                self.r.warn(f"Missing: {path}")
                return
            lines = list(FileIO.iter_lines(path))
            if not lines:
                print("No lines found.")
                return
            # Only the most recent 200 lines are shown.
            print("\n".join(lines[-200:]))
            return

        self.r.header("Panel login checks skipped (no panel detected)")

    def _failed_login_key(self, line: str) -> str:
        """Return the grouping key for one FAILED LOGIN line.

        The key is the first IPv4 address in the line, so attempts are
        grouped per client. Lines without an address fall back to their
        first three whitespace-separated tokens (or the whole stripped line
        when shorter).
        """
        match = self._ipv4_re.search(line)
        if match:
            return match.group(0)
        parts = line.split()
        return " ".join(parts[:3]) if len(parts) >= 3 else line.strip()

    def _parse_cpanel_failed_logins(self, login_log: Path) -> Counter[str]:
        """Count FAILED LOGIN lines in *login_log* per client address.

        Returns an empty counter when the log or the ``strings`` tool is
        missing, or when ``strings`` fails (a warning is printed then).
        """
        counts: Counter[str] = Counter()
        if not login_log.exists() or not shutil.which("strings"):
            return counts
        rc, out, err = Shell.run(["strings", "-a", str(login_log)], timeout=60)
        if rc != 0:
            self.r.warn(f"strings failed: {err.strip()}")
            return counts
        for line in out.splitlines():
            if "FAILED LOGIN" not in line.upper():
                continue
            key = self._failed_login_key(line)
            if key:
                counts[key] += 1
        return counts

    def _ssh_failures(self) -> None:
        """Print the top five IPs with more than five SSH failures."""
        path = self.p.paths.ssh_log
        self.r.header(color.yellow(f"SSH failures from {path} (top offenders > 5 failures)"))
        if not path.exists():
            self.r.warn(f"Missing: {path}")
            return
        counts = self._parse_ssh_failures(path)
        filtered = Counter({ip: c for ip, c in counts.items() if c > 5})
        self.r.print_top(color.yellow("Top 5 SSH offender IPs (failures > 5)"), filtered, n=5, min_count=6)

    def _count_ssh_failures(self, lines: Iterable[str]) -> Counter[str]:
        """Count failed-password and PAM auth-failure lines per source IP."""
        counts: Counter[str] = Counter()
        for line in lines:
            # Cheap substring test before running the regexes.
            if "sshd" not in line:
                continue
            match = self._ssh_fail_re_1.search(line) or self._ssh_fail_re_2.search(line)
            if match:
                counts[match.group("ip")] += 1
        return counts

    def _parse_ssh_failures(self, path: Path) -> Counter[str]:
        """Count SSH authentication failures per IP in the log at *path*."""
        return self._count_ssh_failures(FileIO.iter_lines(path))

    def _domlog_summary(self) -> None:
        """Print today's top client IPs and up to 200 suspicious requests."""
        root = self.p.paths.domlogs_root
        self.r.header(color.yellow("Apache domlogs: suspicious activity + top IPs for today's entries"))
        if not root.exists():
            self.r.warn(f"Missing: {root}")
            return

        today_str = self._today_domlog_datestr()
        ip_counts_today, suspicious_hits = self._parse_domlogs(root, today_str)

        print(color.yellow(f"Top 5 domlog IPs for today ({today_str})"))
        self.r.bar()
        if ip_counts_today:
            for ip, count in self.r.top(ip_counts_today, 5):
                print(f"{ip:>15} {count}")
        else:
            print("No domlog entries counted for today. Verify that domlogs are logging correctly")

        print()
        print(color.yellow("Suspicious domlog hits"))
        self.r.bar()
        if suspicious_hits:
            for hit in suspicious_hits[:200]:
                print(hit)
            if len(suspicious_hits) > 200:
                print(f"... ({len(suspicious_hits) - 200} more)")
        else:
            print("No suspicious patterns found.")

    def _today_domlog_datestr(self, now: Optional[dt.datetime] = None) -> str:
        """Return today's Apache-style date string (e.g., 12/Jan/2026)."""
        if now is None:
            now = dt.datetime.now()
        return now.strftime("%d/%b/%Y")

    def _iter_domlog_files(self, root: Path, *, cutoff_ts: float) -> Iterator[Path]:
        """Yield regular files in *root* and one directory level below it.

        Files are skipped when they exceed ``_MAX_DOMLOG_BYTES``, were last
        modified before *cutoff_ts*, or cannot be stat'ed.
        """
        max_size = self._MAX_DOMLOG_BYTES

        def eligible(candidate: Path) -> bool:
            if not candidate.is_file():
                return False
            try:
                info = candidate.stat()
            except OSError:
                return False
            return info.st_size <= max_size and info.st_mtime >= cutoff_ts

        for pattern in ("*", "*/*"):
            for candidate in root.glob(pattern):
                if eligible(candidate):
                    yield candidate

    def _domlog_client_ip(self, line: str, today_str: str) -> Optional[str]:
        """Return the client IP of an access-log line dated *today_str*.

        Returns None when the line does not parse or carries another date.
        """
        match = self._domlog_ip_re.search(line)
        if match and match.group("ts") == today_str:
            return match.group("ip")
        return None

    def _classify_request(self, line: str) -> Optional[tuple[str, str]]:
        """Classify one access-log line as ``(label, lower-cased path)``.

        ``"No-ref-POST"`` is a POST answered with a 2xx status on a line
        containing a ``"-"`` field, unless the path mentions cron.
        ``"Suspicious"`` is a path matching one of the suspicious patterns.
        The POST rule wins when both apply. Returns None otherwise.
        """
        request = self._http_method_re.search(line)
        if not request:
            return None
        path = request.group("path").lower()

        if request.group("method") == "POST" and '"-"' in line:
            status = self._status_re.search(line)
            if status and status.group("status").startswith("2") and "cron" not in path:
                return "No-ref-POST", path

        if any(rx.search(path) for rx in self._suspicious_paths):
            return "Suspicious", path
        return None

    def _parse_domlogs(self, root: Path, today_str: str) -> tuple[Counter[str], list[str]]:
        """Scan domlogs modified in the last 24 hours.

        Returns:
            ``(ip_counts_today, suspicious_hits)``: request counts per client
            IP for lines dated *today_str*, and tab-separated
            ``label, logfile, path`` strings reported once per
            (logfile, path) pair.
        """
        ip_counts_today: Counter[str] = Counter()
        seen: set[tuple[str, str]] = set()
        suspicious_hits: list[str] = []
        cutoff_ts = dt.datetime.now().timestamp() - 24 * 60 * 60

        for logfile in self._iter_domlog_files(root, cutoff_ts=cutoff_ts):
            try:
                for line in FileIO.iter_lines(logfile):
                    ip = self._domlog_client_ip(line, today_str)
                    if ip is not None:
                        ip_counts_today[ip] += 1

                    verdict = self._classify_request(line)
                    if verdict is None:
                        continue
                    label, path = verdict
                    key = (str(logfile), path)
                    if key not in seen:
                        seen.add(key)
                        suspicious_hits.append(f"{label}\t{logfile}\t{path}")
            except OSError as exc:
                # One unreadable file must not abort the whole scan.
                self.r.warn(f"Skipping unreadable domlog {logfile}: {exc}")
        return ip_counts_today, suspicious_hits
# ----------------------------
# Orchestration
# ----------------------------
class HackCheckerApp:
    """Wire the reporter and both inspectors together and run them in order."""

    def __init__(self, profile: ServerProfile) -> None:
        self.profile = profile
        self.r = Reporter()
        self.proc = ProcessInspector(self.r)
        self.logs = LogInspector(self.profile, self.r)

    def run(self, *, full: bool) -> int:
        """Print the banner, run process checks then log checks; return 0."""
        self._print_banner()
        self.proc.run_all(full=full)
        self.logs.run_all()
        print()
        return 0

    def _print_banner(self) -> None:
        """Print the reminder banner and a summary of the detected profile."""
        profile = self.profile
        paths = profile.paths

        self.r.bar()
        print(color.magenta("= Review processes/logs before taking action. ="))
        self.r.bar()
        print(color.yellow(f"Detected OS: {profile.os_pretty} (ID={profile.os_id})"))

        if profile.panel == "cpanel":
            panel_line = f"Detected panel: cPanel (version={profile.cpanel_version or 'unknown'})"
        elif profile.panel == "cwp":
            panel_line = f"Detected panel: CWP (version={profile.cwp_version or 'unknown'})"
        else:
            panel_line = "Detected panel: none"
        print(color.yellow(panel_line))

        print(color.yellow(f"Profile: {profile.server_type.value}"))
        print(color.yellow("Log paths:"))
        labelled_paths = (
            ("sys_log", paths.sys_log),
            ("mail_log", paths.mail_log),
            ("ssh_log", paths.ssh_log),
            ("panel_login", paths.panel_login_log),
            ("domlogs_root", paths.domlogs_root),
        )
        for label, location in labelled_paths:
            print(color.yellow(f" {label} = {location}"))
# ----------------------------
# Entry point
# ----------------------------
def build_argparser() -> argparse.ArgumentParser:
    """Create the command-line parser; its only option is ``--full``."""
    parser = argparse.ArgumentParser(
        description="Checks logs and processes for common hack/malware indicators across Ubuntu/Alma + cPanel/CWP.",
    )
    parser.add_argument(
        "--full",
        action="store_true",
        help="Show extra network listener output where applicable.",
    )
    return parser
def main() -> int:
    """Parse arguments, detect the server profile and run every check.

    Returns:
        The exit status reported by ``HackCheckerApp.run``.
    """
    cli = build_argparser().parse_args()
    profile = ServerDetector().detect()
    return HackCheckerApp(profile).run(full=cli.full)


if __name__ == "__main__":
    raise SystemExit(main())