HEX
Server: Apache/2.4.65 (Unix) OpenSSL/1.1.1k
System: Linux vps109042.inmotionhosting.com 4.18.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User: cisa (1010)
PHP: 8.2.30
Disabled: NONE
Upload Files
File: //opt/dedrads/check_hacks
#!/usr/lib/rads/venv/bin/python3

from __future__ import annotations

import argparse
import datetime as dt
import re
import shutil
import subprocess
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Iterator, Optional, Sequence
from rads import color


# ----------------------------
# Models / configuration
# ----------------------------

class ServerType(str, Enum):
    """Host classification (OS + control panel) produced by ServerDetector."""

    # cPanel detected and os-release ID is "ubuntu".
    UBUNTU_CPANEL = "ubuntu+cpanel"
    # cPanel detected and os-release ID is "almalinux" or "alma".
    ALMA_CPANEL = "alma+cpanel"
    # CWP detected; the detector assigns this without looking at the OS id.
    ALMA_CWP = "alma+cwp"
    # Anything else (no panel, or cPanel on an unrecognised OS id).
    UNKNOWN = "unknown"


# Immutable bundle of the log locations chosen for the detected OS/panel.
@dataclass(frozen=True)
class LogPaths:
    # General system log (/var/log/messages or /var/log/syslog).
    sys_log: Path
    # Mail log (/var/log/maillog or /var/log/mail.log).
    mail_log: Path
    # SSH authentication log scanned for failed logins.
    ssh_log: Path
    # Control-panel login log (cPanel login_log or CWP client login log).
    panel_login_log: Path
    # Directory holding the per-domain Apache access logs.
    domlogs_root: Path


# Immutable description of the host as determined by ServerDetector.detect().
@dataclass(frozen=True)
class ServerProfile:
    # OS + panel classification.
    server_type: ServerType
    # Lower-cased ID from /etc/os-release, or "unknown" when unavailable.
    os_id: str
    # PRETTY_NAME (or NAME) from /etc/os-release; falls back to os_id / "Unknown OS".
    os_pretty: str
    panel: str  # "cwp", "cpanel", or "none"
    # Contents of the cPanel version file, or None when it is absent/empty.
    cpanel_version: Optional[str]
    # Version string extracted from the CWP version.php, or None.
    cwp_version: Optional[str]
    # Log locations selected for this OS/panel combination.
    paths: LogPaths


# ----------------------------
# Low-level helpers
# ----------------------------

class Shell:
    """Small wrapper around subprocess to keep error handling consistent."""

    @staticmethod
    def run(cmd: Sequence[str], timeout: int = 25) -> tuple[int, str, str]:
        """Run *cmd* without a shell and return ``(returncode, stdout, stderr)``.

        This never raises for the usual launch problems; they are mapped to
        shell-style exit codes with an explanatory message in the stderr slot:

        * 127 - executable not found (or no command given)
        * 126 - the OS refused to start it (e.g. permission denied)
        * 124 - the command exceeded *timeout* seconds

        Output is decoded with ``errors="replace"`` because process listings
        and ``strings`` output may contain bytes that are not valid text; a
        single bad byte must not abort the whole scan.
        """
        argv = list(cmd)
        if not argv:
            return 127, "", "No command given"

        try:
            proc = subprocess.run(
                argv,
                text=True,
                errors="replace",
                capture_output=True,
                timeout=timeout,
                check=False,
            )
        except FileNotFoundError:
            return 127, "", color.red(f"Command not found: {cmd[0]}")
        except subprocess.TimeoutExpired:
            return 124, "", f"Timed out running: {' '.join(cmd)}"
        except OSError as exc:
            # PermissionError and friends: report instead of crashing the run.
            return 126, "", f"Could not run {' '.join(argv)}: {exc}"
        return proc.returncode, proc.stdout, proc.stderr


