From fbacedc6ddfead4f09e23d6aef7df3c523782b5b Mon Sep 17 00:00:00 2001 From: Nimer Farahty Date: Sun, 27 Apr 2025 23:00:52 +0300 Subject: [PATCH] first commit --- .dockerignore | 3 ++ .gitignore | 2 + Dockerfile | 26 +++++++++++ config.go | 39 ++++++++++++++++ go.mod | 23 ++++++++++ go.sum | 35 ++++++++++++++ main.go | 109 +++++++++++++++++++++++++++++++++++++++++++ minio.go | 39 ++++++++++++++++ processor.go | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++ rabbitmq.go | 46 +++++++++++++++++++ utils.go | 20 ++++++++ 11 files changed, 467 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 config.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 minio.go create mode 100644 processor.go create mode 100644 rabbitmq.go create mode 100644 utils.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3b89858 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +./app +.env +docker-compose.yml \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..00a3413 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +docker-compose.yml \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..dd704b8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +# syntax=docker/dockerfile:1.4 + +# Build stage +FROM --platform=$BUILDPLATFORM golang:1.24-alpine AS builder + +RUN apk add --no-cache build-base vips-dev + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +RUN go build -o app + +# Final image +FROM alpine:3.21 + +RUN apk add --no-cache vips + +WORKDIR /app + +COPY --from=builder /app/app . + +CMD ["./app"] diff --git a/config.go b/config.go new file mode 100644 index 0000000..77d6500 --- /dev/null +++ b/config.go @@ -0,0 +1,39 @@ +// config.go +package main + +import ( + "log" + "os" + + "github.com/joho/godotenv" +) + +type Config struct { + RabbitURL string + QueueName string + MinioURL string + MinioAccessKey string + MinioSecretKey string +} + +func LoadConfig() Config { + err := godotenv.Load() + if err != nil { + log.Println("No .env file found, using environment variables") + } + + return Config{ + RabbitURL: getEnv("RABBITMQ_URL", "amqp://localhost"), + QueueName: getEnv("QUEUE_NAME", "storage.file"), + MinioURL: getEnv("MINIO_ENDPOINT", "localhost:9000"), + MinioAccessKey: getEnv("MINIO_ACCESS_KEY", "minioadmin"), + MinioSecretKey: getEnv("MINIO_SECRET_KEY", "minioadmin"), + } +} + +func getEnv(key, fallback string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return fallback +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a00dcb4 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module git.farahty.com/nimer/minio-worker-go + +go 1.24.2 + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/go-ini/ini v1.67.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/h2non/bimg v1.1.9 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/minio/crc64nvme v1.0.1 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/minio-go/v7 v7.0.91 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect + github.com/rs/xid v1.6.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.38.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e3c04e4 --- /dev/null +++ b/go.sum @@ -0,0 +1,35 @@ +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/h2non/bimg v1.1.9 h1:WH20Nxko9l/HFm4kZCA3Phbgu2cbHvYzxwxn9YROEGg= +github.com/h2non/bimg v1.1.9/go.mod h1:R3+UiYwkK4rQl6KVFTOFJHitgLbZXBZNFh2cv3AEbp8= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/minio/crc64nvme v1.0.1 h1:DHQPrYPdqK7jQG/Ls5CTBZWeex/2FMS3G5XGkycuFrY= +github.com/minio/crc64nvme v1.0.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.91 h1:tWLZnEfo3OZl5PoXQwcwTAPNNrjyWwOh6cbZitW5JQc= +github.com/minio/minio-go/v7 v7.0.91/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= diff --git a/main.go b/main.go new file mode 100644 index 0000000..2b1db70 --- /dev/null +++ b/main.go @@ -0,0 +1,109 @@ +// main.go +package main + +import ( + "context" + "encoding/json" + "log" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/rabbitmq/amqp091-go" +) + +type S3Event struct { + Key string `json:"Key"` + Records []struct { + EventName string `json:"eventName"` + S3 struct { + Bucket struct { + Name string `json:"name"` + } `json:"bucket"` + Object struct { + Key string `json:"key"` + } `json:"object"` + } `json:"s3"` + } `json:"Records"` +} + +func main() { + cfg := LoadConfig() + + rabbitConn, rabbitChannel, err := ConnectRabbitMQ(cfg) + if err != nil { + log.Fatalf("🚨 Failed to connect RabbitMQ: %v", err) + } + defer rabbitConn.Close() + defer rabbitChannel.Close() + + minioClient, err := ConnectMinIO(cfg) + if err != nil { + log.Fatalf("🚨 Failed to connect MinIO: %v", err) + } + + msgs, err := rabbitChannel.Consume( + cfg.QueueName, + "", + false, // manual ack + false, + false, + false, + nil, + ) + if err != nil { + log.Fatalf("🚨 Failed to consume RabbitMQ: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Graceful shutdown + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + <-c + log.Println("👋 Shutting down...") + cancel() + rabbitConn.Close() + os.Exit(0) + }() + + for msg := range msgs { + go func(msg amqp091.Delivery) { + if ctx.Err() != nil { + return + } + + var event S3Event + err := json.Unmarshal(msg.Body, &event) + if err != nil || len(event.Records) == 0 { + log.Printf("❌ Invalid message: %v", err) + msg.Nack(false, false) + return + } + + record := event.Records[0] + if record.EventName != "s3:ObjectCreated:Put" { + log.Printf("⏭️ Skipping event: %s", record.EventName) + msg.Ack(false) + return + } + + bucket := record.S3.Bucket.Name + key := strings.TrimPrefix(event.Key, bucket+"/") + log.Printf("📥 Processing image: %s/%s", bucket, key) + err = ProcessImage(ctx, minioClient, bucket, key) + if err != nil { + log.Printf("❌ Error processing image: %v", err) + log.Println("🔁 Retrying once") + SleepWithCountdown(5) + msg.Nack(false, true) + return + } + + msg.Ack(false) + }(msg) + } +} diff --git a/minio.go b/minio.go new file mode 100644 index 0000000..03eb5a0 --- /dev/null +++ b/minio.go @@ -0,0 +1,39 @@ +// minio.go +package main + +import ( + "context" + "log" + "strings" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +func ConnectMinIO(cfg Config) (*minio.Client, error) { + minioClient, err := minio.New(cfg.MinioURL, &minio.Options{ + Creds: credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""), + Secure: true, // change to true if using https + }) + if err != nil { + return nil, err + } + + return minioClient, nil +} + +func UploadToMinIO(ctx context.Context, client *minio.Client, bucket, path, mimeType string, data []byte, meta map[string]string) error { + + key := strings.TrimPrefix(path, "/") + key = strings.TrimPrefix(key, "./") + + log.Printf("Uploading to MinIO: %s", key) + _, err := client.PutObject(ctx, bucket, key, + NewBytesReader(data), + int64(len(data)), + minio.PutObjectOptions{ + ContentType: mimeType, + UserMetadata: meta, + }) + return err +} diff --git a/processor.go b/processor.go new file mode 100644 index 0000000..3f5e094 --- /dev/null +++ b/processor.go @@ -0,0 +1,125 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "log" + "path" + "path/filepath" + "strings" + + "github.com/h2non/bimg" + "github.com/minio/minio-go/v7" +) + +func ProcessImage(ctx context.Context, client *minio.Client, bucket, key string) error { + stat, err := client.StatObject(ctx, bucket, key, minio.StatObjectOptions{}) + if err != nil { + log.Printf("🚨 Failed to stat object: %s, error: %v", key, err) + return err + } + + contentType := stat.ContentType + if !strings.HasPrefix(contentType, "image/") { + log.Printf("⏭️ Skipping non-image file: %s", key) + return nil + } + + if processed := stat.UserMetadata["X-Amz-Meta-Processed"]; processed == "true" { + log.Printf("♻️ Already processed: %s", key) + return nil + } + + if processed := stat.UserMetadata["Processed"]; processed == "true" { + log.Printf("♻️ Already processed: %s", key) + return nil + } + + object, err := client.GetObject(ctx, bucket, key, minio.GetObjectOptions{}) + if err != nil { + log.Printf("🚨 Failed to get object: %s, error: %v", key, err) + return err + } + defer object.Close() + + sourceBuffer := new(bytes.Buffer) + _, err = sourceBuffer.ReadFrom(object) + if err != nil { + log.Printf("🚨 Failed to read object: %s, error: %v", key, err) + return err + } + + img := bimg.NewImage(sourceBuffer.Bytes()) + + fileName := filepath.Base(key) + filePath := path.Dir(key) + + meta := map[string]string{ + "Processed": "true", + } + + // Generate optimized JPEG: just convert without resizing, keep quality high + jpegBuf, err := img.Process(bimg.Options{ + Quality: 90, + Type: bimg.JPEG, + StripMetadata: true, + NoAutoRotate: false, + }) + + if err != nil { + return fmt.Errorf("failed to generate optimized jpeg: %w", err) + } + + // Generate optimized WebP: just convert without resizing, keep quality high + webpBuf, err := img.Process(bimg.Options{ + Quality: 90, + Type: bimg.WEBP, + StripMetadata: true, + NoAutoRotate: false, + }) + if err != nil { + return fmt.Errorf("failed to generate webp: %w", err) + } + + // Generate thumbnail: resize to 400px width + thumbBuf, err := img.Process(bimg.Options{ + Width: 400, + Quality: 85, + Type: bimg.JPEG, + StripMetadata: true, + NoAutoRotate: false, + }) + + if err != nil { + return fmt.Errorf("failed to generate thumbnail: %w", err) + } + + // Upload optimized JPEG + err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "optimized", fileName), "image/jpeg", jpegBuf, meta) + if err != nil { + return fmt.Errorf("failed to upload optimized jpeg: %w", err) + } + + // Upload WebP + webpName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) + ".webp" + err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "webp", webpName), "image/webp", webpBuf, meta) + if err != nil { + return fmt.Errorf("failed to upload webp image: %w", err) + } + + // Upload thumbnail + err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "thumbs", fileName), "image/jpeg", thumbBuf, meta) + if err != nil { + return fmt.Errorf("failed to upload thumbnail: %w", err) + } + + // Reupload original with processed=true metadata (unchanged) + err = UploadToMinIO(ctx, client, bucket, key, contentType, sourceBuffer.Bytes(), meta) + if err != nil { + return fmt.Errorf("failed to reupload original image: %w", err) + } + + log.Printf("✅ Image processed: %s", key) + return nil +} diff --git a/rabbitmq.go b/rabbitmq.go new file mode 100644 index 0000000..fe92c35 --- /dev/null +++ b/rabbitmq.go @@ -0,0 +1,46 @@ +// rabbitmq.go +package main + +import ( + "log" + + "github.com/rabbitmq/amqp091-go" +) + +func ConnectRabbitMQ(cfg Config) (*amqp091.Connection, *amqp091.Channel, error) { + conn, err := amqp091.Dial(cfg.RabbitURL) + if err != nil { + return nil, nil, err + } + + ch, err := conn.Channel() + if err != nil { + return nil, nil, err + } + + err = ch.Qos(1, 0, false) // Prefetch 1 + if err != nil { + return nil, nil, err + } + + _, err = ch.QueueDeclare( + cfg.QueueName, + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + amqp091.Table{ + "x-dead-letter-exchange": "optimize.images.dlx", + "x-dead-letter-routing-key": "file.uploaded.failed", + "x-delivery-limit": int32(3), + "x-queue-type": "quorum", + }, + ) + if err != nil { + return nil, nil, err + } + + log.Printf("🎧 Listening to queue: %s", cfg.QueueName) + + return conn, ch, nil +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..7413e45 --- /dev/null +++ b/utils.go @@ -0,0 +1,20 @@ +// utils.go +package main + +import ( + "bytes" + "io" + "log" + "time" +) + +func SleepWithCountdown(seconds int) { + for i := seconds; i > 0; i-- { + log.Printf("⏳ Retrying in %d seconds...", i) + time.Sleep(1 * time.Second) + } +} + +func NewBytesReader(data []byte) io.ReadSeeker { + return bytes.NewReader(data) +}