minio-worker/index.ts
2025-04-26 19:44:51 +03:00

98 lines
2.8 KiB
TypeScript

import amqp from "amqplib";
import { processImage } from "./process-image";
import dotenv from "dotenv";
import { sleepWithCountdown } from "./util";
// Populate process.env via dotenv before any setting is read below.
dotenv.config();

// Worker settings; each one falls back to a local-development default.
// `||` (rather than `??`) is deliberate: an empty-string value also falls back.
const { QUEUE_NAME, RABBITMQ_URL } = process.env;
const queueName = QUEUE_NAME || "storage.file";
const rabbitUrl = RABBITMQ_URL || "amqp://localhost";
/** The fields of a bucket notification that this worker actually uses. */
interface BucketEvent {
  eventName: string;
  bucket: string;
  key: string;
}

/**
 * Loose view of the incoming JSON payload. Every level is optional because the
 * payload comes from outside the process and is validated at runtime below.
 */
type RawNotification = {
  Records?: Array<{
    eventName?: unknown;
    s3?: { bucket?: { name?: unknown }; object?: { key?: unknown } };
  }>;
} | null;

/**
 * Parses a raw message body and extracts the first record's event name,
 * bucket and (decoded) object key.
 *
 * @param raw - Message body as a JSON string.
 * @returns The validated event fields, with `key` URL-decoded.
 * @throws SyntaxError if `raw` is not valid JSON.
 * @throws URIError if the key contains a malformed percent-escape.
 * @throws Error if any required field is missing, empty or not a string.
 */
function parseBucketEvent(raw: string): BucketEvent {
  const payload = JSON.parse(raw) as RawNotification;
  // Only the first record is inspected; any further records are ignored.
  const record = payload?.Records?.[0];
  const eventName = record?.eventName;
  const bucket = record?.s3?.bucket?.name;
  const encodedKey = record?.s3?.object?.key;

  if (
    typeof eventName !== "string" ||
    typeof bucket !== "string" ||
    typeof encodedKey !== "string" ||
    !eventName ||
    !bucket ||
    !encodedKey
  ) {
    throw new Error("Missing required event fields.");
  }

  // Keys arrive URL-encoded with "+" standing in for spaces.
  const key = decodeURIComponent(encodedKey.replace(/\+/g, " "));
  return { eventName, bucket, key };
}

/**
 * Connects to RabbitMQ, declares the work queue and consumes bucket
 * notifications one at a time, handing each created object to `processImage`.
 *
 * Failure policy:
 * - Malformed messages (bad JSON, missing fields, undecodable key) can never
 *   succeed, so they are rejected without requeue straight away.
 * - Processing failures are requeued once after a pause; a message that fails
 *   again after redelivery is rejected without requeue. The queue is declared
 *   with a dead-letter exchange, so rejected messages are routed there.
 *
 * @returns A promise that resolves once the consumer is registered and the
 *   signal handlers are installed. It rejects if connecting, declaring the
 *   queue or registering the consumer fails.
 */
async function start(): Promise<void> {
  const conn = await amqp.connect(rabbitUrl);
  const channel = await conn.createChannel();

  // At most one unacknowledged message at a time for this consumer.
  await channel.prefetch(1);
  await channel.assertQueue(queueName, {
    durable: true,
    arguments: {
      "x-dead-letter-exchange": "optimize.images.dlx",
      "x-dead-letter-routing-key": "file.uploaded.failed",
      "x-delivery-limit": 3,
      "x-queue-type": "quorum",
    },
  });

  console.log(`🎧 Listening for messages on "${queueName}"...`);

  // Awaited so that a failure to register the consumer rejects start()
  // instead of surfacing as an unhandled promise rejection.
  await channel.consume(
    queueName,
    async (msg) => {
      if (!msg) return;

      let event: BucketEvent;
      try {
        event = parseBucketEvent(msg.content.toString());
      } catch (err) {
        // Deterministic failure: retrying would only stall the worker.
        console.error(`❌ Failed processing message:`, err);
        console.warn(`⚠️ Message is malformed, rejecting without retry.`);
        channel.nack(msg, false, false);
        return;
      }

      try {
        // NOTE(review): only plain "Put" uploads are handled; other
        // ObjectCreated variants (e.g. multipart completion) are skipped as
        // well — confirm this is intended.
        if (event.eventName !== "s3:ObjectCreated:Put") {
          console.log(`⏭️ Skipping non-create event: ${event.eventName}`);
          channel.ack(msg);
          return;
        }

        const processed = await processImage(event.bucket, event.key);
        if (processed) {
          console.log(`✅ Successfully processed: ${event.key}`);
        } else {
          console.log(`♻️ Skipped or already processed: ${event.key}`);
        }
        channel.ack(msg);
      } catch (err) {
        console.error(`❌ Failed processing message:`, err);

        // Pause before settling the message so a failing dependency is not
        // hit in a tight redelivery loop (presumably 30 seconds — see
        // sleepWithCountdown).
        await sleepWithCountdown(30);

        // Retry exactly once: a message that already came back redelivered
        // is rejected without requeue instead of being retried again.
        if (msg.fields.redelivered) {
          console.warn(`⚠️ Message redelivered already, rejecting:`);
          channel.nack(msg, false, false);
        } else {
          console.log(`🔁 Message will be retried once.`);
          channel.nack(msg, false, true);
        }
      }
    },
    { noAck: false }
  );

  // Graceful shutdown: close the channel, then the connection, then exit.
  let shuttingDown = false;
  const shutdown = async () => {
    // A second signal while closing must not close everything twice.
    if (shuttingDown) return;
    shuttingDown = true;

    console.log("👋 Gracefully shutting down...");
    try {
      await channel.close();
      await conn.close();
    } catch (err) {
      // Still terminate if the broker connection is already broken.
      console.error("⚠️ Error while shutting down:", err);
      process.exit(1);
    }
    process.exit(0);
  };
  process.on("SIGINT", shutdown);
  process.on("SIGTERM", shutdown);
}
// Entry point: launch the worker; if start() rejects, log the reason and
// exit with a non-zero status.
const onStartupFailure = (reason: unknown): never => {
  console.error("🚨 Worker failed to start:", reason);
  process.exit(1);
};

start().catch(onStartupFailure);