discord-bot/fail2ban/main.py

266 lines
9 KiB
Python
Raw Permalink Normal View History

2025-12-16 11:44:25 +09:00
"""
fail2ban 디스코드
- 실시간 IP 차단 알림
- /list: 차단 목록 조회
- /unban: IP 차단 해제
- /status: fail2ban 상태 확인
"""
import os
import subprocess
import discord
from discord import app_commands
from discord.ext import commands
from aiohttp import web
import asyncio
from datetime import datetime
import aiohttp
# Runtime configuration, read from the environment at import time.
BOT_TOKEN = os.environ.get("DISCORD_BOT_TOKEN")
CHANNEL_ID = int(os.environ.get("DISCORD_CHANNEL_ID", "0"))
WEBHOOK_PORT = int(os.environ.get("WEBHOOK_PORT", "5000"))

# Bot setup: default gateway intents and a "!" prefix for text commands.
intents = discord.Intents.default()
bot = commands.Bot(command_prefix="!", intents=intents)
async def get_ip_info(ip: str) -> dict:
    """Look up location and ISP details for an IP address via ip-api.com.

    The lookup is best-effort: any failure (invalid address, network error,
    timeout, non-200 response) yields the "Unknown" placeholders instead of
    raising, so a failed lookup never blocks the caller.

    Args:
        ip: IPv4 or IPv6 address to look up.

    Returns:
        A dict that always contains the keys "country", "city" and "isp".
        Keys the service did not supply (or supplied empty) are "Unknown".
    """
    import ipaddress

    info = {"country": "Unknown", "city": "Unknown", "isp": "Unknown"}

    # The value is interpolated into the request URL, so only forward it
    # when it really parses as an IP address.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return info

    url = f"http://ip-api.com/json/{address}?fields=country,city,isp"
    try:
        # Bound the whole request so a stalled lookup cannot hang the caller.
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if isinstance(data, dict):
                        # Keep the placeholders for missing or empty fields.
                        info.update({k: v for k, v in data.items() if v})
    except Exception as e:
        # Deliberately broad: the lookup is optional, but leave a trace.
        print(f"⚠️ IP 정보 조회 실패 ({ip}): {e}")
    return info
def run_fail2ban_command(args: list) -> str:
    """Run ``fail2ban-client`` with the given arguments and return its output.

    Args:
        args: Arguments appended after the ``fail2ban-client`` executable.

    Returns:
        The command's stdout, or its stderr when stdout is empty. If the
        command exceeds 10 seconds or cannot be run at all, a short message
        describing the failure is returned instead of raising.
    """
    try:
        command = ["fail2ban-client"] + args
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "명령어 시간 초과"
    except Exception as exc:
        return f"오류: {str(exc)}"

    if completed.stdout:
        return completed.stdout
    return completed.stderr
def get_jail_stats(jail: str = "sshd") -> dict:
    """Collect the banned IPs and counters reported for one jail.

    Runs ``fail2ban-client status <jail>`` and scans its output line by line.

    Args:
        jail: Name of the jail to query.

    Returns:
        A dict with "banned_ips" (list of strings) plus the string counters
        "currently_failed", "total_failed", "currently_banned" and
        "total_banned". Values that do not appear in the output keep their
        defaults (an empty list and "0").
    """
    # Output label -> result key, checked in this order for every line.
    counters = (
        ("Currently failed:", "currently_failed"),
        ("Total failed:", "total_failed"),
        ("Currently banned:", "currently_banned"),
        ("Total banned:", "total_banned"),
    )
    stats = {
        "banned_ips": [],
        "currently_failed": "0",
        "total_failed": "0",
        "currently_banned": "0",
        "total_banned": "0",
    }

    report = run_fail2ban_command(["status", jail])
    for row in report.split("\n"):
        if "Banned IP list:" in row:
            addresses = row.split("Banned IP list:")[1].strip()
            if addresses:
                stats["banned_ips"] = addresses.split()
            continue
        for label, key in counters:
            if label in row:
                # The value is whatever follows the last colon on the line.
                stats[key] = row.split(":")[-1].strip()
                break
    return stats
