From 537dd112ffb986e986351611419ce02d667a04ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Gierwia=C5=82o?= Date: Sun, 30 Nov 2025 13:14:02 +0100 Subject: [PATCH] feat(scheduler): in-process matching scheduler with audit + admin endpoints - Add in-process scheduler service triggered by ENABLE_SCHEDULER - Record runs in new matching_runs table; throttle per-event and log stats - Add admin endpoints: POST /api/admin/events/:slug/run-now and GET /api/admin/events/:slug/matching-runs - Wire scheduler start/stop in server and add ENV flags + compose defaults - Prisma schema: add MatchingRun model and relation - Update env examples for scheduler configuration --- backend/.env.development.example | 8 ++ backend/.env.production.example | 8 ++ backend/.repl_history | 22 ++--- backend/prisma/schema.prisma | 20 ++++ backend/src/app.js | 1 + backend/src/routes/admin.js | 114 ++++++++++++++++++++++ backend/src/server.js | 13 +++ backend/src/services/scheduler.js | 155 ++++++++++++++++++++++++++++++ docker-compose.yml | 8 ++ 9 files changed, 338 insertions(+), 11 deletions(-) create mode 100644 backend/src/routes/admin.js create mode 100644 backend/src/services/scheduler.js diff --git a/backend/.env.development.example b/backend/.env.development.example index b43093a..14ef9d1 100644 --- a/backend/.env.development.example +++ b/backend/.env.development.example @@ -50,3 +50,11 @@ LOCKOUT_DURATION_MINUTES=15 # Logging LOG_LEVEL=debug + +# Scheduler +# Enable simple in-process scheduler for auto-matching +ENABLE_SCHEDULER=false +# Global tick interval in seconds (default 300 = 5min) +SCHEDULER_INTERVAL_SEC=300 +# Per-event minimum time between runs in seconds (default 60s) +MATCHING_MIN_INTERVAL_SEC=60 diff --git a/backend/.env.production.example b/backend/.env.production.example index e2bd3b9..65b7bc9 100644 --- a/backend/.env.production.example +++ b/backend/.env.production.example @@ -50,3 +50,11 @@ LOCKOUT_DURATION_MINUTES=15 # Logging LOG_LEVEL=warn + +# Scheduler +# Enable simple in-process scheduler for auto-matching (enable on exactly one replica) +ENABLE_SCHEDULER=false +# Global tick interval in seconds (e.g., 300 = 5min) +SCHEDULER_INTERVAL_SEC=300 +# Per-event minimum time between runs in seconds to avoid thrashing +MATCHING_MIN_INTERVAL_SEC=120 diff --git a/backend/.repl_history b/backend/.repl_history index 5c021f3..bcaddaf 100644 --- a/backend/.repl_history +++ b/backend/.repl_history @@ -1,4 +1,14 @@ .cli users:list +.cli users:lit +.cli help +.cli +.cli users:verify --email test@radziel.com +users:verify --email test@radziel.com +.cli users:create --email test@radziel.com --username radziel --password QWEqwe123 --first Radek --last Gie +.cli users +.cli uers +.cli +.cli users:list .cli users:create --email test@radziel.com --username radziel --password QWEqwe123 --first Radek --last Gierwialo .cli users:verify --email test@radziel.com .cli users:create --email test@radziel.com --username radziel --password QWEqwe123 --first Radek --last Gierwialo @@ -17,14 +27,4 @@ clear .cli .cli events .events: -events: -events:lists -events:list -users -event:let -: -event -. -help -.exity -.cli events:import:worldsdc --dry-run --limit 20 \ No newline at end of file +events: \ No newline at end of file diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index c7f1376..4a198d5 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -102,6 +102,7 @@ model Event { checkinToken EventCheckinToken? userHeats EventUserHeat[] recordingSuggestions RecordingSuggestion[] + matchingRuns MatchingRun[] @@map("events") } @@ -298,3 +299,22 @@ model RecordingSuggestion { @@index([recorderId]) @@map("recording_suggestions") } + +// Matching runs audit log +model MatchingRun { + id Int @id @default(autoincrement()) + eventId Int @map("event_id") + trigger String @db.VarChar(20) // 'manual' | 'scheduler' + status String @default("running") @db.VarChar(20) // 'running' | 'success' | 'error' + startedAt DateTime @default(now()) @map("started_at") + endedAt DateTime? @map("ended_at") + matchedCount Int @default(0) @map("matched_count") + notFoundCount Int @default(0) @map("not_found_count") + error String? @db.Text + + // Relations + event Event @relation(fields: [eventId], references: [id], onDelete: Cascade) + + @@index([eventId, startedAt]) + @@map("matching_runs") +} diff --git a/backend/src/app.js b/backend/src/app.js index de85812..03a57d9 100644 --- a/backend/src/app.js +++ b/backend/src/app.js @@ -117,6 +117,7 @@ app.use('/api/wsdc', require('./routes/wsdc')); app.use('/api/divisions', require('./routes/divisions')); app.use('/api/competition-types', require('./routes/competitionTypes')); app.use('/api/matches', require('./routes/matches')); +app.use('/api/admin', require('./routes/admin')); // app.use('/api/ratings', require('./routes/ratings')); // 404 handler diff --git a/backend/src/routes/admin.js b/backend/src/routes/admin.js new file mode 100644 index 0000000..62f763a --- /dev/null +++ b/backend/src/routes/admin.js @@ -0,0 +1,114 @@ +const express = require('express'); +const { prisma } = require('../utils/db'); +const { authenticate } = require('../middleware/auth'); +const matchingService = require('../services/matching'); +const { SUGGESTION_STATUS } = require('../constants'); + +const router = express.Router(); + +// POST /api/admin/events/:slug/run-now - Trigger matching immediately for an event +router.post('/events/:slug/run-now', authenticate, async (req, res, next) => { + try { + const { slug } = req.params; + + const event = await prisma.event.findUnique({ + where: { slug }, + select: { id: true, slug: true }, + }); + + if (!event) { + return res.status(404).json({ success: false, error: 'Event not found' }); + } + + const startedAt = new Date(); + const runRow = await prisma.matchingRun.create({ + data: { + eventId: event.id, + trigger: 'manual', + status: 'running', + startedAt, + }, + }); + + try { + const suggestions = await matchingService.runMatching(event.id); + await matchingService.saveMatchingResults(event.id, suggestions); + + const notFoundCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.NOT_FOUND).length; + const matchedCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.PENDING).length; + + await prisma.matchingRun.update({ + where: { id: runRow.id }, + data: { + status: 'success', + endedAt: new Date(), + matchedCount, + notFoundCount, + }, + }); + + return res.json({ + success: true, + data: { + eventSlug: event.slug, + startedAt, + endedAt: new Date(), + matched: matchedCount, + notFound: notFoundCount, + }, + }); + } catch (err) { + await prisma.matchingRun.update({ + where: { id: runRow.id }, + data: { + status: 'error', + endedAt: new Date(), + error: String(err?.message || err), + }, + }); + return res.status(500).json({ success: false, error: 'Matching failed', details: String(err?.message || err) }); + } + } catch (error) { + next(error); + } +}); + +// GET /api/admin/events/:slug/matching-runs?limit=20 - List recent runs +router.get('/events/:slug/matching-runs', authenticate, async (req, res, next) => { + try { + const { slug } = req.params; + const limit = Math.min(parseInt(req.query.limit || '20', 10), 100); + + const event = await prisma.event.findUnique({ + where: { slug }, + select: { id: true }, + }); + + if (!event) { + return res.status(404).json({ success: false, error: 'Event not found' }); + } + + const runs = await prisma.matchingRun.findMany({ + where: { eventId: event.id }, + orderBy: { startedAt: 'desc' }, + take: limit, + select: { + id: true, + trigger: true, + status: true, + startedAt: true, + endedAt: true, + matchedCount: true, + notFoundCount: true, + error: true, + }, + }); + + res.json({ success: true, count: runs.length, data: runs }); + } catch (error) { + next(error); + } +}); + +module.exports = router; + diff --git a/backend/src/server.js b/backend/src/server.js index 34f6923..e4cc3f6 100644 --- a/backend/src/server.js +++ b/backend/src/server.js @@ -3,6 +3,7 @@ const http = require('http'); const app = require('./app'); const { testConnection, disconnect } = require('./utils/db'); const { initializeSocket } = require('./socket'); +const scheduler = require('./services/scheduler'); const PORT = process.env.PORT || 3000; @@ -24,6 +25,16 @@ async function startServer() { console.log(`Server running on port: ${PORT}`); console.log(`Health check: http://localhost:${PORT}/api/health`); console.log('================================='); + if (process.env.ENABLE_SCHEDULER === 'true') { + try { + scheduler.start(); + console.log('[*] Scheduler started'); + } catch (e) { + console.error('Failed to start scheduler:', e?.message || e); + } + } else { + console.log('[*] Scheduler disabled (ENABLE_SCHEDULER != "true")'); + } }); return server; @@ -37,12 +48,14 @@ startServer().catch((err) => { // Graceful shutdown process.on('SIGTERM', async () => { console.log('SIGTERM received, shutting down gracefully...'); + try { scheduler.stop(); } catch (_) {} await disconnect(); process.exit(0); }); process.on('SIGINT', async () => { console.log('SIGINT received, shutting down gracefully...'); + try { scheduler.stop(); } catch (_) {} await disconnect(); process.exit(0); }); diff --git a/backend/src/services/scheduler.js b/backend/src/services/scheduler.js new file mode 100644 index 0000000..8df4db7 --- /dev/null +++ b/backend/src/services/scheduler.js @@ -0,0 +1,155 @@ +const { prisma } = require('../utils/db'); +const matchingService = require('./matching'); +const { SUGGESTION_STATUS } = require('../constants'); + +// Simple in-process scheduler based on setInterval (no external deps) +// Designed for single-backend deployments. When scaling to multiple replicas, +// add a DB-based lock (e.g., pg advisory lock) to ensure single run per event. + +const DEFAULT_INTERVAL_SEC = parseInt(process.env.SCHEDULER_INTERVAL_SEC || '300', 10); // 5 min +const MIN_INTERVAL_SEC = parseInt(process.env.MATCHING_MIN_INTERVAL_SEC || '60', 10); // 1 min guard + +let timer = null; +let runningEvents = new Set(); // In-memory guard to avoid overlapping runs per event + +async function listCandidateEvents() { + // Fetch events that are ongoing or upcoming (basic heuristic) + // This avoids hammering historical events. + const now = new Date(); + + return prisma.event.findMany({ + where: { + // Include events that end today or in the future + OR: [ + { endDate: { gte: now } }, + { endDate: null }, + ], + }, + select: { + id: true, + slug: true, + matchingRunAt: true, + startDate: true, + endDate: true, + }, + orderBy: { startDate: 'asc' }, + }); +} + +function shouldRunForEvent(event) { + if (!event) return false; + // Rate limiting per event by last run timestamp + if (!event.matchingRunAt) return true; + + const last = new Date(event.matchingRunAt).getTime(); + const now = Date.now(); + return (now - last) / 1000 >= MIN_INTERVAL_SEC; +} + +async function runForEvent(event) { + if (runningEvents.has(event.id)) { + return; // Skip overlapping runs for the same event + } + + runningEvents.add(event.id); + const startedAt = new Date(); + let runRow = null; + try { + // Create run audit row + runRow = await prisma.matchingRun.create({ + data: { + eventId: event.id, + trigger: 'scheduler', + status: 'running', + startedAt, + }, + }); + + const suggestions = await matchingService.runMatching(event.id); + await matchingService.saveMatchingResults(event.id, suggestions); + + const notFoundCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.NOT_FOUND).length; + const matchedCount = suggestions.filter(s => s.status === SUGGESTION_STATUS.PENDING).length; + + console.log( + `[scheduler] ${event.slug}: matched=${matchedCount} notFound=${notFoundCount} ` + + `took=${((Date.now() - startedAt.getTime()) / 1000).toFixed(2)}s` + ); + + // Update run audit row + if (runRow) { + await prisma.matchingRun.update({ + where: { id: runRow.id }, + data: { + status: 'success', + endedAt: new Date(), + matchedCount, + notFoundCount, + }, + }); + } + } catch (err) { + console.error(`[scheduler] Error running matching for ${event.slug}:`, err?.message || err); + if (runRow) { + try { + await prisma.matchingRun.update({ + where: { id: runRow.id }, + data: { + status: 'error', + endedAt: new Date(), + error: String(err?.message || err), + }, + }); + } catch (_) {} + } + } finally { + runningEvents.delete(event.id); + } +} + +async function tick() { + try { + const events = await listCandidateEvents(); + for (const event of events) { + if (shouldRunForEvent(event)) { + // Fire and forget to allow parallel per-event processing in one process + // but still guarded per event by runningEvents set + // eslint-disable-next-line no-void + void runForEvent(event); + } + } + } catch (err) { + console.error('[scheduler] tick() failed:', err?.message || err); + } +} + +function start() { + if (timer) return; // Already started + + const intervalMs = Math.max(5, DEFAULT_INTERVAL_SEC) * 1000; + console.log(`[*] Scheduler enabled. Interval=${intervalMs / 1000}s, minPerEvent=${MIN_INTERVAL_SEC}s`); + + // Initial delayed run to avoid cold-start spikes + setTimeout(() => { + // eslint-disable-next-line no-void + void tick(); + }, 5_000); + + timer = setInterval(() => { + // eslint-disable-next-line no-void + void tick(); + }, intervalMs); +} + +function stop() { + if (timer) { + clearInterval(timer); + timer = null; + } + runningEvents.clear(); +} + +module.exports = { + start, + stop, +}; diff --git a/docker-compose.yml b/docker-compose.yml index 9ec4009..182d89b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -112,6 +112,10 @@ services: environment: - NODE_ENV=development - PORT=3000 + # Scheduler (enable for a single backend instance) + - ENABLE_SCHEDULER=true + - SCHEDULER_INTERVAL_SEC=300 + - MATCHING_MIN_INTERVAL_SEC=60 # Security: Relaxed for development - RATE_LIMIT_ENABLED=false - RATE_LIMIT_AUTH_MAX=100 @@ -141,6 +145,10 @@ services: environment: - NODE_ENV=production - PORT=3000 + # Scheduler (enable on exactly one replica) + - ENABLE_SCHEDULER=false + - SCHEDULER_INTERVAL_SEC=300 + - MATCHING_MIN_INTERVAL_SEC=120 # Security: Strict for production (override with .env file) - RATE_LIMIT_ENABLED=true - RATE_LIMIT_AUTH_MAX=5