diff --git a/.env b/.env index 8d65d4e..e9180f4 100644 --- a/.env +++ b/.env @@ -8,7 +8,14 @@ REDIS_URL=redis://redis:6379 MEILISEARCH_URL=http://meilisearch:7700 MEILISEARCH_API_KEY=your_secure_master_key_here -# OpenAI +# AI Provider Configuration +# Option 1: Ollama (Local LLM - DEFAULT, no API costs!) +USE_OLLAMA=true +OLLAMA_URL=http://ollama:11434 +OLLAMA_MODEL=gpt-oss:latest + +# Option 2: OpenAI (Cloud - optional fallback) +# Set these if you want OpenAI as fallback when Ollama is unavailable OPENAI_API_KEY=sk-your-openai-api-key-here OPENAI_MODEL=gpt-4o-mini diff --git a/.env.example b/.env.example index 87fc8a3..32c9d4d 100644 --- a/.env.example +++ b/.env.example @@ -8,9 +8,16 @@ REDIS_URL=redis://redis:6379 MEILISEARCH_URL=http://meilisearch:7700 MEILISEARCH_API_KEY=your_secure_master_key_here -# OpenAI -OPENAI_API_KEY=sk-your-openai-api-key-here -OPENAI_MODEL=gpt-4o +# AI Provider Configuration +# Option 1: Ollama (Local LLM - DEFAULT, no API costs!) +USE_OLLAMA=true +OLLAMA_URL=http://ollama:11434 +OLLAMA_MODEL=gpt-oss:latest + +# Option 2: OpenAI (Cloud - requires API key, falls back if Ollama unavailable) +# Uncomment and set these if you want to use OpenAI as fallback +# OPENAI_API_KEY=sk-your-openai-api-key-here +# OPENAI_MODEL=gpt-4o # Admin Credentials (change these!) 
ADMIN_USERNAME=admin diff --git a/AGENTS.md b/AGENTS.md index e64dd1e..d02dd4e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -18,7 +18,7 @@ This file provides essential context and guidelines for AI agents working on thi - **Search**: Meilisearch v1.6 - **Cache**: Redis 7 - **Templating**: EJS -- **AI**: OpenAI API (GPT-4o/GPT-4-turbo) +- **AI**: Ollama (local LLM - gpt-oss:latest) with OpenAI fallback - **Containerization**: Docker + Docker Compose - **Hosting**: Self-hosted on Linode diff --git a/docker-compose.yml b/docker-compose.yml index 0fd49b3..48c4438 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,9 @@ services: - MEILISEARCH_API_KEY=${MEILISEARCH_API_KEY:-master_key} - OPENAI_API_KEY=${OPENAI_API_KEY} - OPENAI_MODEL=${OPENAI_MODEL:-gpt-4o} + - OLLAMA_URL=http://ollama:11434 + - OLLAMA_MODEL=${OLLAMA_MODEL:-gpt-oss:latest} + - USE_OLLAMA=${USE_OLLAMA:-true} - ADMIN_USERNAME=${ADMIN_USERNAME:-admin} - ADMIN_PASSWORD=${ADMIN_PASSWORD:-changeme} - SESSION_SECRET=${SESSION_SECRET:-changeme} @@ -25,6 +28,8 @@ services: condition: service_started meilisearch: condition: service_started + ollama: + condition: service_started volumes: - ./src:/app/src - ./migrations:/app/migrations @@ -33,6 +38,35 @@ services: networks: - privacy-analyzer-network + ollama: + image: ollama/ollama:latest + container_name: privacy-analyzer-ollama + ports: + - "11434:11434" + volumes: + - ollama_data:/root/.ollama + environment: + - OLLAMA_ORIGINS=* + - OLLAMA_HOST=0.0.0.0 + entrypoint: > + sh -c " + ollama serve & + sleep 5 + echo 'Pulling ${OLLAMA_MODEL:-gpt-oss:latest} model...' + ollama pull ${OLLAMA_MODEL:-gpt-oss:latest} + echo 'Model ready!' 
+ wait + " + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:11434/api/tags"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 60s + restart: unless-stopped + networks: + - privacy-analyzer-network + postgres: image: postgres:15-alpine container_name: privacy-analyzer-postgres @@ -86,6 +120,8 @@ volumes: driver: local meilisearch_data: driver: local + ollama_data: + driver: local networks: privacy-analyzer-network: diff --git a/src/app.js b/src/app.js index 10d17c1..140dd4e 100644 --- a/src/app.js +++ b/src/app.js @@ -6,6 +6,8 @@ import { PolicyFetcher } from './services/policyFetcher.js'; import { AIAnalyzer } from './services/aiAnalyzer.js'; import { Scheduler } from './services/scheduler.js'; import { SearchIndexer } from './services/searchIndexer.js'; +import { AnalysisQueue } from './services/analysisQueue.js'; +import { AnalysisWorker } from './services/analysisWorker.js'; import ejs from 'ejs'; import { readFile } from 'fs/promises'; import { join, dirname } from 'path'; @@ -258,6 +260,10 @@ async function handleRequest(req) { !s.last_analyzed || new Date(s.last_analyzed) < thirtyDaysAgo ).length; + // Get flash message from query params + const message = url.searchParams.get('message'); + const jobId = url.searchParams.get('jobId'); + const html = await renderTemplate('admin/dashboard', { title: 'Admin Dashboard', description: 'Manage services and privacy analyses', @@ -267,7 +273,9 @@ async function handleRequest(req) { totalServices: services.length, totalAnalyses, pendingUpdates - } + }, + message, + jobId }); return new Response(html, { @@ -478,49 +486,83 @@ async function handleRequest(req) { return new Response('Service not found', { status: 404 }); } - console.log(`Starting analysis for service: ${service.name}`); + console.log(`Queueing analysis for service: ${service.name}`); - // Fetch policy - const policyData = await PolicyFetcher.fetchPolicy(service.policy_url); + // Add to queue instead of processing synchronously + 
const jobId = await AnalysisQueue.addJob(id, service.name); - // Check if content has changed - const contentHash = PolicyVersion.generateContentHash(policyData.content); - const existingVersion = await PolicyVersion.findByContentHash(id, contentHash); + // Redirect to dashboard with success message + return new Response(null, { + status: 302, + headers: { + Location: `/admin/dashboard?message=Analysis queued for ${encodeURIComponent(service.name)}&jobId=${jobId}` + } + }); + } catch (error) { + console.error('Queue error:', error); + return new Response(`Failed to queue analysis: ${error.message}`, { status: 500 }); + } + } + + // Get analysis job status - GET /api/analysis/status/:jobId + if (method === 'GET' && pathname.match(/^\/api\/analysis\/status\/[^\/]+$/)) { + try { + const match = pathname.match(/^\/api\/analysis\/status\/([^\/]+)$/); + const jobId = match[1]; - let policyVersion; - if (existingVersion) { - console.log('Policy content unchanged, using existing version'); - policyVersion = existingVersion; - } else { - console.log('New policy content detected, creating new version'); - policyVersion = await PolicyVersion.create({ - service_id: id, - content: policyData.content, - fetched_at: policyData.fetchedAt + const jobStatus = await AnalysisQueue.getJobStatus(jobId); + + if (!jobStatus) { + return new Response(JSON.stringify({ error: 'Job not found' }), { + status: 404, + headers: { 'Content-Type': 'application/json' } }); } - // Analyze with AI - const analysisResult = await AIAnalyzer.analyzePolicy(policyData.content); - - // Save analysis - const analysis = await Analysis.create({ - service_id: id, - policy_version_id: policyVersion.id, - overall_score: analysisResult.overall_score, - findings: analysisResult.findings, - raw_analysis: analysisResult.raw_response - }); - - console.log(`Analysis complete for ${service.name}: Grade ${analysis.overall_score}`); - - return new Response(null, { - status: 302, - headers: { Location: '/admin/dashboard' } 
+ return new Response(JSON.stringify(jobStatus), { + headers: { 'Content-Type': 'application/json' } }); } catch (error) { - console.error('Analysis error:', error); - return new Response(`Analysis failed: ${error.message}`, { status: 500 }); + console.error('Status check error:', error); + return new Response(JSON.stringify({ error: error.message }), { + status: 500, + headers: { 'Content-Type': 'application/json' } + }); + } + } + + // Get queue status - GET /api/analysis/queue + if (method === 'GET' && pathname === '/api/analysis/queue') { + const sessionToken = req.cookies?.session_token; + + if (!sessionToken || !(await AdminSession.findByToken(sessionToken))) { + return new Response(JSON.stringify({ error: 'Unauthorized' }), { + status: 401, + headers: { 'Content-Type': 'application/json' } + }); + } + + try { + const [queueLength, pendingJobs, processingJobs] = await Promise.all([ + AnalysisQueue.getQueueLength(), + AnalysisQueue.getPendingJobs(), + AnalysisQueue.getProcessingJobs() + ]); + + return new Response(JSON.stringify({ + queueLength, + pending: pendingJobs, + processing: processingJobs, + workerStatus: AnalysisWorker.getStatus() + }), { + headers: { 'Content-Type': 'application/json' } + }); + } catch (error) { + console.error('Queue status error:', error); + return new Response(JSON.stringify({ error: error.message }), { + status: 500, + headers: { 'Content-Type': 'application/json' } + }); } } @@ -585,7 +627,7 @@ const server = Bun.serve({ console.log(`Server running at http://localhost:${server.port}`); -// Initialize scheduler and search indexer (non-blocking) +// Initialize scheduler, search indexer, and analysis worker (non-blocking) (async () => { try { // Initialize scheduler @@ -594,6 +636,10 @@ console.log(`Server running at http://localhost:${server.port}`); // Initialize search indexer await SearchIndexer.init(); console.log('Search indexer initialized'); + + // Start analysis worker + AnalysisWorker.start(); + console.log('Analysis 
worker started'); } catch (error) { console.error('Failed to initialize services:', error); } diff --git a/src/scripts/test-ollama.js b/src/scripts/test-ollama.js new file mode 100644 index 0000000..83a31ab --- /dev/null +++ b/src/scripts/test-ollama.js @@ -0,0 +1,66 @@ +import { AIAnalyzer } from '../services/aiAnalyzer.js'; + +async function testOllama() { + console.log('Testing Ollama integration...'); + console.log('Ollama URL:', AIAnalyzer.OLLAMA_URL); + console.log('Ollama Model:', AIAnalyzer.OLLAMA_MODEL); + + // Check if Ollama is available + const isAvailable = await AIAnalyzer.isOllamaAvailable(); + console.log('Ollama available:', isAvailable); + + if (!isAvailable) { + console.log('Ollama is not available. Make sure the ollama container is running.'); + console.log('Run: docker-compose up -d ollama'); + process.exit(1); + } + + const testPolicy = `Privacy Policy + +Information We Collect: +- Name and email address when you create an account +- Usage data including pages visited and features used +- Device information such as browser type and IP address + +How We Use Your Information: +- To provide and improve our services +- To communicate with you about updates and features +- For analytics and service optimization + +Data Sharing: +We share data with Google Analytics for usage analytics. + +Your Rights: +- You can access and update your personal information +- You can request deletion of your account and data +- You can opt out of marketing communications + +Cookies: +We use cookies for session management and analytics. 
+ +Data Retention: +We retain your data for as long as your account is active.`; + + try { + console.log('\nSending test policy to Ollama...'); + const result = await AIAnalyzer.analyzeWithOllama(testPolicy); + + console.log('\n✓ Ollama analysis successful!'); + console.log('\nAnalysis Results:'); + console.log('Overall Score:', result.overall_score); + console.log('Summary:', result.summary); + console.log('\nScore Breakdown:'); + console.log(JSON.stringify(result.score_breakdown, null, 2)); + console.log('\nFindings:'); + console.log('Positive:', result.findings.positive.length); + console.log('Negative:', result.findings.negative.length); + console.log('Neutral:', result.findings.neutral.length); + + } catch (error) { + console.error('✗ Test failed:', error.message); + console.error('Stack:', error.stack); + process.exit(1); + } +} + +testOllama(); diff --git a/src/services/aiAnalyzer.js b/src/services/aiAnalyzer.js index 5f0a336..2cb1361 100644 --- a/src/services/aiAnalyzer.js +++ b/src/services/aiAnalyzer.js @@ -1,5 +1,6 @@ /** - * Service to analyze privacy policies using OpenAI + * Service to analyze privacy policies using Ollama (local LLM) + * Falls back to OpenAI if Ollama is not available */ import openai from '../config/openai.js'; @@ -7,6 +8,9 @@ import openai from '../config/openai.js'; export class AIAnalyzer { static MAX_RETRIES = 3; static RETRY_DELAY_MS = 2000; + static OLLAMA_URL = process.env.OLLAMA_URL || 'http://ollama:11434'; + static OLLAMA_MODEL = process.env.OLLAMA_MODEL || 'gpt-oss:latest'; + static USE_OLLAMA = process.env.USE_OLLAMA !== 'false'; // Default to Ollama; set USE_OLLAMA=false to disable /** * System prompt for the AI @@ -78,7 +82,128 @@ Important: - Mark critical issues as "blocker", significant issues as "bad" - Include at least 2-3 positive aspects if they exist - Include all significant negative aspects -- Be objective and factual based on the policy text provided +- Be objective and factual based on the policy text provided +- Respond ONLY 
with valid JSON, no markdown formatting, no code blocks`; + + /** + * Check if Ollama is available + * @returns {Promise} + */ + static async isOllamaAvailable() { + try { + const response = await fetch(`${this.OLLAMA_URL}/api/tags`, { + method: 'GET', + signal: AbortSignal.timeout(5000) + }); + return response.ok; + } catch (error) { + console.log('Ollama not available:', error.message); + return false; + } + } + + /** + * Analyze using Ollama + * @param {string} policyText - Policy text + * @returns {Promise} + */ + static async analyzeWithOllama(policyText) { + console.log(`Analyzing with Ollama (${this.OLLAMA_MODEL})...`); + + const response = await fetch(`${this.OLLAMA_URL}/api/generate`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + model: this.OLLAMA_MODEL, + prompt: `${this.SYSTEM_PROMPT}\n\nAnalyze this privacy policy and respond with ONLY valid JSON:\n\n${policyText}`, + stream: false, + options: { + temperature: 0.3, + num_predict: 4000 + } + }) + }); + + if (!response.ok) { + throw new Error(`Ollama error: ${response.status} ${response.statusText}`); + } + + const result = await response.json(); + + // Parse the response - Ollama returns the full response in 'response' field + let content = result.response; + + // Try to extract JSON from the response (in case there's extra text) + const jsonMatch = content.match(/\{[\s\S]*\}/); + if (jsonMatch) { + content = jsonMatch[0]; + } + + let analysis; + try { + analysis = JSON.parse(content); + } catch (parseError) { + console.error('Failed to parse Ollama response:', content); + throw new Error('Invalid JSON response from Ollama'); + } + + this.validateAnalysis(analysis); + + return { + ...analysis, + raw_response: result.response, + model: this.OLLAMA_MODEL, + provider: 'ollama' + }; + } + + /** + * Analyze using OpenAI + * @param {string} policyText - Policy text + * @returns {Promise} + */ + static async analyzeWithOpenAI(policyText) { + console.log('Analyzing with 
OpenAI...'); + + const response = await openai.chat.completions.create({ + model: process.env.OPENAI_MODEL || 'gpt-4o', + messages: [ + { + role: 'system', + content: this.SYSTEM_PROMPT + }, + { + role: 'user', + content: `Analyze this privacy policy and provide a structured assessment:\n\n${policyText}` + } + ], + response_format: { type: 'json_object' }, + temperature: 0.3, + max_tokens: 4000 + }); + + const content = response.choices[0].message.content; + + let analysis; + try { + analysis = JSON.parse(content); + } catch (parseError) { + console.error('Failed to parse OpenAI response:', parseError); + throw new Error('Invalid JSON response from OpenAI'); + } + + this.validateAnalysis(analysis); + + return { + ...analysis, + raw_response: content, + model: response.model, + usage: response.usage, + provider: 'openai' + }; + } /** * Analyze a privacy policy @@ -86,13 +211,12 @@ Important: * @returns {Promise} - Analysis results */ static async analyzePolicy(policyText) { - // Token-efficient truncation - reduce to ~8000 chars (~2000 tokens) for cost savings + // Token-efficient truncation const maxLength = 8000; let truncatedText = policyText; if (policyText.length > maxLength) { console.warn(`Policy text too long (${policyText.length} chars), truncating to ${maxLength} to save tokens`); - // Keep the beginning (usually contains important info) and end (usually has rights/legal) const beginning = policyText.substring(0, Math.floor(maxLength * 0.7)); const end = policyText.substring(policyText.length - Math.floor(maxLength * 0.25)); truncatedText = beginning + '\n\n[... 
Content truncated to save tokens ...]\n\n' + end; @@ -100,59 +224,36 @@ Important: let lastError; + // Try Ollama first if enabled + if (this.USE_OLLAMA) { + try { + const isAvailable = await this.isOllamaAvailable(); + if (isAvailable) { + console.log('Ollama is available, using local model'); + return await this.analyzeWithOllama(truncatedText); + } + } catch (error) { + console.log('Ollama analysis failed, falling back to OpenAI:', error.message); + } + } + + // Fallback to OpenAI for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) { try { - console.log(`Analyzing policy with OpenAI (attempt ${attempt}/${this.MAX_RETRIES})`); + console.log(`Analyzing with OpenAI (attempt ${attempt}/${this.MAX_RETRIES})`); - const response = await openai.chat.completions.create({ - model: process.env.OPENAI_MODEL || 'gpt-4o', - messages: [ - { - role: 'system', - content: this.SYSTEM_PROMPT - }, - { - role: 'user', - content: `Analyze this privacy policy and provide a structured assessment:\n\n${truncatedText}` - } - ], - response_format: { type: 'json_object' }, - temperature: 0.3, // Lower temperature for more consistent results - max_tokens: 4000 - }); - - const content = response.choices[0].message.content; + const result = await this.analyzeWithOpenAI(truncatedText); - // Parse the JSON response - let analysis; - try { - analysis = JSON.parse(content); - } catch (parseError) { - console.error('Failed to parse AI response as JSON:', parseError); - throw new Error('Invalid JSON response from AI'); - } - - // Validate response structure - this.validateAnalysis(analysis); - - console.log(`Analysis complete. Overall score: ${analysis.overall_score}`); - - return { - ...analysis, - raw_response: content, - model: response.model, - usage: response.usage - }; + console.log(`Analysis complete. 
Overall score: ${result.overall_score}`); + return result; } catch (error) { lastError = error; console.error(`Analysis attempt ${attempt} failed:`, error.message); - // Check for specific error types if (error.status === 429 || error.message?.includes('429')) { const errorMsg = 'OpenAI API rate limit exceeded. Please check your plan and billing details at https://platform.openai.com/account/billing'; console.error(errorMsg); - // Don't retry on rate limit - it won't help throw new Error(errorMsg); } @@ -182,19 +283,16 @@ Important: const validGrades = ['A', 'B', 'C', 'D', 'E']; const requiredFields = ['overall_score', 'score_breakdown', 'findings', 'summary']; - // Check required fields for (const field of requiredFields) { if (!(field in analysis)) { throw new Error(`Missing required field: ${field}`); } } - // Validate overall_score if (!validGrades.includes(analysis.overall_score)) { throw new Error(`Invalid overall_score: ${analysis.overall_score}`); } - // Validate score_breakdown const breakdownFields = ['data_collection', 'data_sharing', 'user_rights', 'data_retention', 'tracking_security']; for (const field of breakdownFields) { if (!(field in analysis.score_breakdown)) { @@ -205,7 +303,6 @@ Important: } } - // Validate findings if (!Array.isArray(analysis.findings.positive)) { analysis.findings.positive = []; } @@ -216,7 +313,6 @@ Important: analysis.findings.neutral = []; } - // Ensure arrays if (!Array.isArray(analysis.data_types_collected)) { analysis.data_types_collected = []; } @@ -225,24 +321,6 @@ Important: } } - /** - * Calculate overall score from breakdown - * @param {Object} breakdown - Score breakdown - * @returns {string} - Overall grade - */ - static calculateOverallScore(breakdown) { - const gradeValues = { A: 5, B: 4, C: 3, D: 2, E: 1 }; - const grades = Object.values(breakdown); - const total = grades.reduce((sum, grade) => sum + (gradeValues[grade] || 0), 0); - const average = total / grades.length; - - if (average >= 4.5) return 'A'; - 
if (average >= 3.5) return 'B'; - if (average >= 2.5) return 'C'; - if (average >= 1.5) return 'D'; - return 'E'; - } - /** * Sleep helper * @param {number} ms - Milliseconds to sleep diff --git a/src/services/analysisQueue.js b/src/services/analysisQueue.js new file mode 100644 index 0000000..1ac4be7 --- /dev/null +++ b/src/services/analysisQueue.js @@ -0,0 +1,178 @@ +/** + * Redis-based job queue for background processing + */ + +import redis from '../config/redis.js'; + +const QUEUE_KEY = 'analysis:queue'; +const PROCESSING_KEY = 'analysis:processing'; +const RESULTS_KEY_PREFIX = 'analysis:result:'; +const RESULTS_TTL = 3600; // 1 hour + +export class AnalysisQueue { + /** + * Add analysis job to queue + * @param {number} serviceId - Service ID + * @param {string} serviceName - Service name for display + * @returns {Promise} - Job ID + */ + static async addJob(serviceId, serviceName) { + const jobId = `job-${Date.now()}-${serviceId}`; + const job = { + id: jobId, + serviceId, + serviceName, + status: 'pending', + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString() + }; + + // Add to queue + await redis.lpush(QUEUE_KEY, JSON.stringify(job)); + + console.log(`Added analysis job ${jobId} for service ${serviceName}`); + return jobId; + } + + /** + * Get next job from queue + * @returns {Promise} - Job or null if queue empty + */ + static async getNextJob() { + // Pop from the end of the queue (FIFO) + const jobData = await redis.rpop(QUEUE_KEY); + + if (!jobData) { + return null; + } + + const job = JSON.parse(jobData); + job.status = 'processing'; + job.updatedAt = new Date().toISOString(); + + // Add to processing set + await redis.hset(PROCESSING_KEY, job.id, JSON.stringify(job)); + + return job; + } + + /** + * Mark job as completed + * @param {string} jobId - Job ID + * @param {Object} result - Analysis result + */ + static async completeJob(jobId, result) { + const jobData = await redis.hget(PROCESSING_KEY, jobId); + + if (!jobData) { + 
console.warn(`Job ${jobId} not found in processing set`); + return; + } + + const job = JSON.parse(jobData); + job.status = 'completed'; + job.result = result; + job.completedAt = new Date().toISOString(); + job.updatedAt = new Date().toISOString(); + + // Remove from processing + await redis.hdel(PROCESSING_KEY, jobId); + + // Store result with TTL + await redis.setex( + `${RESULTS_KEY_PREFIX}${jobId}`, + RESULTS_TTL, + JSON.stringify(job) + ); + + console.log(`Completed analysis job ${jobId}`); + } + + /** + * Mark job as failed + * @param {string} jobId - Job ID + * @param {string} error - Error message + */ + static async failJob(jobId, error) { + const jobData = await redis.hget(PROCESSING_KEY, jobId); + + if (!jobData) { + console.warn(`Job ${jobId} not found in processing set`); + return; + } + + const job = JSON.parse(jobData); + job.status = 'failed'; + job.error = error; + job.failedAt = new Date().toISOString(); + job.updatedAt = new Date().toISOString(); + + // Remove from processing + await redis.hdel(PROCESSING_KEY, jobId); + + // Store failed result with TTL + await redis.setex( + `${RESULTS_KEY_PREFIX}${jobId}`, + RESULTS_TTL, + JSON.stringify(job) + ); + + console.error(`Failed analysis job ${jobId}: ${error}`); + } + + /** + * Get job status + * @param {string} jobId - Job ID + * @returns {Promise} + */ + static async getJobStatus(jobId) { + // Check results first + const result = await redis.get(`${RESULTS_KEY_PREFIX}${jobId}`); + if (result) { + return JSON.parse(result); + } + + // Check processing + const processing = await redis.hget(PROCESSING_KEY, jobId); + if (processing) { + return JSON.parse(processing); + } + + return null; + } + + /** + * Get queue length + * @returns {Promise} + */ + static async getQueueLength() { + return await redis.llen(QUEUE_KEY); + } + + /** + * Get all pending jobs + * @returns {Promise} + */ + static async getPendingJobs() { + const jobs = await redis.lrange(QUEUE_KEY, 0, -1); + return jobs.map(j => 
JSON.parse(j)); + } + + /** + * Get all processing jobs + * @returns {Promise} + */ + static async getProcessingJobs() { + const jobs = await redis.hgetall(PROCESSING_KEY); + return Object.values(jobs).map(j => JSON.parse(j)); + } + + /** + * Clear all queues (use with caution) + */ + static async clearAll() { + await redis.del(QUEUE_KEY); + await redis.del(PROCESSING_KEY); + console.log('Cleared all analysis queues'); + } +} diff --git a/src/services/analysisWorker.js b/src/services/analysisWorker.js new file mode 100644 index 0000000..3f17f87 --- /dev/null +++ b/src/services/analysisWorker.js @@ -0,0 +1,151 @@ +/** + * Background worker for processing analysis jobs + */ + +import { AnalysisQueue } from './analysisQueue.js'; +import { Service } from '../models/Service.js'; +import { PolicyVersion } from '../models/PolicyVersion.js'; +import { Analysis } from '../models/Analysis.js'; +import { PolicyFetcher } from './policyFetcher.js'; +import { AIAnalyzer } from './aiAnalyzer.js'; + +export class AnalysisWorker { + static isRunning = false; + static currentJob = null; + static pollInterval = 5000; // Check every 5 seconds + + /** + * Start the worker + */ + static start() { + if (this.isRunning) { + console.log('Analysis worker already running'); + return; + } + + this.isRunning = true; + console.log('Starting analysis worker...'); + + this.processLoop(); + } + + /** + * Stop the worker + */ + static stop() { + this.isRunning = false; + console.log('Analysis worker stopped'); + } + + /** + * Main processing loop + */ + static async processLoop() { + while (this.isRunning) { + try { + // Try to get a job + const job = await AnalysisQueue.getNextJob(); + + if (job) { + this.currentJob = job; + console.log(`Processing job ${job.id} for ${job.serviceName}`); + + try { + await this.processJob(job); + } catch (error) { + console.error(`Job ${job.id} failed:`, error); + await AnalysisQueue.failJob(job.id, error.message); + } + + this.currentJob = null; + } else { + // No 
jobs, wait before checking again + await this.sleep(this.pollInterval); + } + } catch (error) { + console.error('Worker error:', error); + await this.sleep(this.pollInterval); + } + } + } + + /** + * Process a single job + * @param {Object} job - Job data + */ + static async processJob(job) { + const { serviceId, id: jobId } = job; + + // Get service details + const service = await Service.findById(serviceId); + if (!service) { + throw new Error(`Service ${serviceId} not found`); + } + + console.log(`[${jobId}] Fetching policy for ${service.name}...`); + + // Fetch policy + const policyData = await PolicyFetcher.fetchPolicy(service.policy_url); + + // Check if content has changed + const contentHash = PolicyVersion.generateContentHash(policyData.content); + const existingVersion = await PolicyVersion.findByContentHash(serviceId, contentHash); + + let policyVersion; + if (existingVersion) { + console.log(`[${jobId}] Policy unchanged, using existing version`); + policyVersion = existingVersion; + } else { + console.log(`[${jobId}] New policy version detected`); + policyVersion = await PolicyVersion.create({ + service_id: serviceId, + content: policyData.content, + fetched_at: policyData.fetchedAt + }); + } + + console.log(`[${jobId}] Analyzing with AI...`); + + // Analyze with AI + const analysisResult = await AIAnalyzer.analyzePolicy(policyData.content); + + console.log(`[${jobId}] Saving analysis results...`); + + // Save analysis + const analysis = await Analysis.create({ + service_id: serviceId, + policy_version_id: policyVersion.id, + overall_score: analysisResult.overall_score, + findings: analysisResult.findings, + raw_analysis: analysisResult.raw_response + }); + + console.log(`[${jobId}] Analysis complete: Grade ${analysis.overall_score}`); + + // Mark job as complete + await AnalysisQueue.completeJob(jobId, { + analysisId: analysis.id, + overallScore: analysis.overall_score, + model: analysisResult.model || analysisResult.provider, + contentLength: 
policyData.content.length + }); + } + + /** + * Get worker status + */ + static getStatus() { + return { + isRunning: this.isRunning, + currentJob: this.currentJob, + queueLength: null // Would need async call + }; + } + + /** + * Sleep helper + */ + static sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); + } +} diff --git a/src/views/admin/dashboard.ejs b/src/views/admin/dashboard.ejs index 768d2e6..b300b0c 100644 --- a/src/views/admin/dashboard.ejs +++ b/src/views/admin/dashboard.ejs @@ -1,6 +1,20 @@

Admin Dashboard

+ <% if (typeof message !== 'undefined' && message) { %> +
+ <%= message %> +
+ <% } %> + + +

Total Services

@@ -159,4 +173,90 @@ padding: 3rem; color: #666; } + + .alert { + padding: 1rem; + margin-bottom: 1rem; + border-radius: 4px; + } + + .alert-success { + background: #d4edda; + color: #155724; + border: 1px solid #c3e6cb; + } + + .queue-status { + background: #fff; + padding: 1rem; + border-radius: 8px; + margin-bottom: 1rem; + box-shadow: 0 2px 4px rgba(0,0,0,0.1); + } + + .queue-info { + display: flex; + align-items: center; + gap: 0.5rem; + margin-bottom: 0.5rem; + } + + .queue-badge { + background: #3498db; + color: white; + padding: 0.25rem 0.75rem; + border-radius: 12px; + font-weight: bold; + } + + .processing-indicator { + color: #e67e22; + font-style: italic; + } + +