fromis_9/backend/src/plugins/scheduler.js

305 lines
9.2 KiB
JavaScript
Raw Normal View History

import fp from 'fastify-plugin';
import cron from 'node-cron';
import bots from '../config/bots.js';
import { syncWithRetry, getVersion } from '../services/meilisearch/index.js';
import { nowKST } from '../utils/date.js';
const REDIS_PREFIX = 'bot:status:';
const TIMEZONE = 'Asia/Seoul';
async function schedulerPlugin(fastify, opts) {
const tasks = new Map();
/**
* 상태 Redis에 저장
*/
async function updateStatus(botId, status) {
const current = await getStatus(botId);
const updated = { ...current, ...status, updatedAt: nowKST() };
await fastify.redis.set(`${REDIS_PREFIX}${botId}`, JSON.stringify(updated));
return updated;
}
/**
* 상태 Redis에서 조회
*/
async function getStatus(botId) {
const data = await fastify.redis.get(`${REDIS_PREFIX}${botId}`);
if (data) {
return JSON.parse(data);
}
return {
status: 'stopped',
lastCheckAt: null,
lastAddedCount: 0,
totalAdded: 0,
lastSyncDuration: null,
errorMessage: null,
};
}
/**
* 동기화 함수 가져오기
*/
function getSyncFunction(bot) {
if (bot.type === 'youtube') {
return fastify.youtubeBot.syncNewVideos;
} else if (bot.type === 'x') {
return fastify.xBot.syncNewTweets;
} else if (bot.type === 'meilisearch') {
return async () => {
const count = await syncWithRetry(fastify.meilisearch, fastify.db);
return { addedCount: count, total: count };
};
}
return null;
}
/**
* Meilisearch 버전 체크 동기화 (업데이트 감지용)
*/
async function startMeilisearchVersionCheck(botId, bot) {
const REDIS_VERSION_KEY = 'meilisearch:version';
const CHECK_INTERVAL = 60 * 1000; // 1분
const CHECK_DURATION = 5 * 60 * 1000; // 5분간 체크
// 체크 시작 cron (매일 4시 KST)
const task = cron.schedule(bot.cron, async () => {
fastify.log.info(`[${botId}] 버전 체크 시작 (5분간 1분 간격)`);
await updateStatus(botId, { status: 'running' });
const startTime = Date.now();
let synced = false;
let checkCount = 0;
// 초기 버전 저장
const initialVersion = await getVersion(fastify.meilisearch);
if (!initialVersion) {
fastify.log.error(`[${botId}] Meilisearch 연결 실패`);
await updateStatus(botId, { status: 'error', errorMessage: 'Meilisearch 연결 실패' });
return;
}
const savedVersion = await fastify.redis.get(REDIS_VERSION_KEY);
fastify.log.info(`[${botId}] 현재 버전: ${initialVersion}, 저장된 버전: ${savedVersion || '없음'}`);
// 버전이 이미 다르면 즉시 동기화
if (savedVersion && savedVersion !== initialVersion) {
fastify.log.info(`[${botId}] 버전 변경 감지! ${savedVersion}${initialVersion}`);
await performSync(botId, initialVersion, REDIS_VERSION_KEY);
return;
}
// 5분간 1분 간격으로 체크
const intervalId = setInterval(async () => {
checkCount++;
const elapsed = Date.now() - startTime;
if (synced || elapsed >= CHECK_DURATION) {
clearInterval(intervalId);
if (!synced) {
fastify.log.info(`[${botId}] 버전 변경 없음, 체크 종료`);
await updateStatus(botId, {
status: 'running',
lastCheckAt: nowKST(),
});
}
return;
}
const currentVersion = await getVersion(fastify.meilisearch);
fastify.log.info(`[${botId}] 체크 #${checkCount}: 버전 ${currentVersion}`);
if (currentVersion && currentVersion !== initialVersion) {
synced = true;
clearInterval(intervalId);
fastify.log.info(`[${botId}] 버전 변경 감지! ${initialVersion}${currentVersion}`);
await performSync(botId, currentVersion, REDIS_VERSION_KEY);
}
}, CHECK_INTERVAL);
}, { timezone: TIMEZONE });
tasks.set(botId, task);
await updateStatus(botId, { status: 'running' });
fastify.log.info(`[${botId}] 버전 체크 스케줄 시작 (cron: ${bot.cron})`);
// 초기 버전 저장 (최초 실행 시)
const currentVersion = await getVersion(fastify.meilisearch);
if (currentVersion) {
const savedVersion = await fastify.redis.get(REDIS_VERSION_KEY);
if (!savedVersion) {
await fastify.redis.set(REDIS_VERSION_KEY, currentVersion);
fastify.log.info(`[${botId}] 초기 버전 저장: ${currentVersion}`);
}
}
}
/**
* 동기화 실행 상태 업데이트
*/
async function performSync(botId, newVersion, versionKey) {
const startTime = Date.now();
try {
const count = await syncWithRetry(fastify.meilisearch, fastify.db);
const duration = Date.now() - startTime;
await fastify.redis.set(versionKey, newVersion);
await updateStatus(botId, {
status: 'running',
lastCheckAt: nowKST(),
lastAddedCount: count,
lastSyncDuration: duration,
errorMessage: null,
});
fastify.log.info(`[${botId}] 동기화 완료: ${count}개, ${duration}ms, 새 버전: ${newVersion}`);
} catch (err) {
const duration = Date.now() - startTime;
await updateStatus(botId, {
status: 'error',
lastCheckAt: nowKST(),
lastSyncDuration: duration,
errorMessage: err.message,
});
fastify.log.error(`[${botId}] 동기화 오류: ${err.message}`);
}
}
/**
* 동기화 결과 처리 (중복 코드 제거)
*/
async function handleSyncResult(botId, result, options = {}) {
const { setRunningStatus = false, setErrorOnFail = false } = options;
const status = await getStatus(botId);
const updateData = {
lastCheckAt: nowKST(),
totalAdded: (status.totalAdded || 0) + result.addedCount,
};
if (setRunningStatus) {
updateData.status = 'running';
updateData.errorMessage = null;
}
if (result.addedCount > 0) {
updateData.lastAddedCount = result.addedCount;
}
await updateStatus(botId, updateData);
return result.addedCount;
}
/**
* 시작
*/
async function startBot(botId) {
const bot = bots.find(b => b.id === botId);
if (!bot) {
throw new Error(`봇을 찾을 수 없습니다: ${botId}`);
}
// 기존 태스크가 있으면 정지
if (tasks.has(botId)) {
tasks.get(botId).stop();
tasks.delete(botId);
}
// Meilisearch는 버전 체크 방식 사용
if (bot.type === 'meilisearch') {
await startMeilisearchVersionCheck(botId, bot);
return;
}
const syncFn = getSyncFunction(bot);
if (!syncFn) {
throw new Error(`지원하지 않는 봇 타입: ${bot.type}`);
}
// cron 태스크 등록 (한국 시간 기준)
const task = cron.schedule(bot.cron, async () => {
fastify.log.info(`[${botId}] 동기화 시작`);
try {
const result = await syncFn(bot);
const addedCount = await handleSyncResult(botId, result, { setRunningStatus: true });
fastify.log.info(`[${botId}] 동기화 완료: ${addedCount}개 추가`);
} catch (err) {
await updateStatus(botId, {
status: 'error',
lastCheckAt: nowKST(),
errorMessage: err.message,
});
fastify.log.error(`[${botId}] 동기화 오류: ${err.message}`);
}
}, { timezone: TIMEZONE });
tasks.set(botId, task);
await updateStatus(botId, { status: 'running' });
fastify.log.info(`[${botId}] 스케줄 시작 (cron: ${bot.cron})`);
// 즉시 1회 실행 (meilisearch는 스케줄 시간에만 실행)
if (bot.type !== 'meilisearch') {
try {
const result = await syncFn(bot);
const addedCount = await handleSyncResult(botId, result);
fastify.log.info(`[${botId}] 초기 동기화 완료: ${addedCount}개 추가`);
} catch (err) {
fastify.log.error(`[${botId}] 초기 동기화 오류: ${err.message}`);
}
}
}
/**
* 정지
*/
async function stopBot(botId) {
if (tasks.has(botId)) {
tasks.get(botId).stop();
tasks.delete(botId);
}
await updateStatus(botId, { status: 'stopped' });
fastify.log.info(`[${botId}] 스케줄 정지`);
}
/**
* 모든 활성 시작
*/
async function startAll() {
for (const bot of bots) {
if (bot.enabled) {
try {
await startBot(bot.id);
} catch (err) {
fastify.log.error(`[${bot.id}] 시작 실패: ${err.message}`);
}
}
}
}
/**
* 모든 정지
*/
async function stopAll() {
for (const [botId, task] of tasks) {
task.stop();
await updateStatus(botId, { status: 'stopped' });
}
tasks.clear();
}
// 데코레이터 등록
fastify.decorate('scheduler', {
startBot,
stopBot,
startAll,
stopAll,
getStatus,
getBots: () => bots,
});
// 앱 종료 시 모든 봇 정지
fastify.addHook('onClose', async () => {
await stopAll();
fastify.log.info('모든 봇 스케줄 정지');
});
}
export default fp(schedulerPlugin, {
name: 'scheduler',
dependencies: ['db', 'redis', 'meilisearch', 'youtubeBot', 'xBot'],
});