import fp from 'fastify-plugin'; import cron from 'node-cron'; import staticBots from '../config/bots.js'; import { syncAllSchedules } from '../services/meilisearch/index.js'; import { nowKST } from '../utils/date.js'; import { logActivity } from '../utils/log.js'; const REDIS_PREFIX = 'bot:status:'; const TIMEZONE = 'Asia/Seoul'; const MAX_CONSECUTIVE_ERRORS = 10; async function schedulerPlugin(fastify, opts) { const tasks = new Map(); const burstTimers = new Map(); // weekly 모드 내부 setInterval 핸들 let cachedBots = null; /** * DB에서 YouTube 봇 목록 조회 */ async function getYouTubeBotsFromDB() { const [rows] = await fastify.db.query( 'SELECT * FROM bot_youtube' ); return rows.map(row => { const weekly = row.weekly_schedule_config ? (typeof row.weekly_schedule_config === 'string' ? JSON.parse(row.weekly_schedule_config) : row.weekly_schedule_config) : null; // weekly 모드면 시작 시각에만 트리거, 아니면 cron_interval 분 주기 let cronExpr; if (weekly && weekly.startTime && weekly.dayOfWeek !== undefined) { const [h, m] = weekly.startTime.split(':').map(Number); cronExpr = `${m} ${h} * * ${weekly.dayOfWeek}`; } else { cronExpr = `*/${row.cron_interval || 2} * * * *`; } return { id: `youtube-${row.id}`, // DB ID를 문자열 형식으로 변환 dbId: row.id, type: 'youtube', channelId: row.channel_id, channelHandle: row.channel_handle, channelName: row.channel_name, bannerUrl: row.banner_url, cron: cronExpr, enabled: row.enabled === 1, titleFilters: row.title_filters ? (typeof row.title_filters === 'string' ? JSON.parse(row.title_filters) : row.title_filters) : [], defaultMemberIds: row.default_member_ids ? (typeof row.default_member_ids === 'string' ? JSON.parse(row.default_member_ids) : row.default_member_ids) : [], extractMembersFromDesc: row.extract_members_from_desc === 1, autoScheduleNext: row.auto_schedule_config ? (typeof row.auto_schedule_config === 'string' ? JSON.parse(row.auto_schedule_config) : row.auto_schedule_config) : null, weeklySchedule: weekly, }; }); } /** * DB에서 X 봇 목록 조회 */ async function getXBotsFromDB() { const [rows] = await fastify.db.query( 'SELECT * FROM bot_x' ); return rows.map(row => ({ id: `x-${row.id}`, dbId: row.id, type: 'x', username: row.username, displayName: row.display_name, avatarUrl: row.avatar_url, nitterUrl: process.env.NITTER_URL || 'http://nitter:8080', cron: `*/${row.cron_interval} * * * *`, enabled: row.enabled === 1, textFilters: row.text_filters ? (typeof row.text_filters === 'string' ? JSON.parse(row.text_filters) : row.text_filters) : [], includeRetweets: row.include_retweets === 1, extractYoutube: row.extract_youtube === 1, excludeManagedChannels: row.exclude_managed_channels === 1, })); } /** * 모든 봇 목록 가져오기 (정적 + DB) */ async function getAllBots(forceRefresh = false) { if (cachedBots && !forceRefresh) { return cachedBots; } const youtubeBots = await getYouTubeBotsFromDB(); const xBots = await getXBotsFromDB(); cachedBots = [...staticBots, ...youtubeBots, ...xBots]; return cachedBots; } /** * 봇 ID로 봇 찾기 */ async function findBot(botId) { const allBots = await getAllBots(); return allBots.find(b => b.id === botId); } /** * 봇 상태 Redis에 저장 */ async function updateStatus(botId, status) { const current = await getStatus(botId); const updated = { ...current, ...status, updatedAt: nowKST() }; await fastify.redis.set(`${REDIS_PREFIX}${botId}`, JSON.stringify(updated)); return updated; } /** * 봇 상태 Redis에서 조회 */ async function getStatus(botId) { const data = await fastify.redis.get(`${REDIS_PREFIX}${botId}`); if (data) { return JSON.parse(data); } return { status: 'stopped', lastCheckAt: null, lastAddedCount: 0, totalAdded: 0, lastSyncDuration: null, errorMessage: null, consecutiveErrors: 0, }; } /** * 봇 동기화 함수 가져오기 */ function getSyncFunction(bot) { if (bot.type === 'youtube') { return fastify.youtubeBot.syncNewVideos; } else if (bot.type === 'x') { return fastify.xBot.syncNewTweets; } else if (bot.type === 'meilisearch') { return async () => { const count = await syncAllSchedules(fastify.meilisearch, fastify.db); return { addedCount: count, total: count }; }; } return null; } /** * 동기화 결과 처리 */ async function handleSyncResult(botId, result, options = {}) { const { setRunningStatus = false } = options; const status = await getStatus(botId); const updateData = { lastCheckAt: nowKST(), totalAdded: (status.totalAdded || 0) + result.addedCount, consecutiveErrors: 0, }; if (setRunningStatus) { updateData.status = 'running'; updateData.errorMessage = null; } if (result.addedCount > 0) { updateData.lastAddedCount = result.addedCount; } await updateStatus(botId, updateData); return result.addedCount; } /** * DB의 enabled 필드 업데이트 (정적 봇은 무시) */ async function setEnabled(botId, enabled) { const match = botId.match(/^(youtube|x)-(\d+)$/); if (!match) return; // 정적 봇 (meilisearch 등) const table = match[1] === 'x' ? 'bot_x' : 'bot_youtube'; const dbId = match[2]; await fastify.db.query(`UPDATE ${table} SET enabled = ? WHERE id = ?`, [enabled ? 1 : 0, dbId]); invalidateCache(); } /** * 단일 동기화 실행 + 에러 처리 (consecutiveErrors, 자동 정지 포함) */ async function runSync(botId, bot, syncFn, { setRunningStatus = false } = {}) { try { const result = await syncFn(bot); const addedCount = await handleSyncResult(botId, result, { setRunningStatus }); fastify.log.info(`[${botId}] 동기화 완료: ${addedCount}개 추가`); if (addedCount > 0) { logActivity(fastify.db, { actor: botId, action: 'sync_complete', category: 'sync', summary: `${botId} 동기화 완료: ${addedCount}개 추가`, details: { addedCount }, }); } return { ok: true, addedCount }; } catch (err) { const prev = await getStatus(botId); const consecutiveErrors = (prev.consecutiveErrors || 0) + 1; await updateStatus(botId, { status: 'error', lastCheckAt: nowKST(), errorMessage: err.message, consecutiveErrors, }); fastify.log.error(`[${botId}] 동기화 오류 (${consecutiveErrors}/${MAX_CONSECUTIVE_ERRORS}): ${err.message}`); if (consecutiveErrors === 1) { logActivity(fastify.db, { actor: botId, action: 'error', category: 'sync', summary: `${botId} 동기화 오류: ${err.message}`, details: { error: err.message }, }); } if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { fastify.log.warn(`[${botId}] 연속 ${MAX_CONSECUTIVE_ERRORS}회 실패 - 자동 정지`); logActivity(fastify.db, { actor: botId, action: 'stop', category: 'bot', summary: `${botId} 연속 ${MAX_CONSECUTIVE_ERRORS}회 실패로 자동 정지`, details: { error: err.message, consecutiveErrors }, }); try { await stopBot(botId); } catch (stopErr) { fastify.log.error(`[${botId}] 자동 정지 실패: ${stopErr.message}`); } } return { ok: false, err }; } } /** * 주간 집중 폴링 세션 시작 (weekly 모드) * 새 영상 1개 발견 시 즉시 종료, durationMinutes 초과 시도 종료 */ async function startWeeklyBurst(botId, bot, syncFn) { if (burstTimers.has(botId)) return; // 이미 실행 중이면 무시 const intervalSeconds = Math.max(5, bot.weeklySchedule?.intervalSeconds || 30); const durationMinutes = Math.max(1, bot.weeklySchedule?.durationMinutes || 30); const endAt = Date.now() + durationMinutes * 60 * 1000; fastify.log.info(`[${botId}] 주간 폴링 시작 (간격 ${intervalSeconds}초, 최대 ${durationMinutes}분)`); const stopBurst = (reason) => { const handle = burstTimers.get(botId); if (!handle) return; clearInterval(handle.timer); burstTimers.delete(botId); fastify.log.info(`[${botId}] 주간 폴링 종료: ${reason}`); }; const tick = async () => { if (!burstTimers.has(botId)) return; const result = await runSync(botId, bot, syncFn, { setRunningStatus: true }); if (!burstTimers.has(botId)) return; // runSync 중 자동 정지 등으로 정리됐을 수 있음 if (result.ok && result.addedCount > 0) { stopBurst(`새 영상 ${result.addedCount}개 발견 (stopOnFound)`); return; } if (Date.now() >= endAt) { stopBurst('최대 지속시간 초과'); } }; // 타이머 먼저 등록 → tick에서 burstTimers.has 체크로 중복/중단 판별 const timer = setInterval(tick, intervalSeconds * 1000); burstTimers.set(botId, { timer, endAt }); await tick(); } /** * 봇 시작 */ async function startBot(botId) { const bot = await findBot(botId); if (!bot) { throw new Error(`봇을 찾을 수 없습니다: ${botId}`); } // 기존 태스크가 있으면 정지 if (tasks.has(botId)) { tasks.get(botId).stop(); tasks.delete(botId); } if (burstTimers.has(botId)) { clearInterval(burstTimers.get(botId).timer); burstTimers.delete(botId); } // DB enabled 활성화 await setEnabled(botId, true); const syncFn = getSyncFunction(bot); if (!syncFn) { throw new Error(`지원하지 않는 봇 타입: ${bot.type}`); } // cron 태스크 등록 (한국 시간 기준) const task = cron.schedule(bot.cron, async () => { fastify.log.info(`[${botId}] 동기화 시작`); if (bot.weeklySchedule) { await startWeeklyBurst(botId, bot, syncFn); } else { await runSync(botId, bot, syncFn, { setRunningStatus: true }); } }, { timezone: TIMEZONE }); tasks.set(botId, task); await updateStatus(botId, { status: 'running' }); fastify.log.info(`[${botId}] 스케줄 시작 (cron: ${bot.cron})`); // 즉시 1회 실행: meilisearch와 weekly 모드는 제외 (weekly는 지정 시각에만) if (bot.type !== 'meilisearch' && !bot.weeklySchedule) { await runSync(botId, bot, syncFn, { setRunningStatus: false }); } } /** * 봇 정지 */ async function stopBot(botId) { if (tasks.has(botId)) { tasks.get(botId).stop(); tasks.delete(botId); } // weekly 모드 burst 타이머도 정리 if (burstTimers.has(botId)) { clearInterval(burstTimers.get(botId).timer); burstTimers.delete(botId); } // DB enabled 비활성화 await setEnabled(botId, false); await updateStatus(botId, { status: 'stopped' }); fastify.log.info(`[${botId}] 스케줄 정지`); } /** * 모든 활성 봇 시작 */ async function startAll() { const allBots = await getAllBots(true); // DB에서 새로 로드 for (const bot of allBots) { if (bot.enabled) { try { await startBot(bot.id); } catch (err) { fastify.log.error(`[${bot.id}] 시작 실패: ${err.message}`); } } } } /** * 모든 봇 정지 */ async function stopAll() { for (const [botId, task] of tasks) { task.stop(); await updateStatus(botId, { status: 'stopped' }); } tasks.clear(); } /** * 봇 캐시 갱신 (봇 추가/수정/삭제 시 호출) */ function invalidateCache() { cachedBots = null; } // 데코레이터 등록 fastify.decorate('scheduler', { startBot, stopBot, startAll, stopAll, getStatus, getBots: (forceRefresh = false) => getAllBots(forceRefresh), invalidateCache, }); // 앱 종료 시 모든 봇 정지 fastify.addHook('onClose', async () => { await stopAll(); fastify.log.info('모든 봇 스케줄 정지'); }); } export default fp(schedulerPlugin, { name: 'scheduler', dependencies: ['db', 'redis', 'meilisearch', 'youtubeBot', 'xBot'], });