class FileIO:
    """Best-effort text readers: missing or unreadable paths yield nothing."""

    @staticmethod
    def iter_lines(path: Path) -> Iterator[str]:
        """Yield the lines of *path* without their trailing newline.

        The file is decoded as UTF-8 with undecodable bytes replaced. If the
        path is missing, is a directory, or cannot be read, the generator
        simply ends. The file is opened directly rather than checked first,
        so a log that is rotated away between check and open cannot crash
        the caller.
        """
        try:
            with path.open("r", encoding="utf-8", errors="replace") as handle:
                for line in handle:
                    yield line.rstrip("\n")
        except OSError:
            return

    @staticmethod
    def read_text(path: Path) -> Optional[str]:
        """Return the stripped contents of *path*, or None.

        None is returned when the file is missing, unreadable, or contains
        only whitespace. Decoding matches ``iter_lines`` (UTF-8, replace).
        """
        try:
            text = path.read_text(encoding="utf-8", errors="replace").strip()
        except (OSError, UnicodeError):
            return None
        return text or None


class Reporter:
    """Console output helper: separator bars, section headers, status lines, ranked tables."""

    def __init__(self, width: int = 89) -> None:
        # The separator is built once and reused by every bar() call.
        self._sep = color.blue("=") * width

    def bar(self) -> None:
        """Print the separator line."""
        print(self._sep)

    def header(self, title: str) -> None:
        """Print a blank line, then *title* framed by two separator bars."""
        print()
        self.bar()
        print(title)
        self.bar()

    def warn(self, msg: str) -> None:
        """Print *msg* with a [WARN] prefix."""
        print(f"[WARN] {msg}")

    def ok(self, msg: str) -> None:
        """Print *msg* with an [OK] prefix."""
        print(f"[OK] {msg}")

    @staticmethod
    def top(counter: Counter[str], n: int) -> list[tuple[str, int]]:
        """Return the first *n* (key, count) pairs ordered by count, highest first."""
        # Sorting on the negated count is stable, so ties keep insertion order.
        ranked = sorted(counter.items(), key=lambda pair: -pair[1])
        return ranked[:n]

    def print_top(self, title: str, counter: Counter[str], n: int = 5, *, min_count: int = 1) -> None:
        """Print *title*, a bar, then the top *n* entries whose count is >= *min_count*.

        Keys are left-aligned to the longest key shown. When nothing
        qualifies a "No results." line is printed instead.
        """
        rows = [pair for pair in self.top(counter, n) if pair[1] >= min_count]
        print(title)
        self.bar()
        if rows:
            pad = max(len(label) for label, _count in rows)
            for label, count in rows:
                print(f"{label:<{pad}}  {count}")
        else:
            print(color.green("No results."))


# ----------------------------
# Detection
# ----------------------------

