74 lines
2.0 KiB
TypeScript
74 lines
2.0 KiB
TypeScript
import amqp from "amqplib";
|
|
import { processImage } from "./process-image";
|
|
import dotenv from "dotenv";
|
|
import { sleepWithCountdown } from "./util";
|
|
|
|
// Run dotenv's config step first so the environment lookups below see its result.
dotenv.config();

// Connection settings: an unset or empty variable falls back to the default.
const { QUEUE_NAME, RABBITMQ_URL } = process.env;
const queueName = QUEUE_NAME || "storage.file";
const rabbitUrl = RABBITMQ_URL || "amqp://localhost";
|
|
|
|
/** The fields of a storage notification that the worker needs. */
interface StorageEvent {
  eventName: string;
  bucket: string;
  key: string;
}

/**
 * Extracts the event name, bucket and object key from a raw notification body.
 *
 * The key is the top-level `Key` field with the first occurrence of the bucket
 * name removed; any separator that followed the bucket name is left in place.
 *
 * @param raw - Message body as a JSON string.
 * @returns The extracted fields, all of them non-empty strings.
 * @throws SyntaxError when `raw` is not valid JSON.
 * @throws Error when the event name, bucket or key is missing or not a string.
 */
function parseStorageEvent(raw: string): StorageEvent {
  // The payload is untrusted: describe only the fields we read and keep them
  // `unknown` until they have been checked below.
  const data = JSON.parse(raw) as {
    Key?: unknown;
    Records?: Array<{
      eventName?: unknown;
      s3?: { bucket?: { name?: unknown } };
    } | null>;
  } | null;

  const record = data?.Records?.[0];
  const eventName = record?.eventName;
  const bucket = record?.s3?.bucket?.name;
  const rawKey = data?.Key;

  if (
    typeof eventName !== "string" ||
    typeof bucket !== "string" ||
    typeof rawKey !== "string"
  ) {
    throw new Error("Missing required event fields.");
  }

  const key = rawKey.replace(bucket, "");
  if (!eventName || !bucket || !key) {
    throw new Error("Missing required event fields.");
  }

  return { eventName, bucket, key };
}

/**
 * Connects to RabbitMQ, asserts the queue and consumes storage notifications
 * from it, passing each `s3:ObjectCreated:Put` object to `processImage`.
 *
 * Message settlement:
 * - events other than `s3:ObjectCreated:Put` are acknowledged and skipped;
 * - messages for which `processImage` returns (truthy or falsy) are acknowledged;
 * - a message that fails on its first delivery is requeued after a pause;
 * - a message that fails after having been redelivered is rejected without
 *   requeue, so a permanently failing message cannot loop forever.
 *
 * Also installs SIGINT/SIGTERM handlers that close the channel and the
 * connection before exiting the process.
 *
 * @returns A promise that resolves once the consumer and the signal handlers
 *   are registered.
 * @throws Rejects when connecting, creating the channel, asserting the queue
 *   or registering the consumer fails.
 */
async function start(): Promise<void> {
  const conn = await amqp.connect(rabbitUrl);
  const channel = await conn.createChannel();
  await channel.assertQueue(queueName);

  console.log(`🎧 Listening for messages on "${queueName}"...`);

  // Awaited so that a failed subscription rejects start() and reaches the
  // caller's error handler instead of becoming an unhandled rejection.
  await channel.consume(queueName, async (msg) => {
    // A null message is delivered when the consumer is cancelled by the broker.
    if (!msg) return;

    try {
      const { eventName, bucket, key } = parseStorageEvent(
        msg.content.toString(),
      );

      if (eventName !== "s3:ObjectCreated:Put") {
        console.log(`❌ Skipped. Event is not s3:ObjectCreated:Put: ${key}`);
        channel.ack(msg);
        return;
      }

      const processed = await processImage(bucket, key);
      if (processed) {
        console.log(`✅ Image processed: ${key}`);
      } else {
        console.log(`⏭️ Processing skipped: ${key}`);
      }

      channel.ack(msg);
    } catch (err) {
      console.error("❌ Error processing message:", err);

      if (msg.fields.redelivered) {
        // Already retried: reject without requeue. The broker drops the
        // message, or dead-letters it if the queue is configured for that.
        // NOTE(review): `redelivered` is also set when an earlier consumer
        // died before settling the message, so this is "at most one retry".
        console.error("❌ Message failed after redelivery; not requeuing.");
        channel.nack(msg, false, false);
        return;
      }

      // First failure: retry once by requeuing after a pause.
      await sleepWithCountdown(30);
      channel.nack(msg, false, true);
    }
  });

  // Handle graceful shutdown
  let shuttingDown = false;
  const shutdown = async (): Promise<void> => {
    // A second signal must not try to close an already-closing channel.
    if (shuttingDown) return;
    shuttingDown = true;

    console.log("👋 Gracefully shutting down...");
    let exitCode = 0;
    try {
      await channel.close();
      await conn.close();
    } catch (err) {
      console.error("❌ Error during shutdown:", err);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  process.on("SIGINT", () => void shutdown());
  process.on("SIGTERM", () => void shutdown());
}
|
|
|
|
/**
 * Logs the reason the worker could not start and terminates the process
 * with exit code 1.
 */
function exitOnStartupFailure(reason: unknown): never {
  console.error("🚨 Worker failed to start:", reason);
  process.exit(1);
}

// Launch the worker; any rejection from start() ends the process.
start().catch(exitOnStartupFailure);
|