Initial commit: Discord Bot (fail2ban)

This commit is contained in:
caadiq 2025-12-16 11:44:25 +09:00
commit 8779420d0a
9 changed files with 501 additions and 0 deletions

20
.gitignore vendored Normal file
View file

@ -0,0 +1,20 @@
# Dependencies
node_modules/
# Logs
*.log
npm-debug.log*
# OS
.DS_Store
Thumbs.db
# IDE
.idea/
.vscode/
*.swp
*.swo
# Build
dist/
build/

31
README.md Normal file
View file

@ -0,0 +1,31 @@
# 🤖 Discord Bot
디스코드 봇 모음입니다.
![Python](https://img.shields.io/badge/Python-3.11-3776AB?logo=python)
![Discord](https://img.shields.io/badge/Discord.py-2.0-5865F2?logo=discord)
![Docker](https://img.shields.io/badge/Docker-2496ED?logo=docker)
---
## 📁 봇 목록
| 봇 | 설명 |
| ----------------------- | --------------------- |
| [fail2ban](./fail2ban/) | SSH 차단 알림 및 관리 |
---
## 🛠️ 공통 기술 스택
| 기술 | 설명 |
| -------------- | ------------- |
| **Python** | 런타임 |
| **discord.py** | 디스코드 API |
| **Docker** | 컨테이너 환경 |
---
## 📄 라이선스
MIT License

8
fail2ban/.env Normal file
View file

@ -0,0 +1,8 @@
# 디스코드 봇 토큰
DISCORD_BOT_TOKEN=MTQ1MDI5MjE2NDIzMDY0MzcyNQ.GrxNZp.xg79iSfHslXoFmooH6VbFak91yC5D49k1ZSjIw
# 알림을 보낼 디스코드 채널 ID (채널 우클릭 → ID 복사)
DISCORD_CHANNEL_ID=1450288717280575498
# 웹훅 서버 포트
WEBHOOK_PORT=5000

17
fail2ban/Dockerfile Normal file
View file

@ -0,0 +1,17 @@
FROM python:3.12-slim
WORKDIR /app
# fail2ban-client 설치
RUN apt-get update && apt-get install -y --no-install-recommends \
fail2ban \
&& rm -rf /var/lib/apt/lists/*
# 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 소스 복사
COPY main.py .
CMD ["python", "main.py"]

104
fail2ban/README.md Normal file
View file

@ -0,0 +1,104 @@
# 🛡️ fail2ban Discord Bot
fail2ban IP 차단 현황을 디스코드로 실시간 알림 및 관리하는 봇입니다.
![Python](https://img.shields.io/badge/Python-3.11-3776AB?logo=python)
![Discord](https://img.shields.io/badge/Discord.py-2.0-5865F2?logo=discord)
---
## ✨ 주요 기능
- 🚨 **실시간 알림** - IP 차단 시 디스코드 채널로 즉시 알림
- 📋 **차단 목록** - `/list` 명령으로 현재 차단 IP 조회
- 🔓 **차단 해제** - `/unban` 명령으로 IP 차단 해제
- 📊 **상태 조회** - `/status` 명령으로 fail2ban 서비스 상태 확인
- 🌍 **IP 정보** - 차단된 IP의 위치 및 ISP 정보 표시
---
## 🛠️ 기술 스택
| 기술 | 설명 |
| -------------- | ------------------ |
| **Python** | 런타임 |
| **discord.py** | 디스코드 API |
| **aiohttp** | 비동기 HTTP (웹훅) |
| **ip-api.com** | IP 위치 조회 |
---
## 📡 슬래시 명령어
| 명령어 | 설명 |
| -------------------- | --------------------------- |
| `/list [jail]` | 차단된 IP 목록 및 통계 조회 |
| `/unban <ip> [jail]` | IP 차단 해제 |
| `/status` | fail2ban 서비스 상태 확인 |
---
## 🔔 알림 형식
IP 차단 시 다음 정보가 포함된 Embed 알림이 전송됩니다:
- 🔒 차단된 IP
- 📛 Jail 이름
- ❌ 실패 횟수
- 🌍 위치 (국가, 도시)
- 🏢 ISP
- 🖥️ 서버 이름
---
## 🚀 실행 방법
### Docker (권장)
```bash
docker compose up -d --build
```
### 로컬 실행
```bash
pip install -r requirements.txt
python main.py
```
---
## ⚙️ 환경 변수
| 변수 | 설명 |
| -------------------- | --------------------------- |
| `DISCORD_BOT_TOKEN` | 디스코드 봇 토큰 |
| `DISCORD_CHANNEL_ID` | 알림 채널 ID |
| `WEBHOOK_PORT` | 웹훅 서버 포트 (기본: 5000) |
---
## 🔧 fail2ban 연동
`/etc/fail2ban/action.d/` 에 액션 스크립트를 추가하고, jail 설정에서 해당 액션을 사용합니다.
```bash
# 차단 시 웹훅 호출
curl -X POST http://127.0.0.1:5000/ban \
-H "Content-Type: application/json" \
-d '{"ip":"<ip>", "jail":"<name>", "failures":"<failures>"}'
```
---
## 📁 구조
```
fail2ban/
├── main.py # 봇 메인 로직
├── docker-compose.yml # Docker Compose 설정
├── Dockerfile # 컨테이너 빌드
├── requirements.txt # Python 의존성
├── discord-notify.sh # fail2ban 액션 스크립트
└── .env # 환경 변수
```

View file

@ -0,0 +1,31 @@
#!/bin/bash
# fail2ban에서 IP 차단 시 디스코드 봇으로 알림을 보내는 스크립트
# 사용법: discord-notify.sh <ip> <jail> <failures>
# 봇 웹훅 URL
BOT_WEBHOOK_URL="http://localhost:5000/ban"
# 파라미터 받기
IP=$1
JAIL=$2
FAILURES=$3
# JSON 페이로드 생성
JSON_PAYLOAD=$(cat <<EOF
{
"ip": "${IP}",
"jail": "${JAIL}",
"failures": "${FAILURES}"
}
EOF
)
# 봇 웹훅으로 알림 전송
curl -H "Content-Type: application/json" \
-X POST \
-d "${JSON_PAYLOAD}" \
"${BOT_WEBHOOK_URL}" \
--silent --output /dev/null \
--max-time 5
exit 0

View file

@ -0,0 +1,23 @@
services:
discord-bot-fail2ban:
build: .
container_name: discord-bot-fail2ban
env_file:
- .env
environment:
- PYTHONUNBUFFERED=1
volumes:
# 호스트의 fail2ban 소켓에 접근
- /var/run/fail2ban:/var/run/fail2ban:ro
ports:
# 호스트 fail2ban 스크립트 접근용 (localhost만)
- "127.0.0.1:5000:5000"
networks:
- bot
labels:
- "com.centurylinklabs.watchtower.enable=false"
restart: unless-stopped
networks:
bot:
external: true

265
fail2ban/main.py Normal file
View file

@ -0,0 +1,265 @@
"""
fail2ban 디스코드
- 실시간 IP 차단 알림
- /list: 차단 목록 조회
- /unban: IP 차단 해제
- /status: fail2ban 상태 확인
"""
import os
import subprocess
import discord
from discord import app_commands
from discord.ext import commands
from aiohttp import web
import asyncio
from datetime import datetime
import aiohttp
# 환경변수
BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID", "0"))
WEBHOOK_PORT = int(os.getenv("WEBHOOK_PORT", "5000"))
# 봇 설정
intents = discord.Intents.default()
bot = commands.Bot(command_prefix="!", intents=intents)
async def get_ip_info(ip: str) -> dict:
"""IP 정보 조회 (위치, ISP)"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"http://ip-api.com/json/{ip}?fields=country,city,isp") as resp:
if resp.status == 200:
return await resp.json()
except Exception:
pass
return {"country": "Unknown", "city": "Unknown", "isp": "Unknown"}
def run_fail2ban_command(args: list) -> str:
"""fail2ban-client 명령어 실행"""
try:
result = subprocess.run(
["fail2ban-client"] + args,
capture_output=True,
text=True,
timeout=10
)
return result.stdout or result.stderr
except subprocess.TimeoutExpired:
return "명령어 시간 초과"
except Exception as e:
return f"오류: {str(e)}"
def get_jail_stats(jail: str = "sshd") -> dict:
"""jail 통계 조회"""
output = run_fail2ban_command(["status", jail])
stats = {
"banned_ips": [],
"currently_failed": "0",
"total_failed": "0",
"currently_banned": "0",
"total_banned": "0"
}
for line in output.split("\n"):
if "Banned IP list:" in line:
ip_part = line.split("Banned IP list:")[1].strip()
if ip_part:
stats["banned_ips"] = ip_part.split()
elif "Currently failed:" in line:
stats["currently_failed"] = line.split(":")[-1].strip()
elif "Total failed:" in line:
stats["total_failed"] = line.split(":")[-1].strip()
elif "Currently banned:" in line:
stats["currently_banned"] = line.split(":")[-1].strip()
elif "Total banned:" in line:
stats["total_banned"] = line.split(":")[-1].strip()
return stats
@bot.event
async def on_ready():
"""봇 시작 시 실행"""
print(f"✅ 봇 로그인: {bot.user}")
try:
synced = await bot.tree.sync()
print(f"{len(synced)}개 명령어 동기화 완료")
except Exception as e:
print(f"❌ 명령어 동기화 실패: {e}")
@bot.tree.command(name="list", description="차단된 IP 목록과 통계를 조회합니다")
@app_commands.describe(jail="조회할 jail 이름 (기본: sshd)")
async def list_banned(interaction: discord.Interaction, jail: str = "sshd"):
"""차단된 IP 목록 및 통계 조회"""
await interaction.response.defer()
stats = get_jail_stats(jail)
banned_ips = stats["banned_ips"]
embed = discord.Embed(
title=f"🔒 차단된 IP 목록 ({jail})",
color=discord.Color.red() if banned_ips else discord.Color.green(),
timestamp=datetime.now()
)
if banned_ips:
ip_list = "\n".join([f"• `{ip}`" for ip in banned_ips])
embed.description = ip_list
else:
embed.description = "✅ 현재 차단된 IP가 없습니다."
# 통계 추가 (여백 포함)
embed.add_field(name="\u200b", value="\u200b", inline=False)
embed.add_field(name="🔒 영구 차단", value=f"`{stats['currently_banned']}`개 IP", inline=True)
embed.add_field(name="📈 누적 실패", value=f"`{stats['total_failed']}`회", inline=True)
embed.add_field(name="📊 누적 차단", value=f"`{stats['total_banned']}`회", inline=True)
await interaction.followup.send(embed=embed)
@bot.tree.command(name="unban", description="IP 차단을 해제합니다")
@app_commands.describe(ip="차단 해제할 IP 주소", jail="jail 이름 (기본: sshd)")
async def unban_ip(interaction: discord.Interaction, ip: str, jail: str = "sshd"):
"""IP 차단 해제"""
await interaction.response.defer()
result = run_fail2ban_command(["set", jail, "unbanip", ip])
embed = discord.Embed(timestamp=datetime.now())
if "is not banned" in result.lower() or "not in" in result.lower():
embed.title = "⚠️ 차단 해제 실패"
embed.description = f"`{ip}`는 현재 차단되어 있지 않습니다."
embed.color = discord.Color.orange()
elif "error" in result.lower():
embed.title = "❌ 오류 발생"
embed.description = f"```{result}```"
embed.color = discord.Color.red()
else:
embed.title = "✅ 차단 해제 완료"
embed.description = f"`{ip}`의 차단이 해제되었습니다."
embed.color = discord.Color.green()
await interaction.followup.send(embed=embed)
@bot.tree.command(name="status", description="fail2ban 서비스 상태를 확인합니다")
async def status(interaction: discord.Interaction):
"""fail2ban 서비스 상태 조회"""
await interaction.response.defer()
# 전체 상태 조회
output = run_fail2ban_command(["status"])
# 활성 jail 목록 파싱
jails = []
for line in output.split("\n"):
if "Jail list:" in line:
jail_part = line.split("Jail list:")[1].strip()
if jail_part:
jails = [j.strip() for j in jail_part.split(",")]
# 서비스 상태 확인
is_running = "Number of jail:" in output
embed = discord.Embed(
title="📊 fail2ban 서비스 상태",
color=discord.Color.green() if is_running else discord.Color.red(),
timestamp=datetime.now()
)
# 상태
status_text = "🟢 실행 중" if is_running else "🔴 중지됨"
embed.add_field(name="상태", value=status_text, inline=True)
embed.add_field(name="활성 Jail", value=f"`{len(jails)}`개", inline=True)
embed.add_field(name="\u200b", value="\u200b", inline=True)
# Jail 목록 (여백 포함)
embed.add_field(name="\u200b", value="\u200b", inline=False)
if jails:
jail_list = ", ".join([f"`{j}`" for j in jails])
embed.add_field(name="📋 Jail 목록", value=jail_list, inline=False)
# 설정 정보 (여백 포함)
embed.add_field(name="\u200b", value="\u200b", inline=False)
embed.add_field(name="⚙️ 설정", value="10분 내 5회 실패 시 **영구 차단**", inline=False)
await interaction.followup.send(embed=embed)
async def send_ban_notification(ip: str, jail: str, failures: str):
"""IP 차단 알림 전송"""
if not CHANNEL_ID:
print("❌ CHANNEL_ID가 설정되지 않음")
return
channel = bot.get_channel(CHANNEL_ID)
if not channel:
print(f"❌ 채널을 찾을 수 없음: {CHANNEL_ID}")
return
# IP 정보 조회
ip_info = await get_ip_info(ip)
embed = discord.Embed(
title="🚫 IP 차단 알림",
description="fail2ban에 의해 IP가 차단되었습니다.",
color=discord.Color.red(),
timestamp=datetime.now()
)
embed.add_field(name="🔒 차단된 IP", value=f"`{ip}`", inline=True)
embed.add_field(name="📛 Jail", value=f"`{jail}`", inline=True)
embed.add_field(name="❌ 실패 횟수", value=f"`{failures}`", inline=True)
embed.add_field(name="🌍 위치", value=f"`{ip_info.get('city', 'Unknown')}, {ip_info.get('country', 'Unknown')}`", inline=True)
embed.add_field(name="🏢 ISP", value=f"`{ip_info.get('isp', 'Unknown')}`", inline=True)
embed.add_field(name="🖥️ 서버", value=f"`{os.uname().nodename}`", inline=True)
await channel.send(embed=embed)
print(f"✅ 차단 알림 전송: {ip}")
# HTTP 웹훅 서버 (fail2ban에서 호출)
async def webhook_handler(request):
"""fail2ban에서 호출하는 웹훅"""
try:
data = await request.json()
ip = data.get("ip", "Unknown")
jail = data.get("jail", "sshd")
failures = data.get("failures", "?")
await send_ban_notification(ip, jail, failures)
return web.Response(text="OK")
except Exception as e:
print(f"❌ 웹훅 오류: {e}")
return web.Response(text="Error", status=500)
async def start_webhook_server():
"""웹훅 서버 시작"""
app = web.Application()
app.router.add_post("/ban", webhook_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", WEBHOOK_PORT)
await site.start()
print(f"✅ 웹훅 서버 시작: http://0.0.0.0:{WEBHOOK_PORT}/ban")
async def main():
"""메인 함수"""
# 웹훅 서버 시작
await start_webhook_server()
# 봇 시작
await bot.start(BOT_TOKEN)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,2 @@
discord.py>=2.3.0
aiohttp>=3.9.0