import fp from 'fastify-plugin'; import cron from 'node-cron'; import bots from '../config/bots.js'; import { syncWithRetry, getVersion } from '../services/meilisearch/index.js'; const REDIS_PREFIX = 'bot:status:'; const TIMEZONE = 'Asia/Seoul'; async function schedulerPlugin(fastify, opts) { const tasks = new Map(); /** * 봇 상태 Redis에 저장 */ async function updateStatus(botId, status) { const current = await getStatus(botId); const updated = { ...current, ...status, updatedAt: new Date().toISOString() }; await fastify.redis.set(`${REDIS_PREFIX}${botId}`, JSON.stringify(updated)); return updated; } /** * 봇 상태 Redis에서 조회 */ async function getStatus(botId) { const data = await fastify.redis.get(`${REDIS_PREFIX}${botId}`); if (data) { return JSON.parse(data); } return { status: 'stopped', lastCheckAt: null, lastAddedCount: 0, totalAdded: 0, errorMessage: null, }; } /** * 봇 동기화 함수 가져오기 */ function getSyncFunction(bot) { if (bot.type === 'youtube') { return fastify.youtubeBot.syncNewVideos; } else if (bot.type === 'x') { return fastify.xBot.syncNewTweets; } else if (bot.type === 'meilisearch') { return async () => { const count = await syncWithRetry(fastify.meilisearch, fastify.db); return { addedCount: count, total: count }; }; } return null; } /** * Meilisearch 버전 체크 및 동기화 (업데이트 감지용) */ async function startMeilisearchVersionCheck(botId, bot) { const REDIS_VERSION_KEY = 'meilisearch:version'; const CHECK_INTERVAL = 60 * 1000; // 1분 const CHECK_DURATION = 5 * 60 * 1000; // 5분간 체크 // 체크 시작 cron (매일 4시 KST) const task = cron.schedule(bot.cron, async () => { fastify.log.info(`[${botId}] 버전 체크 시작 (5분간 1분 간격)`); await updateStatus(botId, { status: 'running' }); const startTime = Date.now(); let synced = false; let checkCount = 0; // 초기 버전 저장 const initialVersion = await getVersion(fastify.meilisearch); if (!initialVersion) { fastify.log.error(`[${botId}] Meilisearch 연결 실패`); await updateStatus(botId, { status: 'error', errorMessage: 'Meilisearch 연결 실패' }); return; } const savedVersion = await fastify.redis.get(REDIS_VERSION_KEY); fastify.log.info(`[${botId}] 현재 버전: ${initialVersion}, 저장된 버전: ${savedVersion || '없음'}`); // 버전이 이미 다르면 즉시 동기화 if (savedVersion && savedVersion !== initialVersion) { fastify.log.info(`[${botId}] 버전 변경 감지! ${savedVersion} → ${initialVersion}`); await performSync(botId, initialVersion, REDIS_VERSION_KEY); return; } // 5분간 1분 간격으로 체크 const intervalId = setInterval(async () => { checkCount++; const elapsed = Date.now() - startTime; if (synced || elapsed >= CHECK_DURATION) { clearInterval(intervalId); if (!synced) { fastify.log.info(`[${botId}] 버전 변경 없음, 체크 종료`); await updateStatus(botId, { status: 'running', lastCheckAt: new Date().toISOString(), }); } return; } const currentVersion = await getVersion(fastify.meilisearch); fastify.log.info(`[${botId}] 체크 #${checkCount}: 버전 ${currentVersion}`); if (currentVersion && currentVersion !== initialVersion) { synced = true; clearInterval(intervalId); fastify.log.info(`[${botId}] 버전 변경 감지! ${initialVersion} → ${currentVersion}`); await performSync(botId, currentVersion, REDIS_VERSION_KEY); } }, CHECK_INTERVAL); }, { timezone: TIMEZONE }); tasks.set(botId, task); await updateStatus(botId, { status: 'running' }); fastify.log.info(`[${botId}] 버전 체크 스케줄 시작 (cron: ${bot.cron})`); // 초기 버전 저장 (최초 실행 시) const currentVersion = await getVersion(fastify.meilisearch); if (currentVersion) { const savedVersion = await fastify.redis.get(REDIS_VERSION_KEY); if (!savedVersion) { await fastify.redis.set(REDIS_VERSION_KEY, currentVersion); fastify.log.info(`[${botId}] 초기 버전 저장: ${currentVersion}`); } } } /** * 동기화 실행 및 상태 업데이트 */ async function performSync(botId, newVersion, versionKey) { try { const count = await syncWithRetry(fastify.meilisearch, fastify.db); await fastify.redis.set(versionKey, newVersion); await updateStatus(botId, { status: 'running', lastCheckAt: new Date().toISOString(), lastAddedCount: count, errorMessage: null, }); fastify.log.info(`[${botId}] 동기화 완료: ${count}개, 새 버전: ${newVersion}`); } catch (err) { await updateStatus(botId, { status: 'error', lastCheckAt: new Date().toISOString(), errorMessage: err.message, }); fastify.log.error(`[${botId}] 동기화 오류: ${err.message}`); } } /** * 동기화 결과 처리 (중복 코드 제거) */ async function handleSyncResult(botId, result, options = {}) { const { setRunningStatus = false, setErrorOnFail = false } = options; const status = await getStatus(botId); const updateData = { lastCheckAt: new Date().toISOString(), totalAdded: (status.totalAdded || 0) + result.addedCount, }; if (setRunningStatus) { updateData.status = 'running'; updateData.errorMessage = null; } if (result.addedCount > 0) { updateData.lastAddedCount = result.addedCount; } await updateStatus(botId, updateData); return result.addedCount; } /** * 봇 시작 */ async function startBot(botId) { const bot = bots.find(b => b.id === botId); if (!bot) { throw new Error(`봇을 찾을 수 없습니다: ${botId}`); } // 기존 태스크가 있으면 정지 if (tasks.has(botId)) { tasks.get(botId).stop(); tasks.delete(botId); } // Meilisearch는 버전 체크 방식 사용 if (bot.type === 'meilisearch') { await startMeilisearchVersionCheck(botId, bot); return; } const syncFn = getSyncFunction(bot); if (!syncFn) { throw new Error(`지원하지 않는 봇 타입: ${bot.type}`); } // cron 태스크 등록 (한국 시간 기준) const task = cron.schedule(bot.cron, async () => { fastify.log.info(`[${botId}] 동기화 시작`); try { const result = await syncFn(bot); const addedCount = await handleSyncResult(botId, result, { setRunningStatus: true }); fastify.log.info(`[${botId}] 동기화 완료: ${addedCount}개 추가`); } catch (err) { await updateStatus(botId, { status: 'error', lastCheckAt: new Date().toISOString(), errorMessage: err.message, }); fastify.log.error(`[${botId}] 동기화 오류: ${err.message}`); } }, { timezone: TIMEZONE }); tasks.set(botId, task); await updateStatus(botId, { status: 'running' }); fastify.log.info(`[${botId}] 스케줄 시작 (cron: ${bot.cron})`); // 즉시 1회 실행 (meilisearch는 스케줄 시간에만 실행) if (bot.type !== 'meilisearch') { try { const result = await syncFn(bot); const addedCount = await handleSyncResult(botId, result); fastify.log.info(`[${botId}] 초기 동기화 완료: ${addedCount}개 추가`); } catch (err) { fastify.log.error(`[${botId}] 초기 동기화 오류: ${err.message}`); } } } /** * 봇 정지 */ async function stopBot(botId) { if (tasks.has(botId)) { tasks.get(botId).stop(); tasks.delete(botId); } await updateStatus(botId, { status: 'stopped' }); fastify.log.info(`[${botId}] 스케줄 정지`); } /** * 모든 활성 봇 시작 */ async function startAll() { for (const bot of bots) { if (bot.enabled) { try { await startBot(bot.id); } catch (err) { fastify.log.error(`[${bot.id}] 시작 실패: ${err.message}`); } } } } /** * 모든 봇 정지 */ async function stopAll() { for (const [botId, task] of tasks) { task.stop(); await updateStatus(botId, { status: 'stopped' }); } tasks.clear(); } // 데코레이터 등록 fastify.decorate('scheduler', { startBot, stopBot, startAll, stopAll, getStatus, getBots: () => bots, }); // 앱 종료 시 모든 봇 정지 fastify.addHook('onClose', async () => { await stopAll(); fastify.log.info('모든 봇 스케줄 정지'); }); } export default fp(schedulerPlugin, { name: 'scheduler', dependencies: ['db', 'redis', 'meilisearch', 'youtubeBot', 'xBot'], });