feat(scheduler): in-process matching scheduler with audit + admin endpoints
- Add in-process scheduler service triggered by ENABLE_SCHEDULER - Record runs in new matching_runs table; throttle per-event and log stats - Add admin endpoints: POST /api/admin/events/:slug/run-now and GET /api/admin/events/:slug/matching-runs - Wire scheduler start/stop in server and add ENV flags + compose defaults - Prisma schema: add MatchingRun model and relation - Update env examples for scheduler configuration
This commit is contained in:
@@ -50,3 +50,11 @@ LOCKOUT_DURATION_MINUTES=15
|
||||
|
||||
# Logging
|
||||
LOG_LEVEL=debug
|
||||
|
||||
# Scheduler
|
||||
# Enable simple in-process scheduler for auto-matching
|
||||
ENABLE_SCHEDULER=false
|
||||
# Global tick interval in seconds (default 300 = 5min)
|
||||
SCHEDULER_INTERVAL_SEC=300
|
||||
# Per-event minimum time between runs in seconds (default 60s)
|
||||
MATCHING_MIN_INTERVAL_SEC=60
|
||||
|
||||
@@ -50,3 +50,11 @@ LOCKOUT_DURATION_MINUTES=15
|
||||
|
||||
# Logging
|
||||
LOG_LEVEL=warn
|
||||
|
||||
# Scheduler
|
||||
# Enable simple in-process scheduler for auto-matching (enable on exactly one replica)
|
||||
ENABLE_SCHEDULER=false
|
||||
# Global tick interval in seconds (e.g., 300 = 5min)
|
||||
SCHEDULER_INTERVAL_SEC=300
|
||||
# Per-event minimum time between runs in seconds to avoid thrashing
|
||||
MATCHING_MIN_INTERVAL_SEC=120
|
||||
|
||||
@@ -1,4 +1,14 @@
|
||||
.cli users:list
|
||||
.cli users:lit
|
||||
.cli help
|
||||
.cli
|
||||
.cli users:verify --email test@radziel.com
|
||||
users:verify --email test@radziel.com
|
||||
.cli users:create --email test@radziel.com --username radziel --password QWEqwe123 --first Radek --last Gie
|
||||
.cli users
|
||||
.cli uers
|
||||
.cli
|
||||
.cli users:list
|
||||
.cli users:create --email test@radziel.com --username radziel --password QWEqwe123 --first Radek --last Gierwialo
|
||||
.cli users:verify --email test@radziel.com
|
||||
.cli users:create --email test@radziel.com --username radziel --password QWEqwe123 --first Radek --last Gierwialo
|
||||
@@ -17,14 +27,4 @@ clear
|
||||
.cli
|
||||
.cli events
|
||||
.events:
|
||||
events:
|
||||
events:lists
|
||||
events:list
|
||||
users
|
||||
event:let
|
||||
:
|
||||
event
|
||||
.
|
||||
help
|
||||
.exity
|
||||
.cli events:import:worldsdc --dry-run --limit 20
|
||||
events:
|
||||
@@ -102,6 +102,7 @@ model Event {
|
||||
checkinToken EventCheckinToken?
|
||||
userHeats EventUserHeat[]
|
||||
recordingSuggestions RecordingSuggestion[]
|
||||
matchingRuns MatchingRun[]
|
||||
|
||||
@@map("events")
|
||||
}
|
||||
@@ -298,3 +299,22 @@ model RecordingSuggestion {
|
||||
@@index([recorderId])
|
||||
@@map("recording_suggestions")
|
||||
}
|
||||
|
||||
// Matching runs audit log
|
||||
model MatchingRun {
|
||||
id Int @id @default(autoincrement())
|
||||
eventId Int @map("event_id")
|
||||
trigger String @db.VarChar(20) // 'manual' | 'scheduler'
|
||||
status String @default("running") @db.VarChar(20) // 'running' | 'success' | 'error'
|
||||
startedAt DateTime @default(now()) @map("started_at")
|
||||
endedAt DateTime? @map("ended_at")
|
||||
matchedCount Int @default(0) @map("matched_count")
|
||||
notFoundCount Int @default(0) @map("not_found_count")
|
||||
error String? @db.Text
|
||||
|
||||
// Relations
|
||||
event Event @relation(fields: [eventId], references: [id], onDelete: Cascade)
|
||||
|
||||
@@index([eventId, startedAt])
|
||||
@@map("matching_runs")
|
||||
}
|
||||
|
||||
@@ -117,6 +117,7 @@ app.use('/api/wsdc', require('./routes/wsdc'));
|
||||
app.use('/api/divisions', require('./routes/divisions'));
|
||||
app.use('/api/competition-types', require('./routes/competitionTypes'));
|
||||
app.use('/api/matches', require('./routes/matches'));
|
||||
app.use('/api/admin', require('./routes/admin'));
|
||||
// app.use('/api/ratings', require('./routes/ratings'));
|
||||
|
||||
// 404 handler
|
||||
|
||||
114
backend/src/routes/admin.js
Normal file
114
backend/src/routes/admin.js
Normal file
@@ -0,0 +1,114 @@
|
||||
const express = require('express');
|
||||
const { prisma } = require('../utils/db');
|
||||
const { authenticate } = require('../middleware/auth');
|
||||
const matchingService = require('../services/matching');
|
||||
const { SUGGESTION_STATUS } = require('../constants');
|
||||
|
||||
const router = express.Router();
|
||||
|
||||
// POST /api/admin/events/:slug/run-now - Trigger matching immediately for an event
|
||||
router.post('/events/:slug/run-now', authenticate, async (req, res, next) => {
|
||||
try {
|
||||
const { slug } = req.params;
|
||||
|
||||
const event = await prisma.event.findUnique({
|
||||
where: { slug },
|
||||
select: { id: true, slug: true },
|
||||
});
|
||||
|
||||
if (!event) {
|
||||
return res.status(404).json({ success: false, error: 'Event not found' });
|
||||
}
|
||||
|
||||
const startedAt = new Date();
|
||||
const runRow = await prisma.matchingRun.create({
|
||||
data: {
|
||||
eventId: event.id,
|
||||
trigger: 'manual',
|
||||
status: 'running',
|
||||
startedAt,
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
const suggestions = await matchingService.runMatching(event.id);
|
||||
await matchingService.saveMatchingResults(event.id, suggestions);
|
||||
|
||||
const notFoundCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.NOT_FOUND).length;
|
||||
const matchedCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.PENDING).length;
|
||||
|
||||
await prisma.matchingRun.update({
|
||||
where: { id: runRow.id },
|
||||
data: {
|
||||
status: 'success',
|
||||
endedAt: new Date(),
|
||||
matchedCount,
|
||||
notFoundCount,
|
||||
},
|
||||
});
|
||||
|
||||
return res.json({
|
||||
success: true,
|
||||
data: {
|
||||
eventSlug: event.slug,
|
||||
startedAt,
|
||||
endedAt: new Date(),
|
||||
matched: matchedCount,
|
||||
notFound: notFoundCount,
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
await prisma.matchingRun.update({
|
||||
where: { id: runRow.id },
|
||||
data: {
|
||||
status: 'error',
|
||||
endedAt: new Date(),
|
||||
error: String(err?.message || err),
|
||||
},
|
||||
});
|
||||
return res.status(500).json({ success: false, error: 'Matching failed', details: String(err?.message || err) });
|
||||
}
|
||||
} catch (error) {
|
||||
next(error);
|
||||
}
|
||||
});
|
||||
|
||||
// GET /api/admin/events/:slug/matching-runs?limit=20 - List recent runs
|
||||
router.get('/events/:slug/matching-runs', authenticate, async (req, res, next) => {
|
||||
try {
|
||||
const { slug } = req.params;
|
||||
const limit = Math.min(parseInt(req.query.limit || '20', 10), 100);
|
||||
|
||||
const event = await prisma.event.findUnique({
|
||||
where: { slug },
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
if (!event) {
|
||||
return res.status(404).json({ success: false, error: 'Event not found' });
|
||||
}
|
||||
|
||||
const runs = await prisma.matchingRun.findMany({
|
||||
where: { eventId: event.id },
|
||||
orderBy: { startedAt: 'desc' },
|
||||
take: limit,
|
||||
select: {
|
||||
id: true,
|
||||
trigger: true,
|
||||
status: true,
|
||||
startedAt: true,
|
||||
endedAt: true,
|
||||
matchedCount: true,
|
||||
notFoundCount: true,
|
||||
error: true,
|
||||
},
|
||||
});
|
||||
|
||||
res.json({ success: true, count: runs.length, data: runs });
|
||||
} catch (error) {
|
||||
next(error);
|
||||
}
|
||||
});
|
||||
|
||||
module.exports = router;
|
||||
|
||||
@@ -3,6 +3,7 @@ const http = require('http');
|
||||
const app = require('./app');
|
||||
const { testConnection, disconnect } = require('./utils/db');
|
||||
const { initializeSocket } = require('./socket');
|
||||
const scheduler = require('./services/scheduler');
|
||||
|
||||
const PORT = process.env.PORT || 3000;
|
||||
|
||||
@@ -24,6 +25,16 @@ async function startServer() {
|
||||
console.log(`Server running on port: ${PORT}`);
|
||||
console.log(`Health check: http://localhost:${PORT}/api/health`);
|
||||
console.log('=================================');
|
||||
if (process.env.ENABLE_SCHEDULER === 'true') {
|
||||
try {
|
||||
scheduler.start();
|
||||
console.log('[*] Scheduler started');
|
||||
} catch (e) {
|
||||
console.error('Failed to start scheduler:', e?.message || e);
|
||||
}
|
||||
} else {
|
||||
console.log('[*] Scheduler disabled (ENABLE_SCHEDULER != "true")');
|
||||
}
|
||||
});
|
||||
|
||||
return server;
|
||||
@@ -37,12 +48,14 @@ startServer().catch((err) => {
|
||||
// Graceful shutdown
|
||||
process.on('SIGTERM', async () => {
|
||||
console.log('SIGTERM received, shutting down gracefully...');
|
||||
try { scheduler.stop(); } catch (_) {}
|
||||
await disconnect();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGINT', async () => {
|
||||
console.log('SIGINT received, shutting down gracefully...');
|
||||
try { scheduler.stop(); } catch (_) {}
|
||||
await disconnect();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
155
backend/src/services/scheduler.js
Normal file
155
backend/src/services/scheduler.js
Normal file
@@ -0,0 +1,155 @@
|
||||
const { prisma } = require('../utils/db');
|
||||
const matchingService = require('./matching');
|
||||
const { SUGGESTION_STATUS } = require('../constants');
|
||||
|
||||
// Simple in-process scheduler based on setInterval (no external deps)
|
||||
// Designed for single-backend deployments. When scaling to multiple replicas,
|
||||
// add a DB-based lock (e.g., pg advisory lock) to ensure single run per event.
|
||||
|
||||
const DEFAULT_INTERVAL_SEC = parseInt(process.env.SCHEDULER_INTERVAL_SEC || '300', 10); // 5 min
|
||||
const MIN_INTERVAL_SEC = parseInt(process.env.MATCHING_MIN_INTERVAL_SEC || '60', 10); // 1 min guard
|
||||
|
||||
let timer = null;
|
||||
let runningEvents = new Set(); // In-memory guard to avoid overlapping runs per event
|
||||
|
||||
async function listCandidateEvents() {
|
||||
// Fetch events that are ongoing or upcoming (basic heuristic)
|
||||
// This avoids hammering historical events.
|
||||
const now = new Date();
|
||||
|
||||
return prisma.event.findMany({
|
||||
where: {
|
||||
// Include events that end today or in the future
|
||||
OR: [
|
||||
{ endDate: { gte: now } },
|
||||
{ endDate: null },
|
||||
],
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
slug: true,
|
||||
matchingRunAt: true,
|
||||
startDate: true,
|
||||
endDate: true,
|
||||
},
|
||||
orderBy: { startDate: 'asc' },
|
||||
});
|
||||
}
|
||||
|
||||
function shouldRunForEvent(event) {
|
||||
if (!event) return false;
|
||||
// Rate limiting per event by last run timestamp
|
||||
if (!event.matchingRunAt) return true;
|
||||
|
||||
const last = new Date(event.matchingRunAt).getTime();
|
||||
const now = Date.now();
|
||||
return (now - last) / 1000 >= MIN_INTERVAL_SEC;
|
||||
}
|
||||
|
||||
async function runForEvent(event) {
|
||||
if (runningEvents.has(event.id)) {
|
||||
return; // Skip overlapping runs for the same event
|
||||
}
|
||||
|
||||
runningEvents.add(event.id);
|
||||
const startedAt = new Date();
|
||||
let runRow = null;
|
||||
try {
|
||||
// Create run audit row
|
||||
runRow = await prisma.matchingRun.create({
|
||||
data: {
|
||||
eventId: event.id,
|
||||
trigger: 'scheduler',
|
||||
status: 'running',
|
||||
startedAt,
|
||||
},
|
||||
});
|
||||
|
||||
const suggestions = await matchingService.runMatching(event.id);
|
||||
await matchingService.saveMatchingResults(event.id, suggestions);
|
||||
|
||||
const notFoundCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.NOT_FOUND).length;
|
||||
const matchedCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.PENDING).length;
|
||||
|
||||
console.log(
|
||||
`[scheduler] ${event.slug}: matched=${matchedCount} notFound=${notFoundCount} ` +
|
||||
`took=${((Date.now() - startedAt.getTime()) / 1000).toFixed(2)}s`
|
||||
);
|
||||
|
||||
// Update run audit row
|
||||
if (runRow) {
|
||||
await prisma.matchingRun.update({
|
||||
where: { id: runRow.id },
|
||||
data: {
|
||||
status: 'success',
|
||||
endedAt: new Date(),
|
||||
matchedCount,
|
||||
notFoundCount,
|
||||
},
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[scheduler] Error running matching for ${event.slug}:`, err?.message || err);
|
||||
if (runRow) {
|
||||
try {
|
||||
await prisma.matchingRun.update({
|
||||
where: { id: runRow.id },
|
||||
data: {
|
||||
status: 'error',
|
||||
endedAt: new Date(),
|
||||
error: String(err?.message || err),
|
||||
},
|
||||
});
|
||||
} catch (_) {}
|
||||
}
|
||||
} finally {
|
||||
runningEvents.delete(event.id);
|
||||
}
|
||||
}
|
||||
|
||||
async function tick() {
|
||||
try {
|
||||
const events = await listCandidateEvents();
|
||||
for (const event of events) {
|
||||
if (shouldRunForEvent(event)) {
|
||||
// Fire and forget to allow parallel per-event processing in one process
|
||||
// but still guarded per event by runningEvents set
|
||||
// eslint-disable-next-line no-void
|
||||
void runForEvent(event);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('[scheduler] tick() failed:', err?.message || err);
|
||||
}
|
||||
}
|
||||
|
||||
function start() {
|
||||
if (timer) return; // Already started
|
||||
|
||||
const intervalMs = Math.max(5, DEFAULT_INTERVAL_SEC) * 1000;
|
||||
console.log(`[*] Scheduler enabled. Interval=${intervalMs / 1000}s, minPerEvent=${MIN_INTERVAL_SEC}s`);
|
||||
|
||||
// Initial delayed run to avoid cold-start spikes
|
||||
setTimeout(() => {
|
||||
// eslint-disable-next-line no-void
|
||||
void tick();
|
||||
}, 5_000);
|
||||
|
||||
timer = setInterval(() => {
|
||||
// eslint-disable-next-line no-void
|
||||
void tick();
|
||||
}, intervalMs);
|
||||
}
|
||||
|
||||
function stop() {
|
||||
if (timer) {
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
}
|
||||
runningEvents.clear();
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
start,
|
||||
stop,
|
||||
};
|
||||
Reference in New Issue
Block a user