minio-worker/index.ts

75 lines
2.0 KiB
TypeScript

import amqp from "amqplib";
import { processImage } from "./process-image";
import dotenv from "dotenv";
import { sleepWithCountdown } from "./util";
dotenv.config();
const queueName = process.env.QUEUE_NAME || "storage.file";
const rabbitUrl = process.env.RABBITMQ_URL || "amqp://localhost";
async function start() {
const conn = await amqp.connect(rabbitUrl);
const channel = await conn.createChannel();
await channel.prefetch(1);
await channel.assertQueue(queueName);
console.log(`🎧 Listening for messages on "${queueName}"...`);
channel.consume(queueName, async (msg) => {
if (!msg) return;
let bucket: string | undefined;
let key: string | undefined;
let eventName: string | undefined;
try {
const data = JSON.parse(msg.content.toString());
eventName = data.Records?.[0]?.eventName;
bucket = data.Records?.[0]?.s3?.bucket?.name;
key = (data.Key as string).replace(bucket ?? "", "");
if (!eventName || !bucket || !key) {
throw new Error("Missing required event fields.");
}
if (eventName !== "s3:ObjectCreated:Put") {
console.log(`❌ Skipped. Event is not s3:ObjectCreated:Put: ${key}`);
channel.ack(msg);
return;
}
const processed = await processImage(bucket, key);
if (processed) {
console.log(`✅ Image processed: ${key}`);
} else {
console.log(`⏭️ Processing skipped: ${key}`);
}
channel.ack(msg);
} catch (err) {
console.error("❌ Error processing message:", err);
// Retry once by requeuing
await sleepWithCountdown(30);
channel.nack(msg, false, true);
}
});
// Handle graceful shutdown
const shutdown = async () => {
console.log("👋 Gracefully shutting down...");
await channel.close();
await conn.close();
process.exit(0);
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
}
start().catch((err) => {
console.error("🚨 Worker failed to start:", err);
process.exit(1);
});