import amqp from "amqplib";
import { processImage } from "./process-image";
import dotenv from "dotenv";

dotenv.config();

const queueName = process.env.QUEUE_NAME || "storage.file";
const rabbitUrl = process.env.RABBITMQ_URL || "amqp://localhost";

/** Seconds to wait before requeuing a message whose processing failed. */
const RETRY_DELAY_SECONDS = 30;

/** Fields this worker needs from a storage notification message. */
interface StorageEvent {
  eventName: string;
  bucket: string;
  key: string;
}

/**
 * Loose shape of the JSON notification body, limited to the fields read here.
 * Everything is optional/unknown because the payload is external input.
 */
interface StorageEventPayload {
  Key?: unknown;
  Records?: Array<{
    eventName?: unknown;
    s3?: { bucket?: { name?: unknown } | null } | null;
  } | null>;
}

/**
 * Raised when a message body cannot be turned into a {@link StorageEvent}.
 * Such messages can never succeed, so they are rejected without a retry.
 */
class InvalidEventError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "InvalidEventError";
  }
}

/**
 * Parses and validates the raw message body.
 *
 * The object key is derived from the top-level `Key` field with the first
 * occurrence of the bucket name removed.
 *
 * @param raw - Message content decoded as a string.
 * @returns The event name, bucket name and object key.
 * @throws InvalidEventError if the body is not valid JSON or a required
 *   field is missing, empty, or not a string.
 */
function parseStorageEvent(raw: string): StorageEvent {
  let payload: StorageEventPayload | null;
  try {
    payload = JSON.parse(raw) as StorageEventPayload | null;
  } catch {
    throw new InvalidEventError("Message body is not valid JSON.");
  }

  const record = payload?.Records?.[0];
  const eventName = record?.eventName;
  const bucket = record?.s3?.bucket?.name;
  const rawKey = payload?.Key;

  if (
    typeof eventName !== "string" ||
    typeof bucket !== "string" ||
    typeof rawKey !== "string"
  ) {
    throw new InvalidEventError("Missing required event fields.");
  }

  const key = rawKey.replace(bucket, "");
  if (!eventName || !bucket || !key) {
    throw new InvalidEventError("Missing required event fields.");
  }

  return { eventName, bucket, key };
}

/**
 * Waits for the given number of seconds, reporting progress on stdout.
 *
 * On an interactive terminal a single line is rewritten once per second.
 * When stdout is not a TTY (piped output, most container runtimes) the
 * cursor-control methods do not exist, so a plain start/end line is logged
 * instead.
 *
 * @param seconds - How long to wait.
 * @returns A promise that resolves once the wait is over.
 */
function sleepWithCountdown(seconds: number): Promise<void> {
  const out = process.stdout;

  if (!out.isTTY) {
    return new Promise<void>((resolve) => {
      console.log(`⏳ Waiting ${seconds}s...`);
      setTimeout(() => {
        console.log("✅ Resuming now");
        resolve();
      }, Math.max(0, seconds) * 1000);
    });
  }

  return new Promise<void>((resolve) => {
    let remaining = seconds;

    const update = () => {
      const msg = `⏳ Waiting ${remaining}s...`;
      out.clearLine(0); // Clear current line
      out.cursorTo(0); // Move cursor to start of line
      out.write(msg); // Write message
    };

    update(); // Initial message

    const interval = setInterval(() => {
      remaining--;
      if (remaining > 0) {
        update();
      } else {
        clearInterval(interval);
        out.clearLine(0);
        out.cursorTo(0);
        out.write(`✅ Resuming now\n`);
        resolve();
      }
    }, 1000);
  });
}

/**
 * Connects to RabbitMQ, consumes storage notifications from `queueName` and
 * hands every `s3:ObjectCreated:Put` event to `processImage`.
 *
 * Acknowledgement policy:
 * - processed or deliberately skipped messages are acked;
 * - malformed messages are rejected immediately without requeue;
 * - any other failure is requeued once after a delay; a message that fails
 *   again after having been redelivered is rejected without requeue, so a
 *   permanently failing message cannot loop forever.
 *
 * Also installs SIGINT/SIGTERM handlers that close the channel and
 * connection before exiting.
 */
async function start(): Promise<void> {
  const conn = await amqp.connect(rabbitUrl);
  const channel = await conn.createChannel();
  await channel.assertQueue(queueName);

  console.log(`🎧 Listening for messages on "${queueName}"...`);

  // Awaited so that a failed subscription rejects start() and is reported
  // by the caller instead of becoming an unhandled rejection.
  await channel.consume(queueName, async (msg) => {
    if (!msg) return;

    try {
      const { eventName, bucket, key } = parseStorageEvent(
        msg.content.toString()
      );

      if (eventName !== "s3:ObjectCreated:Put") {
        console.log(`❌ Skipped. \nEvent is not s3:ObjectCreated:Put: ${key}`);
        channel.ack(msg);
        return;
      }

      const processed = await processImage(bucket, key);
      if (processed) {
        console.log(`✅ Image processed: ${key}`);
      } else {
        console.log(`⏭️ Processing skipped: ${key}`);
      }
      channel.ack(msg);
    } catch (err) {
      console.error("❌ Error processing message:", err);

      // A malformed payload will never parse; an already redelivered message
      // has had its retry. Reject both without requeue.
      if (err instanceof InvalidEventError || msg.fields.redelivered) {
        channel.nack(msg, false, false);
        return;
      }

      // Retry once by requeuing
      await sleepWithCountdown(RETRY_DELAY_SECONDS);
      channel.nack(msg, false, true);
    }
  });

  // Handle graceful shutdown
  let shuttingDown = false;
  const shutdown = async (): Promise<void> => {
    if (shuttingDown) return; // Ignore repeated signals while closing
    shuttingDown = true;

    console.log("👋 Gracefully shutting down...");
    let exitCode = 0;
    try {
      await channel.close();
      await conn.close();
    } catch (err) {
      console.error("❌ Error during shutdown:", err);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  process.on("SIGINT", () => void shutdown());
  process.on("SIGTERM", () => void shutdown());
}

start().catch((err) => {
  console.error("🚨 Worker failed to start:", err);
  process.exit(1);
});