fix memory issue: process image variants as streams instead of buffering whole files

Nimer Farahty 2025-04-26 19:44:51 +03:00
parent e43ab395e6
commit dff34fb103
2 changed files with 100 additions and 67 deletions

File 1 of 2

@@ -12,51 +12,74 @@ async function start() {
   const conn = await amqp.connect(rabbitUrl);
   const channel = await conn.createChannel();
   await channel.prefetch(1);
-  await channel.assertQueue(queueName);
+  await channel.assertQueue(queueName, {
+    durable: true,
+    arguments: {
+      "x-dead-letter-exchange": "optimize.images.dlx",
+      "x-dead-letter-routing-key": "file.uploaded.failed",
+      "x-delivery-limit": 3,
+      "x-queue-type": "quorum",
+    },
+  });
 
   console.log(`🎧 Listening for messages on "${queueName}"...`);
 
-  channel.consume(queueName, async (msg) => {
-    if (!msg) return;
-    let bucket: string | undefined;
-    let key: string | undefined;
-    let eventName: string | undefined;
-    try {
-      const data = JSON.parse(msg.content.toString());
-      eventName = data.Records?.[0]?.eventName;
-      bucket = data.Records?.[0]?.s3?.bucket?.name;
-      key = (data.Key as string).replace(bucket ?? "", "");
-      if (!eventName || !bucket || !key) {
-        throw new Error("Missing required event fields.");
-      }
-      if (eventName !== "s3:ObjectCreated:Put") {
-        console.log(`❌ Skipped. Event is not s3:ObjectCreated:Put: ${key}`);
-        channel.ack(msg);
-        return;
-      }
-      const processed = await processImage(bucket, key);
-      if (processed) {
-        console.log(`Image processed: ${key}`);
-      } else {
-        console.log(`⏭️ Processing skipped: ${key}`);
-      }
-      channel.ack(msg);
-    } catch (err) {
-      console.error("❌ Error processing message:", err);
-      // Retry once by requeuing
-      await sleepWithCountdown(30);
-      channel.nack(msg, false, true);
-    }
-  });
+  channel.consume(
+    queueName,
+    async (msg) => {
+      if (!msg) return;
+      try {
+        const data = JSON.parse(msg.content.toString());
+        const eventRecord = data.Records?.[0];
+        const eventName = eventRecord?.eventName;
+        const bucket = eventRecord?.s3?.bucket?.name;
+        let key = eventRecord?.s3?.object?.key || "";
+        if (!eventName || !bucket || !key) {
+          throw new Error("Missing required event fields.");
+        }
+        key = decodeURIComponent(key.replace(/\+/g, " ")); // Properly decode S3 keys
+        if (eventName !== "s3:ObjectCreated:Put") {
+          console.log(`⏭️ Skipping non-create event: ${eventName}`);
+          channel.ack(msg);
+          return;
+        }
+        const processed = await processImage(bucket, key);
+        if (processed) {
+          console.log(`Successfully processed: ${key}`);
+        } else {
+          console.log(`♻️ Skipped or already processed: ${key}`);
+        }
+        channel.ack(msg);
+      } catch (err) {
+        console.error(`❌ Failed processing message:`, err);
+        // 💤 Sleep a bit before retrying (avoiding tight loop retries)
+        await sleepWithCountdown(30);
+        // ❗Important: Protect against dead-letter queue overflow
+        // Retry only once, otherwise move to DLQ
+        if (msg.fields.redelivered) {
+          console.warn(`⚠️ Message redelivered already, rejecting:`);
+          channel.nack(msg, false, false); // Reject and don't requeue
+        } else {
+          console.log(`🔁 Message will be retried once.`);
+          channel.nack(msg, false, true); // Retry once
+        }
+      }
+    },
+    { noAck: false }
+  );
 
-  // Handle graceful shutdown
+  // Graceful shutdown
   const shutdown = async () => {
     console.log("👋 Gracefully shutting down...");
     await channel.close();
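
Note: the new queue arguments reference a dead-letter exchange ("optimize.images.dlx") and routing key ("file.uploaded.failed") that this diff never declares, so they are presumably set up elsewhere. Also, with x-delivery-limit: 3 on a quorum queue the broker itself dead-letters a message after three deliveries, making the consumer's redelivered check an extra guard. A minimal sketch of what that topology could look like; the exchange type ("direct") and the parking-queue name are assumptions:

// Sketch only: this topology is not shown in the diff. The exchange and
// routing key names come from the queue arguments above; the exchange type
// and the parking-queue name are assumptions.
import type { Channel } from "amqplib";

async function assertDeadLetterTopology(channel: Channel) {
  // Exchange that rejected/expired messages are routed to (x-dead-letter-exchange)
  await channel.assertExchange("optimize.images.dlx", "direct", { durable: true });
  // Parking queue for failed messages; the name is an assumption
  await channel.assertQueue("optimize.images.failed", { durable: true });
  // Bind using the routing key set in x-dead-letter-routing-key
  await channel.bindQueue(
    "optimize.images.failed",
    "optimize.images.dlx",
    "file.uploaded.failed"
  );
}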
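The sleepWithCountdown helper called in the catch block is also not part of this commit. A minimal sketch, assuming it waits the given number of seconds and logs the remaining time once per second:

// Sketch of the sleepWithCountdown helper referenced above; its real
// implementation lives elsewhere in the repo, so this is an assumption.
async function sleepWithCountdown(seconds: number): Promise<void> {
  for (let remaining = seconds; remaining > 0; remaining--) {
    console.log(`⏳ Retrying in ${remaining}s...`);
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}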

File 2 of 2

@@ -8,19 +8,16 @@ export async function processImage(
 ): Promise<boolean> {
   const minio = getMinioClient();
-  // Get metadata
+  // Fetch metadata
   const stat = await minio.statObject(bucket, key);
   const meta = stat as unknown as { metaData: Record<string, string> };
   const mime = meta.metaData["content-type"] || lookup(key) || "";
-  // Skip if not an image
   if (!mime.startsWith("image/")) {
     console.log(`⏭️ Skipping non-image file: ${key}`);
     return false;
   }
-  // Skip if already processed
   if (
     meta.metaData["x-amz-meta-processed"] === "true" ||
     meta.metaData["processed"] === "true"
@@ -29,52 +26,65 @@ export async function processImage(
     return false;
   }
 
-  // Read original image
-  const stream = await minio.getObject(bucket, key);
-  const chunks: Buffer[] = [];
-  for await (const chunk of stream) chunks.push(chunk);
-  const buffer = Buffer.concat(chunks);
+  const originalStream = await minio.getObject(bucket, key);
 
-  const fileName = key.split("/").pop();
+  const fileName = key.split("/").pop()!;
   const filePath = key.substring(0, key.lastIndexOf("/"));
 
   const processedMeta = {
     "x-amz-meta-processed": "true",
   };
 
-  // Helper function to write to MinIO
-  async function writeImage(path: string, buffer: Buffer, mimeType: string) {
-    await minio.putObject(bucket, path, buffer, buffer.length, {
+  // Helper to upload from a stream
+  async function uploadFromStream(
+    targetPath: string,
+    mimeType: string,
+    transformStream: NodeJS.ReadableStream
+  ) {
+    const chunks: Buffer[] = [];
+    for await (const chunk of transformStream) {
+      chunks.push(chunk as Buffer);
+    }
+    const finalBuffer = Buffer.concat(chunks);
+    await minio.putObject(bucket, targetPath, finalBuffer, finalBuffer.length, {
       "Content-Type": mimeType,
       ...processedMeta,
     });
   }
 
   try {
-    // 🖼️ Create thumbnail
-    const thumb = await sharp(buffer).resize(200).toBuffer();
-    await writeImage(`${filePath}/thumbs/${fileName}`, thumb, mime);
-
-    // 📸 Optimized JPEG
-    const optimized = await sharp(buffer).jpeg({ quality: 80 }).toBuffer();
-    await writeImage(
-      `${filePath}/optimized/${fileName}`,
-      optimized,
-      "image/jpeg"
-    );
-
-    // 🌐 WebP variant
-    const webpName = fileName?.replace(/\.[^/.]+$/, ".webp");
-    const webp = await sharp(buffer).webp({ quality: 80 }).toBuffer();
-    await writeImage(`${filePath}/webp/${webpName}`, webp, "image/webp");
-
-    // (Optional: AVIF format - super modern)
-    // const avifName = fileName?.replace(/\.[^/.]+$/, ".avif");
-    // const avif = await sharp(buffer).avif({ quality: 50 }).toBuffer();
-    // await writeImage(`${filePath}/avif/${avifName}`, avif, "image/avif");
-
-    // 🔁 Re-upload original with metadata to mark as processed
-    await writeImage(key, buffer, mime);
+    // 🖼️ Thumbnail (resize to 200px width)
+    await uploadFromStream(
+      `${filePath}/thumbs/${fileName}`,
+      mime,
+      originalStream.pipe(sharp().resize(200))
+    );
+
+    // Re-fetch original again for each variant (streams are one-time-use)
+    const optimizedStream = await minio.getObject(bucket, key);
+
+    // 📸 Optimized JPEG
+    await uploadFromStream(
+      `${filePath}/optimized/${fileName}`,
+      "image/jpeg",
+      optimizedStream.pipe(sharp().jpeg({ quality: 80 }))
+    );
+
+    const webpStream = await minio.getObject(bucket, key);
+
+    // 🌐 WebP version
+    const webpName = fileName.replace(/\.[^/.]+$/, ".webp");
+    await uploadFromStream(
+      `${filePath}/webp/${webpName}`,
+      "image/webp",
+      webpStream.pipe(sharp().webp({ quality: 80 }))
+    );
+
+    const finalOriginalStream = await minio.getObject(bucket, key);
+    // 🔁 Re-upload the original with updated metadata to mark it processed
+    await uploadFromStream(key, mime, finalOriginalStream);
 
     console.log(`✅ Processed image: ${key}`);
     return true;
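
Worth noting against the commit title: uploadFromStream still collects each transformed stream into a single Buffer before calling putObject, so peak memory is roughly the size of the largest variant rather than flat. minio-js putObject also accepts a readable stream directly (it falls back to multipart upload when no size is given), which would avoid buffering entirely. A sketch under that assumption, reusing the bucket and processedMeta from the surrounding scope:

// Sketch only, not the committed code: stream straight into MinIO instead
// of concatenating chunks. Relies on minio-js putObject's stream overload;
// size is left undefined so the client uploads in parts.
async function uploadStreaming(
  targetPath: string,
  mimeType: string,
  body: NodeJS.ReadableStream
) {
  await minio.putObject(bucket, targetPath, body, undefined, {
    "Content-Type": mimeType,
    ...processedMeta,
  });
}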
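The commit also re-downloads the original from MinIO once per variant because a Node stream can only be consumed once. An alternative (not what this commit does) is to download once and fan the bytes out with sharp's clone(), which lets several pipelines share one input; combined with the streaming upload sketched above, this would make a single pass over the data:

// Alternative sketch: fetch the original once and derive every variant from
// clones of one sharp pipeline. Assumes the same scope as processImage above
// (minio, bucket, key, filePath, fileName, mime, webpName, uploadFromStream).
const source = await minio.getObject(bucket, key);
const input = sharp(); // pass-through pipeline that all variants clone from
source.pipe(input);

await Promise.all([
  uploadFromStream(`${filePath}/thumbs/${fileName}`, mime, input.clone().resize(200)),
  uploadFromStream(
    `${filePath}/optimized/${fileName}`,
    "image/jpeg",
    input.clone().jpeg({ quality: 80 })
  ),
  uploadFromStream(
    `${filePath}/webp/${webpName}`,
    "image/webp",
    input.clone().webp({ quality: 80 })
  ),
]);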