commit fbacedc6ddfead4f09e23d6aef7df3c523782b5b Author: Nimer Farahty Date: Sun Apr 27 23:00:52 2025 +0300 first commit diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3b89858 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +./app +.env +docker-compose.yml \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..00a3413 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +docker-compose.yml \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..dd704b8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +# syntax=docker/dockerfile:1.4 + +# Build stage +FROM --platform=$BUILDPLATFORM golang:1.24-alpine AS builder + +RUN apk add --no-cache build-base vips-dev + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +RUN go build -o app + +# Final image +FROM alpine:3.21 + +RUN apk add --no-cache vips + +WORKDIR /app + +COPY --from=builder /app/app . + +CMD ["./app"] diff --git a/config.go b/config.go new file mode 100644 index 0000000..77d6500 --- /dev/null +++ b/config.go @@ -0,0 +1,39 @@ +// config.go +package main + +import ( + "log" + "os" + + "github.com/joho/godotenv" +) + +type Config struct { + RabbitURL string + QueueName string + MinioURL string + MinioAccessKey string + MinioSecretKey string +} + +func LoadConfig() Config { + err := godotenv.Load() + if err != nil { + log.Println("No .env file found, using environment variables") + } + + return Config{ + RabbitURL: getEnv("RABBITMQ_URL", "amqp://localhost"), + QueueName: getEnv("QUEUE_NAME", "storage.file"), + MinioURL: getEnv("MINIO_ENDPOINT", "localhost:9000"), + MinioAccessKey: getEnv("MINIO_ACCESS_KEY", "minioadmin"), + MinioSecretKey: getEnv("MINIO_SECRET_KEY", "minioadmin"), + } +} + +func getEnv(key, fallback string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return fallback +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a00dcb4 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module git.farahty.com/nimer/minio-worker-go + +go 1.24.2 + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/go-ini/ini v1.67.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/h2non/bimg v1.1.9 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/minio/crc64nvme v1.0.1 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/minio-go/v7 v7.0.91 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect + github.com/rs/xid v1.6.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.38.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e3c04e4 --- /dev/null +++ b/go.sum @@ -0,0 +1,35 @@ +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/h2non/bimg v1.1.9 h1:WH20Nxko9l/HFm4kZCA3Phbgu2cbHvYzxwxn9YROEGg= +github.com/h2non/bimg v1.1.9/go.mod h1:R3+UiYwkK4rQl6KVFTOFJHitgLbZXBZNFh2cv3AEbp8= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/minio/crc64nvme v1.0.1 h1:DHQPrYPdqK7jQG/Ls5CTBZWeex/2FMS3G5XGkycuFrY= +github.com/minio/crc64nvme v1.0.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.91 h1:tWLZnEfo3OZl5PoXQwcwTAPNNrjyWwOh6cbZitW5JQc= +github.com/minio/minio-go/v7 v7.0.91/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= diff --git a/main.go b/main.go new file mode 100644 index 0000000..2b1db70 --- /dev/null +++ b/main.go @@ -0,0 +1,109 @@ +// main.go +package main + +import ( + "context" + "encoding/json" + "log" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/rabbitmq/amqp091-go" +) + +type S3Event struct { + Key string `json:"Key"` + Records []struct { + EventName string `json:"eventName"` + S3 struct { + Bucket struct { + Name string `json:"name"` + } `json:"bucket"` + Object struct { + Key string `json:"key"` + } `json:"object"` + } `json:"s3"` + } `json:"Records"` +} + +func main() { + cfg := LoadConfig() + + rabbitConn, rabbitChannel, err := ConnectRabbitMQ(cfg) + if err != nil { + log.Fatalf("🚨 Failed to connect RabbitMQ: %v", err) + } + defer rabbitConn.Close() + defer rabbitChannel.Close() + + minioClient, err := ConnectMinIO(cfg) + if err != nil { + log.Fatalf("🚨 Failed to connect MinIO: %v", err) + } + + msgs, err := rabbitChannel.Consume( + cfg.QueueName, + "", + false, // manual ack + false, + false, + false, + nil, + ) + if err != nil { + log.Fatalf("🚨 Failed to consume RabbitMQ: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Graceful shutdown + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + <-c + log.Println("👋 Shutting down...") + cancel() + rabbitConn.Close() + os.Exit(0) + }() + + for msg := range msgs { + go func(msg amqp091.Delivery) { + if ctx.Err() != nil { + return + } + + var event S3Event + err := json.Unmarshal(msg.Body, &event) + if err != nil || len(event.Records) == 0 { + log.Printf("❌ Invalid message: %v", err) + msg.Nack(false, false) + return + } + + record := event.Records[0] + if record.EventName != "s3:ObjectCreated:Put" { + log.Printf("⏭️ Skipping event: %s", record.EventName) + msg.Ack(false) + return + } + + bucket := record.S3.Bucket.Name + key := strings.TrimPrefix(event.Key, bucket+"/") + log.Printf("📥 Processing image: %s/%s", bucket, key) + err = ProcessImage(ctx, minioClient, bucket, key) + if err != nil { + log.Printf("❌ Error processing image: %v", err) + log.Println("🔁 Retrying once") + SleepWithCountdown(5) + msg.Nack(false, true) + return + } + + msg.Ack(false) + }(msg) + } +} diff --git a/minio.go b/minio.go new file mode 100644 index 0000000..03eb5a0 --- /dev/null +++ b/minio.go @@ -0,0 +1,39 @@ +// minio.go +package main + +import ( + "context" + "log" + "strings" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +func ConnectMinIO(cfg Config) (*minio.Client, error) { + minioClient, err := minio.New(cfg.MinioURL, &minio.Options{ + Creds: credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""), + Secure: true, // change to true if using https + }) + if err != nil { + return nil, err + } + + return minioClient, nil +} + +func UploadToMinIO(ctx context.Context, client *minio.Client, bucket, path, mimeType string, data []byte, meta map[string]string) error { + + key := strings.TrimPrefix(path, "/") + key = strings.TrimPrefix(key, "./") + + log.Printf("Uploading to MinIO: %s", key) + _, err := client.PutObject(ctx, bucket, key, + NewBytesReader(data), + int64(len(data)), + minio.PutObjectOptions{ + ContentType: mimeType, + UserMetadata: meta, + }) + return err +} diff --git a/processor.go b/processor.go new file mode 100644 index 0000000..3f5e094 --- /dev/null +++ b/processor.go @@ -0,0 +1,125 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "log" + "path" + "path/filepath" + "strings" + + "github.com/h2non/bimg" + "github.com/minio/minio-go/v7" +) + +func ProcessImage(ctx context.Context, client *minio.Client, bucket, key string) error { + stat, err := client.StatObject(ctx, bucket, key, minio.StatObjectOptions{}) + if err != nil { + log.Printf("🚨 Failed to stat object: %s, error: %v", key, err) + return err + } + + contentType := stat.ContentType + if !strings.HasPrefix(contentType, "image/") { + log.Printf("⏭️ Skipping non-image file: %s", key) + return nil + } + + if processed := stat.UserMetadata["X-Amz-Meta-Processed"]; processed == "true" { + log.Printf("♻️ Already processed: %s", key) + return nil + } + + if processed := stat.UserMetadata["Processed"]; processed == "true" { + log.Printf("♻️ Already processed: %s", key) + return nil + } + + object, err := client.GetObject(ctx, bucket, key, minio.GetObjectOptions{}) + if err != nil { + log.Printf("🚨 Failed to get object: %s, error: %v", key, err) + return err + } + defer object.Close() + + sourceBuffer := new(bytes.Buffer) + _, err = sourceBuffer.ReadFrom(object) + if err != nil { + log.Printf("🚨 Failed to read object: %s, error: %v", key, err) + return err + } + + img := bimg.NewImage(sourceBuffer.Bytes()) + + fileName := filepath.Base(key) + filePath := path.Dir(key) + + meta := map[string]string{ + "Processed": "true", + } + + // Generate optimized JPEG: just convert without resizing, keep quality high + jpegBuf, err := img.Process(bimg.Options{ + Quality: 90, + Type: bimg.JPEG, + StripMetadata: true, + NoAutoRotate: false, + }) + + if err != nil { + return fmt.Errorf("failed to generate optimized jpeg: %w", err) + } + + // Generate optimized WebP: just convert without resizing, keep quality high + webpBuf, err := img.Process(bimg.Options{ + Quality: 90, + Type: bimg.WEBP, + StripMetadata: true, + NoAutoRotate: false, + }) + if err != nil { + return fmt.Errorf("failed to generate webp: %w", err) + } + + // Generate thumbnail: resize to 400px width + thumbBuf, err := img.Process(bimg.Options{ + Width: 400, + Quality: 85, + Type: bimg.JPEG, + StripMetadata: true, + NoAutoRotate: false, + }) + + if err != nil { + return fmt.Errorf("failed to generate thumbnail: %w", err) + } + + // Upload optimized JPEG + err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "optimized", fileName), "image/jpeg", jpegBuf, meta) + if err != nil { + return fmt.Errorf("failed to upload optimized jpeg: %w", err) + } + + // Upload WebP + webpName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) + ".webp" + err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "webp", webpName), "image/webp", webpBuf, meta) + if err != nil { + return fmt.Errorf("failed to upload webp image: %w", err) + } + + // Upload thumbnail + err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "thumbs", fileName), "image/jpeg", thumbBuf, meta) + if err != nil { + return fmt.Errorf("failed to upload thumbnail: %w", err) + } + + // Reupload original with processed=true metadata (unchanged) + err = UploadToMinIO(ctx, client, bucket, key, contentType, sourceBuffer.Bytes(), meta) + if err != nil { + return fmt.Errorf("failed to reupload original image: %w", err) + } + + log.Printf("✅ Image processed: %s", key) + return nil +} diff --git a/rabbitmq.go b/rabbitmq.go new file mode 100644 index 0000000..fe92c35 --- /dev/null +++ b/rabbitmq.go @@ -0,0 +1,46 @@ +// rabbitmq.go +package main + +import ( + "log" + + "github.com/rabbitmq/amqp091-go" +) + +func ConnectRabbitMQ(cfg Config) (*amqp091.Connection, *amqp091.Channel, error) { + conn, err := amqp091.Dial(cfg.RabbitURL) + if err != nil { + return nil, nil, err + } + + ch, err := conn.Channel() + if err != nil { + return nil, nil, err + } + + err = ch.Qos(1, 0, false) // Prefetch 1 + if err != nil { + return nil, nil, err + } + + _, err = ch.QueueDeclare( + cfg.QueueName, + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + amqp091.Table{ + "x-dead-letter-exchange": "optimize.images.dlx", + "x-dead-letter-routing-key": "file.uploaded.failed", + "x-delivery-limit": int32(3), + "x-queue-type": "quorum", + }, + ) + if err != nil { + return nil, nil, err + } + + log.Printf("🎧 Listening to queue: %s", cfg.QueueName) + + return conn, ch, nil +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..7413e45 --- /dev/null +++ b/utils.go @@ -0,0 +1,20 @@ +// utils.go +package main + +import ( + "bytes" + "io" + "log" + "time" +) + +func SleepWithCountdown(seconds int) { + for i := seconds; i > 0; i-- { + log.Printf("⏳ Retrying in %d seconds...", i) + time.Sleep(1 * time.Second) + } +} + +func NewBytesReader(data []byte) io.ReadSeeker { + return bytes.NewReader(data) +}