class ServerDetector:
    """
    Build a ServerProfile for the host this script is running on.

    Detect OS via /etc/os-release.
    Detect cPanel via /usr/local/cpanel/version.
    Detect CWP via /usr/local/cwpsrv/htdocs/resources/admin/include/version.php.
    """

    def __init__(self) -> None:
        self._os_release = Path("/etc/os-release")
        self._cpanel_version_file = Path("/usr/local/cpanel/version")
        self._cwp_version_file = Path("/usr/local/cwpsrv/htdocs/resources/admin/include/version.php")
        # Dotted version with three or four numeric components, e.g. 0.9.8.1188.
        self._cwp_version_re = re.compile(r"\b([0-9]+\.[0-9]+\.[0-9]+(?:\.[0-9]+)?)\b")

    def detect(self) -> ServerProfile:
        """Probe the host and return its profile.

        Raises:
            SystemExit: (via ``_select_paths``) when neither cPanel nor CWP
                is detected.
        """
        os_id, os_pretty = self._detect_os()
        cpanel_version = FileIO.read_text(self._cpanel_version_file)
        cwp_version = self._detect_cwp_version()

        # CWP wins when both panels appear to be present.
        if cwp_version:
            panel = "cwp"
        elif cpanel_version:
            panel = "cpanel"
        else:
            panel = "none"

        # Paths are resolved first: this is where an unsupported host exits.
        paths = self._select_paths(os_id=os_id, panel=panel)
        server_type = self._select_server_type(os_id=os_id, panel=panel)

        return ServerProfile(
            server_type=server_type,
            os_id=os_id or "unknown",
            os_pretty=os_pretty or (os_id or "Unknown OS"),
            panel=panel,
            cpanel_version=cpanel_version,
            cwp_version=cwp_version,
            paths=paths,
        )

    @staticmethod
    def _parse_os_release(lines: Iterator[str]) -> dict[str, str]:
        """Parse ``KEY=value`` lines in os-release format into a dict.

        Blank lines, ``#`` comments and lines without ``=`` are skipped. A
        value wrapped in a matching pair of double or single quotes is
        unquoted.
        """
        data: dict[str, str] = {}
        for raw in lines:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _sep, value = line.partition("=")
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            data[key.strip()] = value
        return data

    def _detect_os(self) -> tuple[str, str]:
        """Return ``(os_id, os_pretty)``; both are empty strings when unknown."""
        # iter_lines yields nothing for a missing file, so no pre-check is needed.
        data = self._parse_os_release(FileIO.iter_lines(self._os_release))
        os_id = (data.get("ID") or "").lower()
        os_pretty = data.get("PRETTY_NAME") or data.get("NAME") or ""
        return os_id, os_pretty

    def _detect_cwp_version(self) -> Optional[str]:
        """Return the first dotted version found in the CWP version file, or None."""
        text = FileIO.read_text(self._cwp_version_file)
        if not text:
            return None
        match = self._cwp_version_re.search(text)
        return match.group(1) if match else None

    def _select_server_type(self, *, os_id: str, panel: str) -> ServerType:
        """Map the OS id and panel name to a ServerType."""
        if panel == "cwp":
            # CWP is classified without consulting the OS id.
            return ServerType.ALMA_CWP
        if panel == "cpanel":
            if os_id == "ubuntu":
                return ServerType.UBUNTU_CPANEL
            if os_id in {"almalinux", "alma"}:
                return ServerType.ALMA_CPANEL
        return ServerType.UNKNOWN

    def _select_paths(self, *, os_id: str, panel: str) -> LogPaths:
        """Return log paths for the detected OS/panel combination.

        Raises:
            SystemExit: with status 1, after printing an explanation, when
                *panel* is neither "cwp" nor "cpanel".
        """
        if panel not in ("cwp", "cpanel"):
            print(f"Error: No supported panel detected (OS: {os_id}, Panel: {panel}).")
            print("This script supports: cPanel on Ubuntu/AlmaLinux, CWP on AlmaLinux.")
            raise SystemExit(1)

        if panel == "cwp":
            panel_login_log = Path("/var/log/cwp_client_login.log")
        else:
            panel_login_log = Path("/usr/local/cpanel/logs/login_log")

        if panel == "cpanel" and os_id == "ubuntu":
            sys_log = Path("/var/log/syslog")
            mail_log = Path("/var/log/mail.log")
            ssh_log = Path("/var/log/auth.log")
        else:
            # CWP, and cPanel on any non-Ubuntu OS, use the RHEL-style names.
            sys_log = Path("/var/log/messages")
            mail_log = Path("/var/log/maillog")
            ssh_log = Path("/var/log/secure")

        return LogPaths(
            sys_log=sys_log,
            mail_log=mail_log,
            ssh_log=ssh_log,
            panel_login_log=panel_login_log,
            domlogs_root=Path("/usr/local/apache/domlogs"),
        )


# ----------------------------
# Checks: processes & networking
# ----------------------------

