import amqp from "amqplib";
import { processImage } from "./process-image";
import dotenv from "dotenv";
import { sleepWithCountdown } from "./util";

dotenv.config();

const queueName = process.env.QUEUE_NAME || "storage.file";
const rabbitUrl = process.env.RABBITMQ_URL || "amqp://localhost";

// Back-off handed to sleepWithCountdown before a failed message is nack'ed.
// NOTE(review): presumably seconds — confirm against ./util.
const RETRY_DELAY = 30;

/**
 * Handles a single delivery from the queue: parses the S3 event notification,
 * runs image processing for newly created objects, and settles the message
 * (ack on success/skip, nack on failure).
 *
 * Failure policy: the first failure is requeued for one more attempt; a
 * message that was already redelivered is rejected without requeue, so the
 * broker routes it according to the queue's dead-letter arguments.
 *
 * @param {object} channel - amqplib channel the message was delivered on.
 * @param {object|null} msg - Delivered message; a falsy value is ignored.
 * @returns {Promise<void>} Rejects only if settling the message (nack) or the
 *   back-off sleep itself fails.
 */
async function handleMessage(channel, msg) {
  if (!msg) return;

  try {
    const data = JSON.parse(msg.content.toString());
    const eventRecord = data.Records?.[0];
    const eventName = eventRecord?.eventName;
    const bucket = eventRecord?.s3?.bucket?.name;
    let key = eventRecord?.s3?.object?.key || "";

    if (!eventName || !bucket || !key) {
      throw new Error("Missing required event fields.");
    }

    // Properly decode S3 keys: "+" stands for a space, the rest is
    // percent-encoded.
    key = decodeURIComponent(key.replace(/\+/g, " "));

    // Only plain "Put" uploads are processed; every other event name
    // (including other ObjectCreated variants) is acknowledged and skipped.
    if (eventName !== "s3:ObjectCreated:Put") {
      console.log(`⏭️ Skipping non-create event: ${eventName}`);
      channel.ack(msg);
      return;
    }

    const processed = await processImage(bucket, key);
    if (processed) {
      console.log(`✅ Successfully processed: ${key}`);
    } else {
      console.log(`♻️ Skipped or already processed: ${key}`);
    }

    channel.ack(msg);
  } catch (err) {
    console.error(`❌ Failed processing message:`, err);

    // 💤 Sleep a bit before retrying (avoiding tight loop retries)
    await sleepWithCountdown(RETRY_DELAY);

    // ❗Important: Protect against dead-letter queue overflow
    // Retry only once, otherwise move to DLQ
    if (msg.fields.redelivered) {
      console.warn(`⚠️ Message redelivered already, rejecting:`);
      channel.nack(msg, false, false); // Reject and don't requeue
    } else {
      console.log(`🔁 Message will be retried once.`);
      channel.nack(msg, false, true); // Retry once
    }
  }
}

/**
 * Connects to RabbitMQ, declares the durable quorum queue (with dead-letter
 * routing and a delivery limit), and starts consuming one message at a time.
 * Also installs SIGINT/SIGTERM handlers that close the channel and connection
 * before exiting.
 *
 * @returns {Promise<void>} Resolves once the consumer is registered; rejects
 *   if connecting, declaring the queue, or subscribing fails.
 */
async function start() {
  const conn = await amqp.connect(rabbitUrl);
  const channel = await conn.createChannel();

  // Process strictly one unacknowledged message at a time.
  await channel.prefetch(1);

  await channel.assertQueue(queueName, {
    durable: true,
    arguments: {
      "x-dead-letter-exchange": "optimize.images.dlx",
      "x-dead-letter-routing-key": "file.uploaded.failed",
      "x-delivery-limit": 3,
      "x-queue-type": "quorum",
    },
  });

  let isShuttingDown = false;

  // Graceful shutdown
  const shutdown = async () => {
    // A second signal while closing must not call close() again.
    if (isShuttingDown) return;
    isShuttingDown = true;

    console.log("👋 Gracefully shutting down...");
    let exitCode = 0;
    try {
      await channel.close();
      await conn.close();
    } catch (err) {
      console.error("🚨 Error during shutdown:", err);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  // Registered before subscribing so no message can be in flight without
  // the handlers being in place.
  process.on("SIGINT", shutdown);
  process.on("SIGTERM", shutdown);

  console.log(`🎧 Listening for messages on "${queueName}"...`);

  // Awaited so that a failed subscription rejects start() and reaches the
  // bootstrap catch below instead of becoming an unhandled rejection.
  await channel.consume(
    queueName,
    (msg) => {
      // amqplib ignores the callback's return value, so the handler's
      // promise has to be terminated here.
      handleMessage(channel, msg).catch((err) => {
        // Settling fails as a matter of course once shutdown has started
        // closing the channel; the broker requeues the unacked message.
        if (isShuttingDown) return;
        console.error("🚨 Could not settle message, exiting:", err);
        process.exit(1);
      });
    },
    { noAck: false }
  );
}

start().catch((err) => {
  console.error("🚨 Worker failed to start:", err);
  process.exit(1);
});