From dff34fb1036f4eab421969f427efc9cfe3c61705 Mon Sep 17 00:00:00 2001 From: Nimer Farahty Date: Sat, 26 Apr 2025 19:44:51 +0300 Subject: [PATCH] fix memory issue --- index.ts | 91 ++++++++++++++++++++++++++++++------------------ process-image.ts | 76 ++++++++++++++++++++++------------------ 2 files changed, 100 insertions(+), 67 deletions(-) diff --git a/index.ts b/index.ts index e68ce2f..dbf545a 100644 --- a/index.ts +++ b/index.ts @@ -12,51 +12,74 @@ async function start() { const conn = await amqp.connect(rabbitUrl); const channel = await conn.createChannel(); await channel.prefetch(1); - await channel.assertQueue(queueName); + await channel.assertQueue(queueName, { + durable: true, + arguments: { + "x-dead-letter-exchange": "optimize.images.dlx", + "x-dead-letter-routing-key": "file.uploaded.failed", + "x-delivery-limit": 3, + "x-queue-type": "quorum", + }, + }); console.log(`🎧 Listening for messages on "${queueName}"...`); - channel.consume(queueName, async (msg) => { - if (!msg) return; + channel.consume( + queueName, + async (msg) => { + if (!msg) return; - let bucket: string | undefined; - let key: string | undefined; - let eventName: string | undefined; + try { + const data = JSON.parse(msg.content.toString()); - try { - const data = JSON.parse(msg.content.toString()); + const eventRecord = data.Records?.[0]; + const eventName = eventRecord?.eventName; + const bucket = eventRecord?.s3?.bucket?.name; + let key = eventRecord?.s3?.object?.key || ""; - eventName = data.Records?.[0]?.eventName; - bucket = data.Records?.[0]?.s3?.bucket?.name; - key = (data.Key as string).replace(bucket ?? "", ""); + if (!eventName || !bucket || !key) { + throw new Error("Missing required event fields."); + } - if (!eventName || !bucket || !key) { - throw new Error("Missing required event fields."); - } + key = decodeURIComponent(key.replace(/\+/g, " ")); // Properly decode S3 keys + + if (eventName !== "s3:ObjectCreated:Put") { + console.log(`⏭️ Skipping non-create event: ${eventName}`); + channel.ack(msg); + return; + } + + const processed = await processImage(bucket, key); + + if (processed) { + console.log(`βœ… Successfully processed: ${key}`); + } else { + console.log(`♻️ Skipped or already processed: ${key}`); + } - if (eventName !== "s3:ObjectCreated:Put") { - console.log(`❌ Skipped. Event is not s3:ObjectCreated:Put: ${key}`); channel.ack(msg); - return; + } catch (err) { + console.error(`❌ Failed processing message:`, err); + + // πŸ’€ Sleep a bit before retrying (avoiding tight loop retries) + await sleepWithCountdown(30); + + // ❗Important: Protect against dead-letter queue overflow + // Retry only once, otherwise move to DLQ + + if (msg.fields.redelivered) { + console.warn(`⚠️ Message redelivered already, rejecting:`); + channel.nack(msg, false, false); // Reject and don't requeue + } else { + console.log(`πŸ” Message will be retried once.`); + channel.nack(msg, false, true); // Retry once + } } + }, + { noAck: false } + ); - const processed = await processImage(bucket, key); - if (processed) { - console.log(`βœ… Image processed: ${key}`); - } else { - console.log(`⏭️ Processing skipped: ${key}`); - } - - channel.ack(msg); - } catch (err) { - console.error("❌ Error processing message:", err); - // Retry once by requeuing - await sleepWithCountdown(30); - channel.nack(msg, false, true); - } - }); - - // Handle graceful shutdown + // Graceful shutdown const shutdown = async () => { console.log("πŸ‘‹ Gracefully shutting down..."); await channel.close(); diff --git a/process-image.ts b/process-image.ts index 6c515e9..8d3b505 100644 --- a/process-image.ts +++ b/process-image.ts @@ -8,19 +8,16 @@ export async function processImage( ): Promise { const minio = getMinioClient(); - // Get metadata + // Fetch metadata const stat = await minio.statObject(bucket, key); const meta = stat as unknown as { metaData: Record }; - const mime = meta.metaData["content-type"] || lookup(key) || ""; - // Skip if not an image if (!mime.startsWith("image/")) { console.log(`⏭️ Skipping non-image file: ${key}`); return false; } - // Skip if already processed if ( meta.metaData["x-amz-meta-processed"] === "true" || meta.metaData["processed"] === "true" @@ -29,52 +26,65 @@ export async function processImage( return false; } - // Read original image - const stream = await minio.getObject(bucket, key); - const chunks: Buffer[] = []; - for await (const chunk of stream) chunks.push(chunk); - const buffer = Buffer.concat(chunks); + const originalStream = await minio.getObject(bucket, key); - const fileName = key.split("/").pop(); + const fileName = key.split("/").pop()!; const filePath = key.substring(0, key.lastIndexOf("/")); const processedMeta = { "x-amz-meta-processed": "true", }; - // Helper function to write to MinIO - async function writeImage(path: string, buffer: Buffer, mimeType: string) { - await minio.putObject(bucket, path, buffer, buffer.length, { + // Helper to upload from a stream + async function uploadFromStream( + targetPath: string, + mimeType: string, + transformStream: NodeJS.ReadableStream + ) { + const chunks: Buffer[] = []; + for await (const chunk of transformStream) { + chunks.push(chunk as Buffer); + } + const finalBuffer = Buffer.concat(chunks); + + await minio.putObject(bucket, targetPath, finalBuffer, finalBuffer.length, { "Content-Type": mimeType, ...processedMeta, }); } try { - // πŸ–ΌοΈ Create thumbnail - const thumb = await sharp(buffer).resize(200).toBuffer(); - await writeImage(`${filePath}/thumbs/${fileName}`, thumb, mime); - - // πŸ“Έ Optimized JPEG - const optimized = await sharp(buffer).jpeg({ quality: 80 }).toBuffer(); - await writeImage( - `${filePath}/optimized/${fileName}`, - optimized, - "image/jpeg" + // πŸ–ΌοΈ Thumbnail (resize to 200px width) + await uploadFromStream( + `${filePath}/thumbs/${fileName}`, + mime, + originalStream.pipe(sharp().resize(200)) ); - // 🌐 WebP variant - const webpName = fileName?.replace(/\.[^/.]+$/, ".webp"); - const webp = await sharp(buffer).webp({ quality: 80 }).toBuffer(); - await writeImage(`${filePath}/webp/${webpName}`, webp, "image/webp"); + // Re-fetch original again for each variant (streams are one-time-use) + const optimizedStream = await minio.getObject(bucket, key); - // (Optional: AVIF format - super modern) - // const avifName = fileName?.replace(/\.[^/.]+$/, ".avif"); - // const avif = await sharp(buffer).avif({ quality: 50 }).toBuffer(); - // await writeImage(`${filePath}/avif/${avifName}`, avif, "image/avif"); + // πŸ“Έ Optimized JPEG + await uploadFromStream( + `${filePath}/optimized/${fileName}`, + "image/jpeg", + optimizedStream.pipe(sharp().jpeg({ quality: 80 })) + ); - // πŸ” Re-upload original with metadata to mark as processed - await writeImage(key, buffer, mime); + const webpStream = await minio.getObject(bucket, key); + + // 🌐 WebP version + const webpName = fileName.replace(/\.[^/.]+$/, ".webp"); + await uploadFromStream( + `${filePath}/webp/${webpName}`, + "image/webp", + webpStream.pipe(sharp().webp({ quality: 80 })) + ); + + const finalOriginalStream = await minio.getObject(bucket, key); + + // πŸ” Re-upload the original with updated metadata to mark it processed + await uploadFromStream(key, mime, finalOriginalStream); console.log(`βœ… Processed image: ${key}`); return true;