class ProcessInspector:
    """Process and socket checks: perl processes, daemonized user processes,
    shell-bound scripts, and network sockets owned by perl."""

    # User names made of lowercase letters followed by digits; together with
    # "nobody" these are the process owners the checks look at.
    _USER_RE = re.compile(r"[a-z]+[0-9]+")

    def __init__(self, reporter: Reporter) -> None:
        self.r = reporter

    def _ps_rows(self) -> Iterator[tuple[str, str, str, str]]:
        """Yield ``(user, pid, ppid, args)`` for every process listed by ``ps``.

        A failing ``ps`` is reported as a warning and yields nothing.
        """
        rc, out, err = Shell.run(["ps", "-eo", "user:50=,pid=,ppid=,args="])
        if rc != 0:
            self.r.warn(f"ps failed: {err.strip()}")
            return
        for line in out.splitlines():
            parts = line.split(None, 3)
            if len(parts) < 3:
                continue
            user, pid, ppid = parts[:3]
            # args may be empty, in which case the split gives only three fields.
            args = parts[3] if len(parts) > 3 else ""
            yield user, pid, ppid, args

    def run_all(self, *, full: bool) -> None:
        """Run every process/network check; *full* enables the extra socket dump."""
        self._check_perl_processes()
        self._check_daemonized_user_processes()
        self._check_shell_bound_scripts()
        self._check_network_listeners(full=full)

    @staticmethod
    def _comm_from_stat(stat: str) -> str:
        """Extract the command name from the text of ``/proc/<pid>/stat``.

        The name is the text between the first "(" and the last ")", so names
        containing spaces or parentheses survive. Returns "" when it cannot be
        found.
        """
        start = stat.find("(")
        end = stat.rfind(")")
        if start != -1 and end > start:
            return stat[start + 1:end]
        fields = stat.split()
        return fields[1].strip("()") if len(fields) > 1 else ""

    @staticmethod
    def _is_shell_bound(args: str) -> bool:
        """Return True when a command line matches the shell-bound heuristics.

        Either rule is enough:
        * mentions /bin/sh, and neither "grep" nor "mysql";
        * mentions /bin/bash, and none of "grep", "/opt/", "check_hacks".
        """
        low = args.lower()
        if "grep" in low:
            return False
        if "/bin/sh" in low and "mysql" not in low:
            return True
        return "/bin/bash" in low and "/opt/" not in low and "check_hacks" not in low

    @staticmethod
    def _is_ss_listen_or_estab(line: str) -> bool:
        """Return True for ``ss`` rows in LISTEN or ESTAB state."""
        return line.startswith(("LISTEN", "ESTAB"))

    @staticmethod
    def _is_netstat_tcp_with_pid(line: str) -> bool:
        """Return True for ``netstat`` tcp rows that carry a pid/program column."""
        return bool(line.strip().startswith("tcp") and re.search(r"\s[0-9]+/.*$", line))

    def _check_perl_processes(self) -> None:
        """List perl processes owned by "nobody" or a letters+digits user."""
        self.r.header(color.yellow("Check for masquerading perl processes (review before killing)"))

        pids = [
            pid
            for user, pid, _ppid, args in self._ps_rows()
            if (user == "nobody" or self._USER_RE.fullmatch(user)) and "perl" in args.lower()
        ]

        if not pids:
            self.r.ok(color.green("No suspicious perl processes found"))
            return

        for pid in pids:
            proc_dir = Path("/proc") / pid
            try:
                stat = (proc_dir / "stat").read_text(errors="replace")
                cmdline_raw = (proc_dir / "cmdline").read_bytes()
            except OSError:
                # The process exited (or is unreadable) since ps ran.
                continue

            comm = self._comm_from_stat(stat)
            cmdline = cmdline_raw.replace(b"\x00", b" ").decode("utf-8", errors="replace").strip()

            if comm == "perl" or " perl" in f" {cmdline}".lower():
                print(f"PID={pid} COMM={comm} CMD={cmdline}")
        print()

    def _check_daemonized_user_processes(self) -> None:
        """List letters+digits-user processes whose parent is PID 1 (gam_server excluded)."""
        self.r.header(color.yellow("Daemonized user processes, review before terminating"))

        printed = False
        for user, pid, ppid, args in self._ps_rows():
            if ppid != "1" or not self._USER_RE.fullmatch(user):
                continue
            if "gam_server" in args:
                continue
            print(f"USER={user} PID={pid} PPID={ppid} ARGS={args}")
            printed = True

        if not printed:
            self.r.ok(color.green("No daemonized processes found"))
        print()

    def _check_shell_bound_scripts(self) -> None:
        """List processes whose command line involves /bin/sh or /bin/bash.

        Each process is printed once, even if it matches both shell rules.
        """
        self.r.header(color.yellow("Scripts bound to a shell review with: lsof -p PID"))

        hits = 0
        for _user, pid, _ppid, args in self._ps_rows():
            if self._is_shell_bound(args):
                print(f"PID={pid} ARGS={args}")
                hits += 1

        if hits == 0:
            self.r.ok(color.green("No suspicious shell-bound scripts found"))
        print()

    def _check_network_listeners(self, *, full: bool) -> None:
        """Print socket rows mentioning perl, using ``ss`` if installed, else ``netstat``.

        With *full*, also print the LISTEN/ESTAB rows (ss) or the tcp rows
        that carry a pid/program (netstat).
        """
        self.r.header(color.yellow("Network listeners"))

        if shutil.which("ss"):
            tool = "ss"
            cmd = ["ss", "-plant"]
            full_title = "[--full] ss -plant (LISTEN/ESTAB)"
            keep = self._is_ss_listen_or_estab
        else:
            tool = "netstat"
            cmd = ["netstat", "-plan"]
            full_title = "[--full] netstat -plan (tcp lines with pid/program)"
            keep = self._is_netstat_tcp_with_pid

        rc, out, err = Shell.run(cmd)
        if rc != 0:
            self.r.warn(f"{tool} failed: {err.strip()}")
            return

        rows = out.splitlines()
        perl_rows = [row for row in rows if "perl" in row.lower()]
        for row in perl_rows:
            print(row)
        if not perl_rows:
            self.r.ok(color.green("No suspicious Network connections found"))

        if full:
            print()
            print(full_title)
            self.r.bar()
            for row in rows:
                if keep(row):
                    print(row)


