code enhancement
This commit is contained in:
parent
cb820c47cd
commit
35f168ffe0
45
index.ts
45
index.ts
@ -1,4 +1,3 @@
|
|||||||
// worker/index.ts
|
|
||||||
import amqp from "amqplib";
|
import amqp from "amqplib";
|
||||||
import { processImage } from "./process-image";
|
import { processImage } from "./process-image";
|
||||||
import dotenv from "dotenv";
|
import dotenv from "dotenv";
|
||||||
@ -6,9 +5,10 @@ import dotenv from "dotenv";
|
|||||||
dotenv.config();
|
dotenv.config();
|
||||||
|
|
||||||
const queueName = process.env.QUEUE_NAME || "storage.file";
|
const queueName = process.env.QUEUE_NAME || "storage.file";
|
||||||
|
const rabbitUrl = process.env.RABBITMQ_URL || "amqp://localhost";
|
||||||
|
|
||||||
async function start() {
|
async function start() {
|
||||||
const conn = await amqp.connect(process.env.RABBITMQ_URL!);
|
const conn = await amqp.connect(rabbitUrl);
|
||||||
const channel = await conn.createChannel();
|
const channel = await conn.createChannel();
|
||||||
await channel.assertQueue(queueName);
|
await channel.assertQueue(queueName);
|
||||||
|
|
||||||
@ -17,26 +17,55 @@ async function start() {
|
|||||||
channel.consume(queueName, async (msg) => {
|
channel.consume(queueName, async (msg) => {
|
||||||
if (!msg) return;
|
if (!msg) return;
|
||||||
|
|
||||||
|
let bucket: string | undefined;
|
||||||
|
let key: string | undefined;
|
||||||
|
let eventName: string | undefined;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const data = JSON.parse(msg.content.toString());
|
const data = JSON.parse(msg.content.toString());
|
||||||
|
|
||||||
const eventName = data.Records[0].eventName;
|
eventName = data.Records?.[0]?.eventName;
|
||||||
const key = decodeURIComponent(data.Records[0].s3.object.key);
|
key = decodeURIComponent(data.Records?.[0]?.s3?.object?.key || "");
|
||||||
const bucket = data.Records[0].s3.bucket.name;
|
bucket = data.Records?.[0]?.s3?.bucket?.name;
|
||||||
|
|
||||||
|
if (!eventName || !bucket || !key) {
|
||||||
|
throw new Error("Missing required event fields.");
|
||||||
|
}
|
||||||
|
|
||||||
if (eventName !== "s3:ObjectCreated:Put") {
|
if (eventName !== "s3:ObjectCreated:Put") {
|
||||||
console.log(`❌ Skipped , Event is not s3:ObjectCreated:Put: ${key}`);
|
console.log(`❌ Skipped. Event is not s3:ObjectCreated:Put: ${key}`);
|
||||||
channel.ack(msg);
|
channel.ack(msg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await processImage(bucket, key);
|
const processed = await processImage(bucket, key);
|
||||||
|
if (processed) {
|
||||||
|
console.log(`✅ Image processed: ${key}`);
|
||||||
|
} else {
|
||||||
|
console.log(`⏭️ Processing skipped: ${key}`);
|
||||||
|
}
|
||||||
|
|
||||||
channel.ack(msg);
|
channel.ack(msg);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("❌ Error processing message:", err);
|
console.error("❌ Error processing message:", err);
|
||||||
|
// Retry once by requeuing
|
||||||
|
channel.nack(msg, false, true);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Handle graceful shutdown
|
||||||
|
const shutdown = async () => {
|
||||||
|
console.log("👋 Gracefully shutting down...");
|
||||||
|
await channel.close();
|
||||||
|
await conn.close();
|
||||||
|
process.exit(0);
|
||||||
|
};
|
||||||
|
|
||||||
|
process.on("SIGINT", shutdown);
|
||||||
|
process.on("SIGTERM", shutdown);
|
||||||
}
|
}
|
||||||
|
|
||||||
start();
|
start().catch((err) => {
|
||||||
|
console.error("🚨 Worker failed to start:", err);
|
||||||
|
process.exit(1);
|
||||||
|
});
|
||||||
|
|||||||
2
minio.ts
2
minio.ts
@ -7,7 +7,7 @@ dotenv.config();
|
|||||||
export function getMinioClient() {
|
export function getMinioClient() {
|
||||||
return new Client({
|
return new Client({
|
||||||
endPoint: process.env.MINIO_ENDPOINT!,
|
endPoint: process.env.MINIO_ENDPOINT!,
|
||||||
useSSL: true,
|
useSSL: process.env.MINIO_USE_TLS === "yes",
|
||||||
accessKey: process.env.MINIO_ACCESS_KEY!,
|
accessKey: process.env.MINIO_ACCESS_KEY!,
|
||||||
secretKey: process.env.MINIO_SECRET_KEY!,
|
secretKey: process.env.MINIO_SECRET_KEY!,
|
||||||
});
|
});
|
||||||
|
|||||||
@ -1,4 +1,3 @@
|
|||||||
// worker/process-image.ts
|
|
||||||
import sharp from "sharp";
|
import sharp from "sharp";
|
||||||
import { getMinioClient } from "./minio";
|
import { getMinioClient } from "./minio";
|
||||||
import { lookup } from "mime-types";
|
import { lookup } from "mime-types";
|
||||||
@ -9,16 +8,19 @@ export async function processImage(
|
|||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
const minio = getMinioClient();
|
const minio = getMinioClient();
|
||||||
|
|
||||||
|
// Get metadata
|
||||||
const stat = await minio.statObject(bucket, key);
|
const stat = await minio.statObject(bucket, key);
|
||||||
const meta = stat as unknown as { metaData: Record<string, string> };
|
const meta = stat as unknown as { metaData: Record<string, string> };
|
||||||
|
|
||||||
const mime = meta.metaData["content-type"] || lookup(key) || "";
|
const mime = meta.metaData["content-type"] || lookup(key) || "";
|
||||||
|
|
||||||
|
// Skip if not an image
|
||||||
if (!mime.startsWith("image/")) {
|
if (!mime.startsWith("image/")) {
|
||||||
console.log(`⏭️ Skipping non-image file: ${key}`);
|
console.log(`⏭️ Skipping non-image file: ${key}`);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip if already processed
|
||||||
if (
|
if (
|
||||||
meta.metaData["x-amz-meta-processed"] === "true" ||
|
meta.metaData["x-amz-meta-processed"] === "true" ||
|
||||||
meta.metaData["processed"] === "true"
|
meta.metaData["processed"] === "true"
|
||||||
@ -27,6 +29,7 @@ export async function processImage(
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read original image
|
||||||
const stream = await minio.getObject(bucket, key);
|
const stream = await minio.getObject(bucket, key);
|
||||||
const chunks: Buffer[] = [];
|
const chunks: Buffer[] = [];
|
||||||
for await (const chunk of stream) chunks.push(chunk);
|
for await (const chunk of stream) chunks.push(chunk);
|
||||||
@ -35,48 +38,48 @@ export async function processImage(
|
|||||||
const fileName = key.split("/").pop();
|
const fileName = key.split("/").pop();
|
||||||
const filePath = key.substring(0, key.lastIndexOf("/"));
|
const filePath = key.substring(0, key.lastIndexOf("/"));
|
||||||
|
|
||||||
const thumb = await sharp(buffer).resize(200).toBuffer();
|
const processedMeta = {
|
||||||
await minio.putObject(
|
|
||||||
bucket,
|
|
||||||
`${filePath}/thumbs/${fileName}`,
|
|
||||||
thumb,
|
|
||||||
thumb.length,
|
|
||||||
{
|
|
||||||
"Content-Type": mime,
|
|
||||||
"x-amz-meta-processed": "true",
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
const optimized = await sharp(buffer).jpeg({ quality: 80 }).toBuffer();
|
|
||||||
await minio.putObject(
|
|
||||||
bucket,
|
|
||||||
`${filePath}/optimized/${fileName}`,
|
|
||||||
optimized,
|
|
||||||
optimized.length,
|
|
||||||
{
|
|
||||||
"Content-Type": mime,
|
|
||||||
"x-amz-meta-processed": "true",
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
const webp = await sharp(buffer).webp({ quality: 80 }).toBuffer();
|
|
||||||
await minio.putObject(
|
|
||||||
bucket,
|
|
||||||
`${filePath}/webp/${fileName?.replace(/\.[^/.]+$/, ".webp")}`,
|
|
||||||
webp,
|
|
||||||
webp.length,
|
|
||||||
{
|
|
||||||
"Content-Type": "image/webp",
|
|
||||||
"x-amz-meta-processed": "true",
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
// Re-upload original object with metadata
|
|
||||||
await minio.putObject(bucket, key, buffer, buffer.length, {
|
|
||||||
"Content-Type": mime,
|
|
||||||
"x-amz-meta-processed": "true",
|
"x-amz-meta-processed": "true",
|
||||||
});
|
};
|
||||||
|
|
||||||
console.log(`✅ Processed image: ${key}`);
|
// Helper function to write to MinIO
|
||||||
return true;
|
async function writeImage(path: string, buffer: Buffer, mimeType: string) {
|
||||||
|
await minio.putObject(bucket, path, buffer, buffer.length, {
|
||||||
|
"Content-Type": mimeType,
|
||||||
|
...processedMeta,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 🖼️ Create thumbnail
|
||||||
|
const thumb = await sharp(buffer).resize(200).toBuffer();
|
||||||
|
await writeImage(`${filePath}/thumbs/${fileName}`, thumb, mime);
|
||||||
|
|
||||||
|
// 📸 Optimized JPEG
|
||||||
|
const optimized = await sharp(buffer).jpeg({ quality: 80 }).toBuffer();
|
||||||
|
await writeImage(
|
||||||
|
`${filePath}/optimized/${fileName}`,
|
||||||
|
optimized,
|
||||||
|
"image/jpeg"
|
||||||
|
);
|
||||||
|
|
||||||
|
// 🌐 WebP variant
|
||||||
|
const webpName = fileName?.replace(/\.[^/.]+$/, ".webp");
|
||||||
|
const webp = await sharp(buffer).webp({ quality: 80 }).toBuffer();
|
||||||
|
await writeImage(`${filePath}/webp/${webpName}`, webp, "image/webp");
|
||||||
|
|
||||||
|
// (Optional: AVIF format - super modern)
|
||||||
|
// const avifName = fileName?.replace(/\.[^/.]+$/, ".avif");
|
||||||
|
// const avif = await sharp(buffer).avif({ quality: 50 }).toBuffer();
|
||||||
|
// await writeImage(`${filePath}/avif/${avifName}`, avif, "image/avif");
|
||||||
|
|
||||||
|
// 🔁 Re-upload original with metadata to mark as processed
|
||||||
|
await writeImage(key, buffer, mime);
|
||||||
|
|
||||||
|
console.log(`✅ Processed image: ${key}`);
|
||||||
|
return true;
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`❌ Error processing image (${key}):`, err);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user