265 lines
9 KiB
Python
265 lines
9 KiB
Python
"""
|
|
fail2ban Discord bot

- Real-time IP ban notifications
- /list: show the banned IP list
- /unban: lift an IP ban
- /status: check the fail2ban status
|
|
"""
|
|
|
|
import asyncio
import ipaddress
import os
import subprocess
from datetime import datetime

import aiohttp
import discord
from aiohttp import web
from discord import app_commands
from discord.ext import commands
|
|
|
|
# Environment configuration
def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* as an int.

    An unset or blank variable yields *default*; a non-numeric value still
    raises ``ValueError`` so that misconfiguration is noticed at start-up.
    """
    raw = os.getenv(name, "").strip()
    return int(raw) if raw else default


BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
CHANNEL_ID = _env_int("DISCORD_CHANNEL_ID", 0)
WEBHOOK_PORT = _env_int("WEBHOOK_PORT", 5000)
|
|
|
|
# Bot setup: a commands.Bot using the default gateway intents and the "!"
# text-command prefix.
intents = discord.Intents.default()
bot = commands.Bot(
    command_prefix="!",
    intents=intents,
)
|
|
|
|
|
|
async def get_ip_info(ip: str) -> dict:
    """Look up location and ISP details for an IP address.

    Queries the ip-api.com JSON endpoint. The lookup is best-effort: an
    invalid address, a network failure or a malformed response yields
    placeholder values instead of raising.

    Args:
        ip: IPv4 or IPv6 address to look up.

    Returns:
        A dict with the keys ``country``, ``city`` and ``isp``. Each value is
        ``"Unknown"`` when it could not be determined.
    """
    info = {"country": "Unknown", "city": "Unknown", "isp": "Unknown"}

    # The address originates from an HTTP webhook payload, so validate it
    # before interpolating it into the request URL.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return info

    url = f"http://ip-api.com/json/{address}?fields=country,city,isp"
    try:
        # Bound the lookup so a slow or unreachable API cannot stall the
        # caller for aiohttp's much longer default timeout.
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                if resp.status == 200:
                    payload = await resp.json()
                    if isinstance(payload, dict):
                        # Keep only the expected keys; skip empty values so
                        # the placeholders stay in place.
                        info.update(
                            {k: v for k, v in payload.items() if k in info and v}
                        )
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as exc:
        print(f"⚠️ IP 정보 조회 실패: {exc}")
    return info
|
|
|
|
|
|
def run_fail2ban_command(args: list) -> str:
    """Run ``fail2ban-client`` with *args* and return its text output.

    Returns stdout when it is non-empty, otherwise stderr. A timeout
    (10 seconds) or any other failure is reported as a message string rather
    than raised.
    """
    try:
        command = ["fail2ban-client"] + args
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "명령어 시간 초과"
    except Exception as exc:
        return f"오류: {exc}"
    return completed.stdout or completed.stderr
|
|
|
|
|
|
def get_jail_stats(jail: str = "sshd") -> dict:
    """Collect statistics for one fail2ban jail.

    Runs ``fail2ban-client status <jail>`` through ``run_fail2ban_command``
    and scans the output line by line.

    Returns:
        A dict with ``banned_ips`` (list of address strings) and the counters
        ``currently_failed``, ``total_failed``, ``currently_banned`` and
        ``total_banned`` as strings; a counter stays ``"0"`` when its line is
        not present in the output.
    """
    # Output label -> key in the result dict, checked in this order.
    counter_labels = (
        ("Currently failed:", "currently_failed"),
        ("Total failed:", "total_failed"),
        ("Currently banned:", "currently_banned"),
        ("Total banned:", "total_banned"),
    )

    stats = {"banned_ips": []}
    stats.update({key: "0" for _, key in counter_labels})

    report = run_fail2ban_command(["status", jail])
    for row in report.split("\n"):
        if "Banned IP list:" in row:
            addresses = row.split("Banned IP list:")[1].strip()
            if addresses:
                stats["banned_ips"] = addresses.split()
            continue
        for label, key in counter_labels:
            if label in row:
                stats[key] = row.split(":")[-1].strip()
                break

    return stats
|
|
|
|
|
|
# Set to True once the slash-command tree has been synced successfully.
_commands_synced = False


@bot.event
async def on_ready():
    """Log the login and sync the slash-command tree once per process.

    ``on_ready`` may be dispatched more than once (for example after a
    reconnect), so the sync is skipped once it has succeeded to avoid
    needless, rate-limited API calls. A failed sync is retried on the next
    ``on_ready``.
    """
    global _commands_synced

    print(f"✅ 봇 로그인: {bot.user}")
    if _commands_synced:
        return

    try:
        synced = await bot.tree.sync()
    except Exception as e:
        # Deliberately broad: a failed sync must not crash the event handler.
        print(f"❌ 명령어 동기화 실패: {e}")
    else:
        _commands_synced = True
        print(f"✅ {len(synced)}개 명령어 동기화 완료")
|
|
|
|
|
|
# Maximum length Discord accepts for an embed description.
_EMBED_DESCRIPTION_LIMIT = 4096


def _format_banned_ips(banned_ips: list, limit: int = _EMBED_DESCRIPTION_LIMIT) -> str:
    """Render *banned_ips* as a bullet list that fits within *limit* characters.

    When the full list is too long, as many entries as fit are kept and a
    trailing line states how many were left out.
    """
    lines = [f"• `{ip}`" for ip in banned_ips]
    text = "\n".join(lines)
    if len(text) <= limit:
        return text

    # Reserve room for the longest possible "N more" suffix.
    reserve = len(f"\n… 외 {len(lines)}개")
    kept = []
    used = 0
    for line in lines:
        cost = len(line) + (1 if kept else 0)  # +1 for the joining newline
        if used + cost > limit - reserve:
            break
        kept.append(line)
        used += cost
    hidden = len(lines) - len(kept)
    return "\n".join(kept) + f"\n… 외 {hidden}개"


@bot.tree.command(name="list", description="차단된 IP 목록과 통계를 조회합니다")
@app_commands.describe(jail="조회할 jail 이름 (기본: sshd)")
async def list_banned(interaction: discord.Interaction, jail: str = "sshd"):
    """Reply with the banned IP list and counters of a fail2ban jail.

    Args:
        interaction: The slash-command interaction to answer.
        jail: Name of the jail to query.
    """
    await interaction.response.defer()

    # get_jail_stats shells out synchronously; run it in a worker thread so
    # the event loop (gateway heartbeat, webhook server) is not blocked.
    loop = asyncio.get_running_loop()
    stats = await loop.run_in_executor(None, get_jail_stats, jail)
    banned_ips = stats["banned_ips"]

    embed = discord.Embed(
        title=f"🔒 차단된 IP 목록 ({jail})",
        color=discord.Color.red() if banned_ips else discord.Color.green(),
        timestamp=datetime.now(),
    )

    if banned_ips:
        # A long ban list would exceed the description limit and make the
        # whole reply fail, so it is truncated to fit.
        embed.description = _format_banned_ips(banned_ips)
    else:
        embed.description = "✅ 현재 차단된 IP가 없습니다."

    # Statistics, preceded by a blank spacer field.
    embed.add_field(name="\u200b", value="\u200b", inline=False)
    embed.add_field(name="🔒 영구 차단", value=f"`{stats['currently_banned']}`개 IP", inline=True)
    embed.add_field(name="📈 누적 실패", value=f"`{stats['total_failed']}`회", inline=True)
    embed.add_field(name="📊 누적 차단", value=f"`{stats['total_banned']}`회", inline=True)

    await interaction.followup.send(embed=embed)
|
|
|
|
|
|
@bot.tree.command(name="unban", description="IP 차단을 해제합니다")
@app_commands.describe(ip="차단 해제할 IP 주소", jail="jail 이름 (기본: sshd)")
async def unban_ip(interaction: discord.Interaction, ip: str, jail: str = "sshd"):
    """Unban an IP address in a fail2ban jail and report the outcome.

    Args:
        interaction: The slash-command interaction to answer.
        ip: Address (or CIDR range) to unban.
        jail: Name of the jail to unban it from.
    """
    await interaction.response.defer()

    embed = discord.Embed(timestamp=datetime.now())
    target = ip.strip()

    # The value is typed by a Discord user and ends up on the
    # fail2ban-client command line, so reject anything that is not an
    # address before running the command.
    try:
        ipaddress.ip_network(target, strict=False)
    except ValueError:
        embed.title = "❌ 오류 발생"
        embed.description = f"`{ip}`는 올바른 IP 주소가 아닙니다."
        embed.color = discord.Color.red()
        await interaction.followup.send(embed=embed)
        return

    # run_fail2ban_command blocks on a subprocess; run it in a worker thread
    # so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, run_fail2ban_command, ["set", jail, "unbanip", target]
    )
    outcome = result.strip()
    lowered = outcome.lower()

    # run_fail2ban_command reports its own failures with these Korean
    # messages, which contain no "error" and were previously shown as success.
    command_failed = outcome.startswith("오류:") or outcome == "명령어 시간 초과"

    # NOTE(review): newer fail2ban versions appear to print the number of
    # unbanned addresses ("0" when the address was not banned) -- confirm
    # against the deployed fail2ban-client.
    not_banned = "is not banned" in lowered or "not in" in lowered or outcome == "0"

    if command_failed or (not not_banned and "error" in lowered):
        embed.title = "❌ 오류 발생"
        embed.description = f"```{result}```"
        embed.color = discord.Color.red()
    elif not_banned:
        embed.title = "⚠️ 차단 해제 실패"
        embed.description = f"`{ip}`는 현재 차단되어 있지 않습니다."
        embed.color = discord.Color.orange()
    else:
        embed.title = "✅ 차단 해제 완료"
        embed.description = f"`{ip}`의 차단이 해제되었습니다."
        embed.color = discord.Color.green()

    await interaction.followup.send(embed=embed)
|
|
|
|
|
|
@bot.tree.command(name="status", description="fail2ban 서비스 상태를 확인합니다")
async def status(interaction: discord.Interaction):
    """Reply with the fail2ban service state and the list of active jails.

    The service counts as running when the ``fail2ban-client status`` output
    contains a ``Number of jail:`` line.
    """
    await interaction.response.defer()

    # run_fail2ban_command blocks on a subprocess (up to its timeout); run it
    # in a worker thread so the event loop is not stalled meanwhile.
    loop = asyncio.get_running_loop()
    output = await loop.run_in_executor(None, run_fail2ban_command, ["status"])

    # Parse the comma-separated jail names from the "Jail list:" line.
    jails = []
    for line in output.split("\n"):
        if "Jail list:" in line:
            jail_part = line.split("Jail list:")[1].strip()
            if jail_part:
                jails = [j.strip() for j in jail_part.split(",") if j.strip()]

    is_running = "Number of jail:" in output

    embed = discord.Embed(
        title="📊 fail2ban 서비스 상태",
        color=discord.Color.green() if is_running else discord.Color.red(),
        timestamp=datetime.now(),
    )

    # Service state row; the third, empty field pads the inline row.
    status_text = "🟢 실행 중" if is_running else "🔴 중지됨"
    embed.add_field(name="상태", value=status_text, inline=True)
    embed.add_field(name="활성 Jail", value=f"`{len(jails)}`개", inline=True)
    embed.add_field(name="\u200b", value="\u200b", inline=True)

    # Jail list, preceded by a blank spacer field.
    embed.add_field(name="\u200b", value="\u200b", inline=False)
    if jails:
        jail_list = ", ".join(f"`{j}`" for j in jails)
        embed.add_field(name="📋 Jail 목록", value=jail_list, inline=False)

    # Static policy summary, preceded by a blank spacer field.
    embed.add_field(name="\u200b", value="\u200b", inline=False)
    embed.add_field(name="⚙️ 설정", value="10분 내 5회 실패 시 **영구 차단**", inline=False)

    await interaction.followup.send(embed=embed)
|
|
|
|
|
|
async def send_ban_notification(ip: str, jail: str, failures: str):
    """Post a ban notification embed to the configured Discord channel.

    Does nothing (apart from logging) when ``CHANNEL_ID`` is unset or the
    channel cannot be resolved.

    Args:
        ip: The banned address.
        jail: Name of the jail that issued the ban.
        failures: Failure count reported by the caller, shown verbatim.
    """
    if not CHANNEL_ID:
        print("❌ CHANNEL_ID가 설정되지 않음")
        return

    channel = bot.get_channel(CHANNEL_ID)
    if channel is None:
        # get_channel only consults the local cache; ask the API before
        # giving up so a notification is not dropped just because the
        # channel is not cached.
        try:
            channel = await bot.fetch_channel(CHANNEL_ID)
        except (discord.HTTPException, discord.InvalidData) as e:
            print(f"❌ 채널을 찾을 수 없음: {CHANNEL_ID} ({e})")
            return

    # Best-effort lookup of location and ISP for the banned address.
    ip_info = await get_ip_info(ip)

    embed = discord.Embed(
        title="🚫 IP 차단 알림",
        description="fail2ban에 의해 IP가 차단되었습니다.",
        color=discord.Color.red(),
        timestamp=datetime.now(),
    )
    embed.add_field(name="🔒 차단된 IP", value=f"`{ip}`", inline=True)
    embed.add_field(name="📛 Jail", value=f"`{jail}`", inline=True)
    embed.add_field(name="❌ 실패 횟수", value=f"`{failures}`", inline=True)
    embed.add_field(
        name="🌍 위치",
        value=f"`{ip_info.get('city', 'Unknown')}, {ip_info.get('country', 'Unknown')}`",
        inline=True,
    )
    embed.add_field(name="🏢 ISP", value=f"`{ip_info.get('isp', 'Unknown')}`", inline=True)
    embed.add_field(name="🖥️ 서버", value=f"`{os.uname().nodename}`", inline=True)

    await channel.send(embed=embed)
    print(f"✅ 차단 알림 전송: {ip}")
|
|
|
|
|
|
# HTTP webhook server (called by fail2ban)
async def webhook_handler(request):
    """Handle a ban webhook and forward it as a Discord notification.

    Expects a JSON object with the optional keys ``ip``, ``jail`` and
    ``failures``.

    Returns:
        ``200 OK`` after the notification was handed off, ``400`` when the
        body is not a JSON object, and ``500`` on any other failure.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError
        # subclasses: the client sent a malformed body.
        print("❌ 웹훅 오류: 잘못된 JSON 본문")
        return web.Response(text="Bad Request", status=400)
    except Exception as e:
        print(f"❌ 웹훅 오류: {e}")
        return web.Response(text="Error", status=500)

    if not isinstance(data, dict):
        print("❌ 웹훅 오류: JSON 객체가 아님")
        return web.Response(text="Bad Request", status=400)

    # JSON values may be numbers; the notification code expects strings.
    ip = str(data.get("ip", "Unknown"))
    jail = str(data.get("jail", "sshd"))
    failures = str(data.get("failures", "?"))

    try:
        await send_ban_notification(ip, jail, failures)
    except Exception as e:
        # Boundary handler: report the failure to the caller instead of
        # letting it escape into the server.
        print(f"❌ 웹훅 오류: {e}")
        return web.Response(text="Error", status=500)
    return web.Response(text="OK")