@bot.event
async def on_ready():
    """Log the logged-in user and sync the application command tree."""
    print(f"✅ 봇 로그인: {bot.user}")
    try:
        registered = await bot.tree.sync()
        print(f"{len(registered)}개 명령어 동기화 완료")
    except Exception as sync_error:
        # A failed sync is reported but does not stop the bot.
        print(f"❌ 명령어 동기화 실패: {sync_error}")
@bot.tree.command(name="list", description="차단된 IP 목록과 통계를 조회합니다")
@app_commands.describe(jail="조회할 jail 이름 (기본: sshd)")
async def list_banned(interaction: discord.Interaction, jail: str = "sshd"):
    """Reply with the banned IP list and counters of a jail.

    Args:
        interaction: The slash-command interaction being answered.
        jail: Name of the fail2ban jail to query.
    """
    # Discord rejects embeds whose description is longer than 4096 characters.
    max_description = 4096

    await interaction.response.defer()

    # get_jail_stats shells out to fail2ban-client and may block for several
    # seconds; run it in the default executor so the event loop stays free.
    loop = asyncio.get_running_loop()
    stats = await loop.run_in_executor(None, get_jail_stats, jail)
    banned_ips = stats["banned_ips"]

    embed = discord.Embed(
        title=f"🔒 차단된 IP 목록 ({jail})",
        color=discord.Color.red() if banned_ips else discord.Color.green(),
        timestamp=datetime.now()
    )

    if banned_ips:
        lines = [f"• `{ip}`" for ip in banned_ips]
        description = "\n".join(lines)
        if len(description) > max_description:
            # Keep as many whole lines as fit, leaving room for the
            # "... and N more" trailer appended below.
            budget = max_description - 32
            kept = []
            used = 0
            for line in lines:
                cost = len(line) + 1
                if used + cost > budget:
                    break
                kept.append(line)
                used += cost
            description = "\n".join(kept) + f"\n… 외 {len(lines) - len(kept)}개"
        embed.description = description
    else:
        embed.description = "✅ 현재 차단된 IP가 없습니다."

    # Statistics, preceded by an empty spacer field.
    embed.add_field(name="\u200b", value="\u200b", inline=False)
    embed.add_field(name="🔒 영구 차단", value=f"`{stats['currently_banned']}`개 IP", inline=True)
    embed.add_field(name="📈 누적 실패", value=f"`{stats['total_failed']}`회", inline=True)
    embed.add_field(name="📊 누적 차단", value=f"`{stats['total_banned']}`회", inline=True)

    await interaction.followup.send(embed=embed)
