""" fail2ban 디스코드 봇 - 실시간 IP 차단 알림 - /list: 차단 목록 조회 - /unban: IP 차단 해제 - /status: fail2ban 상태 확인 """ import os import subprocess import discord from discord import app_commands from discord.ext import commands from aiohttp import web import asyncio from datetime import datetime import aiohttp # 환경변수 BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN") CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID", "0")) WEBHOOK_PORT = int(os.getenv("WEBHOOK_PORT", "5000")) # 봇 설정 intents = discord.Intents.default() bot = commands.Bot(command_prefix="!", intents=intents) async def get_ip_info(ip: str) -> dict: """IP 정보 조회 (위치, ISP)""" try: async with aiohttp.ClientSession() as session: async with session.get(f"http://ip-api.com/json/{ip}?fields=country,city,isp") as resp: if resp.status == 200: return await resp.json() except Exception: pass return {"country": "Unknown", "city": "Unknown", "isp": "Unknown"} def run_fail2ban_command(args: list) -> str: """fail2ban-client 명령어 실행""" try: result = subprocess.run( ["fail2ban-client"] + args, capture_output=True, text=True, timeout=10 ) return result.stdout or result.stderr except subprocess.TimeoutExpired: return "명령어 시간 초과" except Exception as e: return f"오류: {str(e)}" def get_jail_stats(jail: str = "sshd") -> dict: """jail 통계 조회""" output = run_fail2ban_command(["status", jail]) stats = { "banned_ips": [], "currently_failed": "0", "total_failed": "0", "currently_banned": "0", "total_banned": "0" } for line in output.split("\n"): if "Banned IP list:" in line: ip_part = line.split("Banned IP list:")[1].strip() if ip_part: stats["banned_ips"] = ip_part.split() elif "Currently failed:" in line: stats["currently_failed"] = line.split(":")[-1].strip() elif "Total failed:" in line: stats["total_failed"] = line.split(":")[-1].strip() elif "Currently banned:" in line: stats["currently_banned"] = line.split(":")[-1].strip() elif "Total banned:" in line: stats["total_banned"] = line.split(":")[-1].strip() return stats @bot.event async def on_ready(): """봇 시작 시 실행""" print(f"✅ 봇 로그인: {bot.user}") try: synced = await bot.tree.sync() print(f"✅ {len(synced)}개 명령어 동기화 완료") except Exception as e: print(f"❌ 명령어 동기화 실패: {e}") @bot.tree.command(name="list", description="차단된 IP 목록과 통계를 조회합니다") @app_commands.describe(jail="조회할 jail 이름 (기본: sshd)") async def list_banned(interaction: discord.Interaction, jail: str = "sshd"): """차단된 IP 목록 및 통계 조회""" await interaction.response.defer() stats = get_jail_stats(jail) banned_ips = stats["banned_ips"] embed = discord.Embed( title=f"🔒 차단된 IP 목록 ({jail})", color=discord.Color.red() if banned_ips else discord.Color.green(), timestamp=datetime.now() ) if banned_ips: ip_list = "\n".join([f"• `{ip}`" for ip in banned_ips]) embed.description = ip_list else: embed.description = "✅ 현재 차단된 IP가 없습니다." # 통계 추가 (여백 포함) embed.add_field(name="\u200b", value="\u200b", inline=False) embed.add_field(name="🔒 영구 차단", value=f"`{stats['currently_banned']}`개 IP", inline=True) embed.add_field(name="📈 누적 실패", value=f"`{stats['total_failed']}`회", inline=True) embed.add_field(name="📊 누적 차단", value=f"`{stats['total_banned']}`회", inline=True) await interaction.followup.send(embed=embed) @bot.tree.command(name="unban", description="IP 차단을 해제합니다") @app_commands.describe(ip="차단 해제할 IP 주소", jail="jail 이름 (기본: sshd)") async def unban_ip(interaction: discord.Interaction, ip: str, jail: str = "sshd"): """IP 차단 해제""" await interaction.response.defer() result = run_fail2ban_command(["set", jail, "unbanip", ip]) embed = discord.Embed(timestamp=datetime.now()) if "is not banned" in result.lower() or "not in" in result.lower(): embed.title = "⚠️ 차단 해제 실패" embed.description = f"`{ip}`는 현재 차단되어 있지 않습니다." embed.color = discord.Color.orange() elif "error" in result.lower(): embed.title = "❌ 오류 발생" embed.description = f"```{result}```" embed.color = discord.Color.red() else: embed.title = "✅ 차단 해제 완료" embed.description = f"`{ip}`의 차단이 해제되었습니다." embed.color = discord.Color.green() await interaction.followup.send(embed=embed) @bot.tree.command(name="status", description="fail2ban 서비스 상태를 확인합니다") async def status(interaction: discord.Interaction): """fail2ban 서비스 상태 조회""" await interaction.response.defer() # 전체 상태 조회 output = run_fail2ban_command(["status"]) # 활성 jail 목록 파싱 jails = [] for line in output.split("\n"): if "Jail list:" in line: jail_part = line.split("Jail list:")[1].strip() if jail_part: jails = [j.strip() for j in jail_part.split(",")] # 서비스 상태 확인 is_running = "Number of jail:" in output embed = discord.Embed( title="📊 fail2ban 서비스 상태", color=discord.Color.green() if is_running else discord.Color.red(), timestamp=datetime.now() ) # 상태 status_text = "🟢 실행 중" if is_running else "🔴 중지됨" embed.add_field(name="상태", value=status_text, inline=True) embed.add_field(name="활성 Jail", value=f"`{len(jails)}`개", inline=True) embed.add_field(name="\u200b", value="\u200b", inline=True) # Jail 목록 (여백 포함) embed.add_field(name="\u200b", value="\u200b", inline=False) if jails: jail_list = ", ".join([f"`{j}`" for j in jails]) embed.add_field(name="📋 Jail 목록", value=jail_list, inline=False) # 설정 정보 (여백 포함) embed.add_field(name="\u200b", value="\u200b", inline=False) embed.add_field(name="⚙️ 설정", value="10분 내 5회 실패 시 **영구 차단**", inline=False) await interaction.followup.send(embed=embed) async def send_ban_notification(ip: str, jail: str, failures: str): """IP 차단 알림 전송""" if not CHANNEL_ID: print("❌ CHANNEL_ID가 설정되지 않음") return channel = bot.get_channel(CHANNEL_ID) if not channel: print(f"❌ 채널을 찾을 수 없음: {CHANNEL_ID}") return # IP 정보 조회 ip_info = await get_ip_info(ip) embed = discord.Embed( title="🚫 IP 차단 알림", description="fail2ban에 의해 IP가 차단되었습니다.", color=discord.Color.red(), timestamp=datetime.now() ) embed.add_field(name="🔒 차단된 IP", value=f"`{ip}`", inline=True) embed.add_field(name="📛 Jail", value=f"`{jail}`", inline=True) embed.add_field(name="❌ 실패 횟수", value=f"`{failures}`", inline=True) embed.add_field(name="🌍 위치", value=f"`{ip_info.get('city', 'Unknown')}, {ip_info.get('country', 'Unknown')}`", inline=True) embed.add_field(name="🏢 ISP", value=f"`{ip_info.get('isp', 'Unknown')}`", inline=True) embed.add_field(name="🖥️ 서버", value=f"`{os.uname().nodename}`", inline=True) await channel.send(embed=embed) print(f"✅ 차단 알림 전송: {ip}") # HTTP 웹훅 서버 (fail2ban에서 호출) async def webhook_handler(request): """fail2ban에서 호출하는 웹훅""" try: data = await request.json() ip = data.get("ip", "Unknown") jail = data.get("jail", "sshd") failures = data.get("failures", "?") await send_ban_notification(ip, jail, failures) return web.Response(text="OK") except Exception as e: print(f"❌ 웹훅 오류: {e}") return web.Response(text="Error", status=500) async def start_webhook_server(): """웹훅 서버 시작""" app = web.Application() app.router.add_post("/ban", webhook_handler) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, "0.0.0.0", WEBHOOK_PORT) await site.start() print(f"✅ 웹훅 서버 시작: http://0.0.0.0:{WEBHOOK_PORT}/ban") async def main(): """메인 함수""" # 웹훅 서버 시작 await start_webhook_server() # 봇 시작 await bot.start(BOT_TOKEN) if __name__ == "__main__": asyncio.run(main())