|
|
|
|
|
|
async def start_webhook_server():
    """Start the aiohttp server that receives the ban webhooks.

    Registers ``POST /ban`` with ``webhook_handler`` and listens on all
    interfaces (``0.0.0.0``) at ``WEBHOOK_PORT``.
    """
    application = web.Application()
    application.add_routes([web.post("/ban", webhook_handler)])

    app_runner = web.AppRunner(application)
    await app_runner.setup()

    listener = web.TCPSite(app_runner, "0.0.0.0", WEBHOOK_PORT)
    await listener.start()

    print(f"✅ 웹훅 서버 시작: http://0.0.0.0:{WEBHOOK_PORT}/ban")
|
|
|
|
|
|
async def main():
    """Start the webhook server, then run the Discord bot until it stops.

    Raises:
        RuntimeError: If ``DISCORD_BOT_TOKEN`` is not set.
    """
    # Fail fast with an actionable message before opening the webhook port,
    # instead of failing inside the Discord login afterwards.
    if not BOT_TOKEN:
        raise RuntimeError("DISCORD_BOT_TOKEN 환경변수가 설정되지 않았습니다")

    # Start the webhook server first.
    await start_webhook_server()

    # Start the bot; this call runs until the bot is closed.
    await bot.start(BOT_TOKEN)


if __name__ == "__main__":
    asyncio.run(main())
|