# ----------------------------
# Checks: logs
# ----------------------------

class LogInspector:
    """
    Parses:
      - SSH failures from auth/secure log
      - cPanel failed logins from binary login_log (strings -a)
      - Apache domlogs: suspicious paths + top IPs for today's date
    """

    # Domlog files larger than this are skipped.
    _MAX_DOMLOG_BYTES = 100 * 1024 * 1024
    # First dotted-quad in a line; used to key failed panel logins by address.
    _IPV4_RE = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")

    def __init__(self, profile: ServerProfile, reporter: Reporter) -> None:
        self.p = profile
        self.r = reporter

        self._ssh_fail_re_1 = re.compile(r"sshd\[\d+\]: Failed password .* from (?P<ip>\d{1,3}(?:\.\d{1,3}){3})\b")
        self._ssh_fail_re_2 = re.compile(
            r"sshd\[\d+\]: pam_unix\(sshd:auth\): authentication failure;.*\brhost=(?P<ip>\d{1,3}(?:\.\d{1,3}){3})\b"
        )

        # Client IP followed by the ident and user fields and the "[dd/Mon/yyyy:"
        # timestamp. The IP may be the first thing on the line (standard access
        # log layout) or follow a whitespace-separated prefix.
        self._domlog_ip_re = re.compile(
            r"(?:^|\s)(?P<ip>\d{1,3}(?:\.\d{1,3}){3})\s+\S+\s+\S+\s+\[(?P<ts>\d{2}/[A-Za-z]{3}/\d{4}):"
        )
        self._http_method_re = re.compile(r"\"(?P<method>[A-Z]+)\s+(?P<path>/\S*)\s+HTTP/[^\" ]+\"")
        # Three-digit status that follows a closing double quote.
        self._status_re = re.compile(r"\"\s+(?P<status>\d{3})\s+")
        self._suspicious_paths = [
            re.compile(r"/wp-json/wp/v2/posts/\d+/", re.IGNORECASE),
            re.compile(r"/\.env\b", re.IGNORECASE),
            re.compile(r"/login\.asp\b", re.IGNORECASE),
            re.compile(r"(\bunion\b|\bselect\b|\border\b\s*by\b|%20union|%20select|%20order%20by)", re.IGNORECASE),
        ]

    def run_all(self) -> None:
        """Run the panel-login, SSH and domlog reports in that order."""
        self._panel_failed_logins()
        self._ssh_failures()
        self._domlog_summary()

    def _panel_failed_logins(self) -> None:
        """Report failed cPanel logins, or preview the CWP client login log."""
        path = self.p.paths.panel_login_log

        if self.p.panel == "cpanel":
            self.r.header(color.yellow("cPanel failed authentication attempts"))
            if not path.exists():
                self.r.warn(f"Missing: {path}")
                return
            if not shutil.which("strings"):
                self.r.warn("'strings' not found. Install (binutils) to parse cPanel login_log.")
                return

            counts = self._parse_cpanel_failed_logins(path)
            self.r.print_top(color.yellow("Top 5 IPS that FAILED LOGIN"), counts, n=5)
            return

        if self.p.panel == "cwp":
            self.r.header(color.yellow(f"CWP client logins preview from {path}"))
            if not path.exists():
                self.r.warn(f"Missing: {path}")
                return
            lines = list(FileIO.iter_lines(path))
            if not lines:
                print("No lines found.")
                return
            # Only the most recent 200 entries are shown.
            print("\n".join(lines[-200:]))
            return

        self.r.header("Panel login checks skipped (no panel detected)")

    @classmethod
    def _failed_login_key(cls, line: str) -> str:
        """Return the key a failed-login line is counted under.

        The first IPv4 address on the line is used so that attempts group by
        source address. Lines without one fall back to their first three
        whitespace-separated tokens (or the stripped line if shorter).
        """
        match = cls._IPV4_RE.search(line)
        if match:
            return match.group(0)
        parts = line.split()
        return " ".join(parts[:3]) if len(parts) >= 3 else line.strip()

    def _parse_cpanel_failed_logins(self, login_log: Path) -> Counter[str]:
        """Count "FAILED LOGIN" lines in *login_log* per source address.

        The log is read through ``strings -a``. An empty Counter is returned
        when the log or ``strings`` is unavailable; a failing ``strings`` run
        is reported as a warning.
        """
        counts: Counter[str] = Counter()
        if not login_log.exists() or not shutil.which("strings"):
            return counts

        rc, out, err = Shell.run(["strings", "-a", str(login_log)], timeout=60)
        if rc != 0:
            self.r.warn(f"strings failed on {login_log}: {err.strip()}")
            return counts

        for line in out.splitlines():
            if "FAILED LOGIN" not in line.upper():
                continue
            key = self._failed_login_key(line)
            if key:
                counts[key] += 1
        return counts

    def _ssh_failures(self) -> None:
        """Report the top source IPs with more than five SSH authentication failures."""
        path = self.p.paths.ssh_log
        self.r.header(color.yellow(f"SSH failures from {path} (top offenders > 5 failures)"))
        if not path.exists():
            self.r.warn(f"Missing: {path}")
            return

        counts = self._parse_ssh_failures(path)
        filtered = Counter({ip: c for ip, c in counts.items() if c > 5})
        self.r.print_top(color.yellow("Top 5 SSH offender IPs (failures > 5)"), filtered, n=5, min_count=6)

    def _parse_ssh_failures(self, path: Path) -> Counter[str]:
        """Count sshd "Failed password" / pam_unix authentication failures per IPv4."""
        counts: Counter[str] = Counter()
        for line in FileIO.iter_lines(path):
            # Cheap substring test before running the regexes.
            if "sshd" not in line:
                continue
            match = self._ssh_fail_re_1.search(line) or self._ssh_fail_re_2.search(line)
            if match:
                counts[match.group("ip")] += 1
        return counts

    def _domlog_summary(self) -> None:
        """Print today's top client IPs and up to 200 suspicious request hits."""
        root = self.p.paths.domlogs_root
        self.r.header(color.yellow("Apache domlogs: suspicious activity + top IPs for today's entries"))
        if not root.exists():
            self.r.warn(f"Missing: {root}")
            return

        today_str = self._today_domlog_datestr()
        ip_counts_today, suspicious_hits = self._parse_domlogs(root, today_str)

        print(color.yellow(f"Top 5 domlog IPs for today ({today_str})"))
        self.r.bar()
        if ip_counts_today:
            for ip, c in self.r.top(ip_counts_today, 5):
                print(f"{ip:>15}  {c}")
        else:
            print("No domlog entries counted for today. Verify that domlogs are logging correctly")

        print()
        print(color.yellow("Suspicious domlog hits"))
        self.r.bar()
        if suspicious_hits:
            for hit in suspicious_hits[:200]:
                print(hit)
            if len(suspicious_hits) > 200:
                print(f"... ({len(suspicious_hits) - 200} more)")
        else:
            print("No suspicious patterns found.")

    def _today_domlog_datestr(self, now: Optional[dt.datetime] = None) -> str:
        """Return today's Apache-style date string (e.g., 12/Jan/2026)."""
        if now is None:
            now = dt.datetime.now()
        return now.strftime("%d/%b/%Y")

    def _is_recent_small_file(self, candidate: Path, cutoff_ts: float) -> bool:
        """Return True for a regular file within the size cap, modified at/after *cutoff_ts*."""
        try:
            if not candidate.is_file():
                return False
            stats = candidate.stat()
        except OSError:
            return False
        return stats.st_size <= self._MAX_DOMLOG_BYTES and stats.st_mtime >= cutoff_ts

    def _iter_domlog_files(self, root: Path, *, cutoff_ts: float) -> Iterator[Path]:
        """Yield eligible log files directly under *root*, then one directory level down."""
        for pattern in ("*", "*/*"):
            for candidate in root.glob(pattern):
                if self._is_recent_small_file(candidate, cutoff_ts):
                    yield candidate

    def _classify_request(self, line: str) -> Optional[tuple[str, str]]:
        """Classify one access-log line.

        Returns ``(label, lowercased_path)`` or None when nothing is flagged:

        * "No-ref-POST": a POST whose line contains a ``"-"`` field, with a
          2xx status and no "cron" in the path;
        * "Suspicious": the path matches one of the suspicious patterns.
        """
        request = self._http_method_re.search(line)
        if not request:
            return None
        path = request.group("path").lower()

        if request.group("method") == "POST" and '"-"' in line:
            status = self._status_re.search(line)
            if status and status.group("status").startswith("2") and "cron" not in path:
                return "No-ref-POST", path

        if any(rx.search(path) for rx in self._suspicious_paths):
            return "Suspicious", path
        return None

    def _parse_domlogs(self, root: Path, today_str: str) -> tuple[Counter[str], list[str]]:
        """Scan recent domlogs under *root*.

        Returns ``(ip_counts_today, suspicious_hits)``: per-IP request counts
        for lines dated *today_str*, and one tab-separated
        "label, logfile, path" entry per distinct (logfile, path) flagged.
        Only files modified within the last 24 hours are read; a file that
        cannot be read is reported and skipped.
        """
        ip_counts_today: Counter[str] = Counter()
        seen: set[tuple[str, str]] = set()
        suspicious_hits: list[str] = []

        cutoff_ts = dt.datetime.now().timestamp() - 24 * 60 * 60

        for logfile in self._iter_domlog_files(root, cutoff_ts=cutoff_ts):
            log_name = str(logfile)
            try:
                for line in FileIO.iter_lines(logfile):
                    m_ip = self._domlog_ip_re.search(line)
                    if m_ip and m_ip.group("ts") == today_str:
                        ip_counts_today[m_ip.group("ip")] += 1

                    verdict = self._classify_request(line)
                    if verdict is None:
                        continue
                    label, path = verdict
                    key = (log_name, path)
                    if key not in seen:
                        seen.add(key)
                        suspicious_hits.append(f"{label}\t{logfile}\t{path}")
            except OSError as exc:
                # Keep scanning the remaining logs, but say what was skipped.
                self.r.warn(f"Skipping unreadable domlog {logfile}: {exc}")
                continue

        return ip_counts_today, suspicious_hits


