diff --git a/index.ts b/index.ts index f451a8e..6abfce1 100644 --- a/index.ts +++ b/index.ts @@ -1,4 +1,3 @@ -// worker/index.ts import amqp from "amqplib"; import { processImage } from "./process-image"; import dotenv from "dotenv"; @@ -6,9 +5,10 @@ import dotenv from "dotenv"; dotenv.config(); const queueName = process.env.QUEUE_NAME || "storage.file"; +const rabbitUrl = process.env.RABBITMQ_URL || "amqp://localhost"; async function start() { - const conn = await amqp.connect(process.env.RABBITMQ_URL!); + const conn = await amqp.connect(rabbitUrl); const channel = await conn.createChannel(); await channel.assertQueue(queueName); @@ -17,26 +17,55 @@ async function start() { channel.consume(queueName, async (msg) => { if (!msg) return; + let bucket: string | undefined; + let key: string | undefined; + let eventName: string | undefined; + try { const data = JSON.parse(msg.content.toString()); - const eventName = data.Records[0].eventName; - const key = decodeURIComponent(data.Records[0].s3.object.key); - const bucket = data.Records[0].s3.bucket.name; + eventName = data.Records?.[0]?.eventName; + key = decodeURIComponent(data.Records?.[0]?.s3?.object?.key || ""); + bucket = data.Records?.[0]?.s3?.bucket?.name; + + if (!eventName || !bucket || !key) { + throw new Error("Missing required event fields."); + } if (eventName !== "s3:ObjectCreated:Put") { - console.log(`❌ Skipped , Event is not s3:ObjectCreated:Put: ${key}`); + console.log(`❌ Skipped. Event is not s3:ObjectCreated:Put: ${key}`); channel.ack(msg); return; } - await processImage(bucket, key); + const processed = await processImage(bucket, key); + if (processed) { + console.log(`✅ Image processed: ${key}`); + } else { + console.log(`⏭️ Processing skipped: ${key}`); + } channel.ack(msg); } catch (err) { console.error("❌ Error processing message:", err); + // Retry once by requeuing + channel.nack(msg, false, true); } }); + + // Handle graceful shutdown + const shutdown = async () => { + console.log("👋 Gracefully shutting down..."); + await channel.close(); + await conn.close(); + process.exit(0); + }; + + process.on("SIGINT", shutdown); + process.on("SIGTERM", shutdown); } -start(); +start().catch((err) => { + console.error("🚨 Worker failed to start:", err); + process.exit(1); +}); diff --git a/minio.ts b/minio.ts index 40169e2..8c7635a 100644 --- a/minio.ts +++ b/minio.ts @@ -7,7 +7,7 @@ dotenv.config(); export function getMinioClient() { return new Client({ endPoint: process.env.MINIO_ENDPOINT!, - useSSL: true, + useSSL: process.env.MINIO_USE_TLS === "yes", accessKey: process.env.MINIO_ACCESS_KEY!, secretKey: process.env.MINIO_SECRET_KEY!, }); diff --git a/process-image.ts b/process-image.ts index fab1b60..6c515e9 100644 --- a/process-image.ts +++ b/process-image.ts @@ -1,4 +1,3 @@ -// worker/process-image.ts import sharp from "sharp"; import { getMinioClient } from "./minio"; import { lookup } from "mime-types"; @@ -9,16 +8,19 @@ export async function processImage( ): Promise { const minio = getMinioClient(); + // Get metadata const stat = await minio.statObject(bucket, key); const meta = stat as unknown as { metaData: Record }; const mime = meta.metaData["content-type"] || lookup(key) || ""; + // Skip if not an image if (!mime.startsWith("image/")) { console.log(`⏭️ Skipping non-image file: ${key}`); return false; } + // Skip if already processed if ( meta.metaData["x-amz-meta-processed"] === "true" || meta.metaData["processed"] === "true" @@ -27,6 +29,7 @@ export async function processImage( return false; } + // Read original image const stream = await minio.getObject(bucket, key); const chunks: Buffer[] = []; for await (const chunk of stream) chunks.push(chunk); @@ -35,48 +38,48 @@ export async function processImage( const fileName = key.split("/").pop(); const filePath = key.substring(0, key.lastIndexOf("/")); - const thumb = await sharp(buffer).resize(200).toBuffer(); - await minio.putObject( - bucket, - `${filePath}/thumbs/${fileName}`, - thumb, - thumb.length, - { - "Content-Type": mime, - "x-amz-meta-processed": "true", - } - ); - - const optimized = await sharp(buffer).jpeg({ quality: 80 }).toBuffer(); - await minio.putObject( - bucket, - `${filePath}/optimized/${fileName}`, - optimized, - optimized.length, - { - "Content-Type": mime, - "x-amz-meta-processed": "true", - } - ); - - const webp = await sharp(buffer).webp({ quality: 80 }).toBuffer(); - await minio.putObject( - bucket, - `${filePath}/webp/${fileName?.replace(/\.[^/.]+$/, ".webp")}`, - webp, - webp.length, - { - "Content-Type": "image/webp", - "x-amz-meta-processed": "true", - } - ); - - // Re-upload original object with metadata - await minio.putObject(bucket, key, buffer, buffer.length, { - "Content-Type": mime, + const processedMeta = { "x-amz-meta-processed": "true", - }); + }; - console.log(`✅ Processed image: ${key}`); - return true; + // Helper function to write to MinIO + async function writeImage(path: string, buffer: Buffer, mimeType: string) { + await minio.putObject(bucket, path, buffer, buffer.length, { + "Content-Type": mimeType, + ...processedMeta, + }); + } + + try { + // 🖼️ Create thumbnail + const thumb = await sharp(buffer).resize(200).toBuffer(); + await writeImage(`${filePath}/thumbs/${fileName}`, thumb, mime); + + // 📸 Optimized JPEG + const optimized = await sharp(buffer).jpeg({ quality: 80 }).toBuffer(); + await writeImage( + `${filePath}/optimized/${fileName}`, + optimized, + "image/jpeg" + ); + + // 🌐 WebP variant + const webpName = fileName?.replace(/\.[^/.]+$/, ".webp"); + const webp = await sharp(buffer).webp({ quality: 80 }).toBuffer(); + await writeImage(`${filePath}/webp/${webpName}`, webp, "image/webp"); + + // (Optional: AVIF format - super modern) + // const avifName = fileName?.replace(/\.[^/.]+$/, ".avif"); + // const avif = await sharp(buffer).avif({ quality: 50 }).toBuffer(); + // await writeImage(`${filePath}/avif/${avifName}`, avif, "image/avif"); + + // 🔁 Re-upload original with metadata to mark as processed + await writeImage(key, buffer, mime); + + console.log(`✅ Processed image: ${key}`); + return true; + } catch (err) { + console.error(`❌ Error processing image (${key}):`, err); + return false; + } }