@bot.tree.command(name="unban", description="IP 차단을 해제합니다")
@app_commands.describe(ip="차단 해제할 IP 주소", jail="jail 이름 (기본: sshd)")
async def unban_ip(interaction: discord.Interaction, ip: str, jail: str = "sshd"):
    """Unban an IP address from a jail and report the outcome.

    Args:
        interaction: The slash-command interaction being answered.
        ip: IP address (or CIDR network) to unban.
        jail: Name of the fail2ban jail to unban from.
    """
    import ipaddress

    await interaction.response.defer()
    embed = discord.Embed(timestamp=datetime.now())

    # The value is forwarded as a fail2ban-client argument, so reject anything
    # that is not an address/network (e.g. option-like input such as "--all").
    target = ip.strip()
    try:
        ipaddress.ip_network(target, strict=False)
    except ValueError:
        embed.title = "❌ 오류 발생"
        embed.description = f"`{ip}`는 올바른 IP 주소가 아닙니다."
        embed.color = discord.Color.red()
        await interaction.followup.send(embed=embed)
        return

    # The command runs a subprocess; keep it off the event loop.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, run_fail2ban_command, ["set", jail, "unbanip", target]
    )
    lowered = result.lower()

    # run_fail2ban_command reports its own failures (timeout, missing binary,
    # ...) with these prefixes; they must not be mistaken for success.
    helper_failed = result.startswith("오류:") or result.startswith("명령어 시간 초과")

    # NOTE(review): newer fail2ban versions appear to print the number of
    # unbanned addresses, so a bare "0" is treated as "was not banned" --
    # confirm against the deployed fail2ban version.
    not_banned = (
        "is not banned" in lowered
        or "not in" in lowered
        or result.strip() == "0"
    )

    if helper_failed:
        embed.title = "❌ 오류 발생"
        embed.description = f"```{result}```"
        embed.color = discord.Color.red()
    elif not_banned:
        embed.title = "⚠️ 차단 해제 실패"
        embed.description = f"`{ip}`는 현재 차단되어 있지 않습니다."
        embed.color = discord.Color.orange()
    elif "error" in lowered:
        embed.title = "❌ 오류 발생"
        embed.description = f"```{result}```"
        embed.color = discord.Color.red()
    else:
        embed.title = "✅ 차단 해제 완료"
        embed.description = f"`{ip}`의 차단이 해제되었습니다."
        embed.color = discord.Color.green()

    await interaction.followup.send(embed=embed)
@bot.tree.command(name="status", description="fail2ban 서비스 상태를 확인합니다")
async def status(interaction: discord.Interaction):
    """Reply with the fail2ban service state and its active jails.

    Args:
        interaction: The slash-command interaction being answered.
    """
    await interaction.response.defer()

    # Query the overall status. The call runs a subprocess, so it is executed
    # in the default executor instead of blocking the event loop.
    loop = asyncio.get_running_loop()
    output = await loop.run_in_executor(None, run_fail2ban_command, ["status"])

    # Parse the comma-separated list of active jails.
    jails = []
    for line in output.split("\n"):
        if "Jail list:" in line:
            jail_part = line.split("Jail list:")[1].strip()
            if jail_part:
                jails = [j.strip() for j in jail_part.split(",")]

    # The service counts as running when the status output has a jail count.
    is_running = "Number of jail:" in output

    embed = discord.Embed(
        title="📊 fail2ban 서비스 상태",
        color=discord.Color.green() if is_running else discord.Color.red(),
        timestamp=datetime.now()
    )

    # State summary row (the third field is an empty filler).
    status_text = "🟢 실행 중" if is_running else "🔴 중지됨"
    embed.add_field(name="상태", value=status_text, inline=True)
    embed.add_field(name="활성 Jail", value=f"`{len(jails)}`개", inline=True)
    embed.add_field(name="\u200b", value="\u200b", inline=True)

    # Jail list, preceded by an empty spacer field.
    embed.add_field(name="\u200b", value="\u200b", inline=False)
    if jails:
        jail_list = ", ".join([f"`{j}`" for j in jails])
        embed.add_field(name="📋 Jail 목록", value=jail_list, inline=False)

    # Configuration note, preceded by an empty spacer field.
    embed.add_field(name="\u200b", value="\u200b", inline=False)
    embed.add_field(name="⚙️ 설정", value="10분 내 5회 실패 시 **영구 차단**", inline=False)

    await interaction.followup.send(embed=embed)