# ----------------------------
# Orchestration
# ----------------------------

class HackCheckerApp:
    """Wires the reporter and inspectors together and runs the full scan."""

    def __init__(self, profile: ServerProfile) -> None:
        self.profile = profile
        self.r = Reporter()

        # Both inspectors share the one reporter so output formatting is uniform.
        self.proc = ProcessInspector(self.r)
        self.logs = LogInspector(self.profile, self.r)

    def run(self, *, full: bool) -> int:
        """Print the banner, run process checks then log checks; always returns 0."""
        self._print_banner()
        self.proc.run_all(full=full)
        self.logs.run_all()
        print()
        return 0

    def _print_banner(self) -> None:
        """Print the caution banner followed by the detected OS, panel and log paths."""
        prof = self.profile
        paths = prof.paths

        self.r.bar()
        print(color.magenta("= Review processes/logs before taking action. ="))
        self.r.bar()
        print(color.yellow(f"Detected OS: {prof.os_pretty} (ID={prof.os_id})"))

        if prof.panel == "cpanel":
            panel_line = f"Detected panel: cPanel (version={prof.cpanel_version or 'unknown'})"
        elif prof.panel == "cwp":
            panel_line = f"Detected panel: CWP (version={prof.cwp_version or 'unknown'})"
        else:
            panel_line = "Detected panel: none"
        print(color.yellow(panel_line))

        print(color.yellow(f"Profile: {prof.server_type.value}"))
        print(color.yellow("Log paths:"))
        # Labels are padded to 15 columns so the "=" signs line up.
        rows = (
            ("sys_log", paths.sys_log),
            ("mail_log", paths.mail_log),
            ("ssh_log", paths.ssh_log),
            ("panel_login", paths.panel_login_log),
            ("domlogs_root", paths.domlogs_root),
        )
        for label, location in rows:
            print(color.yellow(f"  {label:<15}= {location}"))


# ----------------------------
# Entry point
# ----------------------------

def build_argparser() -> argparse.ArgumentParser:
    """Create the command-line parser; the only option is the ``--full`` flag."""
    parser = argparse.ArgumentParser(
        description="Checks logs and processes for common hack/malware indicators across Ubuntu/Alma + cPanel/CWP."
    )
    parser.add_argument(
        "--full",
        action="store_true",
        help="Show extra network listener output where applicable.",
    )
    return parser


def main() -> int:
    """Parse the command line, detect the server profile, run the scan, return its exit status."""
    cli = build_argparser().parse_args()
    profile = ServerDetector().detect()
    return HackCheckerApp(profile).run(full=cli.full)


if __name__ == "__main__":
    raise SystemExit(main())