async def send_ban_notification(ip: str, jail: str, failures: str):
    """Post a ban notification embed to the configured Discord channel.

    Does nothing except log a message when CHANNEL_ID is unset or the
    channel cannot be resolved from the bot's cache.

    Args:
        ip: The banned IP address.
        jail: Name of the jail that issued the ban.
        failures: Failure count shown in the notification.
    """
    if not CHANNEL_ID:
        print("❌ CHANNEL_ID가 설정되지 않음")
        return

    target = bot.get_channel(CHANNEL_ID)
    if not target:
        print(f"❌ 채널을 찾을 수 없음: {CHANNEL_ID}")
        return

    # Location/ISP lookup for the banned address.
    geo = await get_ip_info(ip)

    notice = discord.Embed(
        title="🚫 IP 차단 알림",
        description="fail2ban에 의해 IP가 차단되었습니다.",
        color=discord.Color.red(),
        timestamp=datetime.now(),
    )

    # (field name, field value) pairs, all rendered inline in this order.
    details = (
        ("🔒 차단된 IP", f"`{ip}`"),
        ("📛 Jail", f"`{jail}`"),
        ("❌ 실패 횟수", f"`{failures}`"),
        ("🌍 위치", f"`{geo.get('city', 'Unknown')}, {geo.get('country', 'Unknown')}`"),
        ("🏢 ISP", f"`{geo.get('isp', 'Unknown')}`"),
        ("🖥️ 서버", f"`{os.uname().nodename}`"),
    )
    for label, text in details:
        notice.add_field(name=label, value=text, inline=True)

    await target.send(embed=notice)
    print(f"✅ 차단 알림 전송: {ip}")
# HTTP webhook server (called by fail2ban)
async def webhook_handler(request):
    """Handle a ban report sent by fail2ban and relay it to Discord.

    The request body must be a JSON object. The keys "ip", "jail" and
    "failures" are optional and default to "Unknown", "sshd" and "?".

    Args:
        request: The incoming aiohttp request.

    Returns:
        200 "OK" once the notification has been handed off, 400
        "Bad Request" when the body is not a JSON object, and 500 "Error"
        for any other failure.
    """
    try:
        data = await request.json()
    except ValueError as e:
        # Malformed or undecodable JSON is the caller's fault, not ours.
        print(f"❌ 웹훅 오류: {e}")
        return web.Response(text="Bad Request", status=400)
    except Exception as e:
        print(f"❌ 웹훅 오류: {e}")
        return web.Response(text="Error", status=500)

    # Valid JSON that is not an object (list, string, ...) has no fields.
    if not isinstance(data, dict):
        print(f"❌ 웹훅 오류: JSON 객체가 아님 ({type(data).__name__})")
        return web.Response(text="Bad Request", status=400)

    try:
        ip = data.get("ip", "Unknown")
        jail = data.get("jail", "sshd")
        failures = data.get("failures", "?")
        await send_ban_notification(ip, jail, failures)
        return web.Response(text="OK")
    except Exception as e:
        print(f"❌ 웹훅 오류: {e}")
        return web.Response(text="Error", status=500)
async def start_webhook_server(host: str = "0.0.0.0", port=None):
    """Start the HTTP server that receives ban reports on POST /ban.

    Args:
        host: Interface to bind to. The default listens on all interfaces;
            pass "127.0.0.1" to accept local connections only.
        port: TCP port to listen on. Defaults to WEBHOOK_PORT.

    Returns:
        The started ``web.AppRunner``, so the caller can ``await
        runner.cleanup()`` to shut the server down.
    """
    if port is None:
        port = WEBHOOK_PORT

    app = web.Application()
    app.router.add_post("/ban", webhook_handler)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, host, port)
    await site.start()
    print(f"✅ 웹훅 서버 시작: http://{host}:{port}/ban")
    return runner
async def main():
    """Start the webhook server, then run the Discord bot until it stops.

    Raises:
        RuntimeError: If DISCORD_BOT_TOKEN is not set.
    """
    # Fail fast with a clear message instead of opening the webhook port and
    # only then failing inside the Discord login.
    if not BOT_TOKEN:
        raise RuntimeError("DISCORD_BOT_TOKEN 환경변수가 설정되지 않았습니다")

    # Start the webhook server.
    await start_webhook_server()

    # Start the bot; the context manager closes it again on the way out.
    async with bot:
        await bot.start(BOT_TOKEN)


if __name__ == "__main__":
    asyncio.run(main())