first commit

This commit is contained in:
Nimer Farahty 2025-04-27 23:00:52 +03:00
commit fbacedc6dd
11 changed files with 467 additions and 0 deletions

3
.dockerignore Normal file
View File

@ -0,0 +1,3 @@
./app
.env
docker-compose.yml

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.env
docker-compose.yml

26
Dockerfile Normal file
View File

@ -0,0 +1,26 @@
# syntax=docker/dockerfile:1.4
# Build stage
FROM --platform=$BUILDPLATFORM golang:1.24-alpine AS builder
RUN apk add --no-cache build-base vips-dev
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o app
# Final image
FROM alpine:3.21
RUN apk add --no-cache vips
WORKDIR /app
COPY --from=builder /app/app .
CMD ["./app"]

39
config.go Normal file
View File

@ -0,0 +1,39 @@
// config.go
package main
import (
"log"
"os"
"github.com/joho/godotenv"
)
type Config struct {
RabbitURL string
QueueName string
MinioURL string
MinioAccessKey string
MinioSecretKey string
}
func LoadConfig() Config {
err := godotenv.Load()
if err != nil {
log.Println("No .env file found, using environment variables")
}
return Config{
RabbitURL: getEnv("RABBITMQ_URL", "amqp://localhost"),
QueueName: getEnv("QUEUE_NAME", "storage.file"),
MinioURL: getEnv("MINIO_ENDPOINT", "localhost:9000"),
MinioAccessKey: getEnv("MINIO_ACCESS_KEY", "minioadmin"),
MinioSecretKey: getEnv("MINIO_SECRET_KEY", "minioadmin"),
}
}
func getEnv(key, fallback string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return fallback
}

23
go.mod Normal file
View File

@ -0,0 +1,23 @@
module git.farahty.com/nimer/minio-worker-go
go 1.24.2
require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/h2non/bimg v1.1.9 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/minio/crc64nvme v1.0.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/minio-go/v7 v7.0.91 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/rs/xid v1.6.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
)

35
go.sum Normal file
View File

@ -0,0 +1,35 @@
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/h2non/bimg v1.1.9 h1:WH20Nxko9l/HFm4kZCA3Phbgu2cbHvYzxwxn9YROEGg=
github.com/h2non/bimg v1.1.9/go.mod h1:R3+UiYwkK4rQl6KVFTOFJHitgLbZXBZNFh2cv3AEbp8=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/minio/crc64nvme v1.0.1 h1:DHQPrYPdqK7jQG/Ls5CTBZWeex/2FMS3G5XGkycuFrY=
github.com/minio/crc64nvme v1.0.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.91 h1:tWLZnEfo3OZl5PoXQwcwTAPNNrjyWwOh6cbZitW5JQc=
github.com/minio/minio-go/v7 v7.0.91/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=

109
main.go Normal file
View File

@ -0,0 +1,109 @@
// main.go
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"strings"
"syscall"
"github.com/rabbitmq/amqp091-go"
)
type S3Event struct {
Key string `json:"Key"`
Records []struct {
EventName string `json:"eventName"`
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
func main() {
cfg := LoadConfig()
rabbitConn, rabbitChannel, err := ConnectRabbitMQ(cfg)
if err != nil {
log.Fatalf("🚨 Failed to connect RabbitMQ: %v", err)
}
defer rabbitConn.Close()
defer rabbitChannel.Close()
minioClient, err := ConnectMinIO(cfg)
if err != nil {
log.Fatalf("🚨 Failed to connect MinIO: %v", err)
}
msgs, err := rabbitChannel.Consume(
cfg.QueueName,
"",
false, // manual ack
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("🚨 Failed to consume RabbitMQ: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Graceful shutdown
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
<-c
log.Println("👋 Shutting down...")
cancel()
rabbitConn.Close()
os.Exit(0)
}()
for msg := range msgs {
go func(msg amqp091.Delivery) {
if ctx.Err() != nil {
return
}
var event S3Event
err := json.Unmarshal(msg.Body, &event)
if err != nil || len(event.Records) == 0 {
log.Printf("❌ Invalid message: %v", err)
msg.Nack(false, false)
return
}
record := event.Records[0]
if record.EventName != "s3:ObjectCreated:Put" {
log.Printf("⏭️ Skipping event: %s", record.EventName)
msg.Ack(false)
return
}
bucket := record.S3.Bucket.Name
key := strings.TrimPrefix(event.Key, bucket+"/")
log.Printf("📥 Processing image: %s/%s", bucket, key)
err = ProcessImage(ctx, minioClient, bucket, key)
if err != nil {
log.Printf("❌ Error processing image: %v", err)
log.Println("🔁 Retrying once")
SleepWithCountdown(5)
msg.Nack(false, true)
return
}
msg.Ack(false)
}(msg)
}
}

39
minio.go Normal file
View File

@ -0,0 +1,39 @@
// minio.go
package main
import (
"context"
"log"
"strings"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
func ConnectMinIO(cfg Config) (*minio.Client, error) {
minioClient, err := minio.New(cfg.MinioURL, &minio.Options{
Creds: credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
Secure: true, // change to true if using https
})
if err != nil {
return nil, err
}
return minioClient, nil
}
func UploadToMinIO(ctx context.Context, client *minio.Client, bucket, path, mimeType string, data []byte, meta map[string]string) error {
key := strings.TrimPrefix(path, "/")
key = strings.TrimPrefix(key, "./")
log.Printf("Uploading to MinIO: %s", key)
_, err := client.PutObject(ctx, bucket, key,
NewBytesReader(data),
int64(len(data)),
minio.PutObjectOptions{
ContentType: mimeType,
UserMetadata: meta,
})
return err
}

125
processor.go Normal file
View File

@ -0,0 +1,125 @@
package main
import (
"bytes"
"context"
"fmt"
"log"
"path"
"path/filepath"
"strings"
"github.com/h2non/bimg"
"github.com/minio/minio-go/v7"
)
func ProcessImage(ctx context.Context, client *minio.Client, bucket, key string) error {
stat, err := client.StatObject(ctx, bucket, key, minio.StatObjectOptions{})
if err != nil {
log.Printf("🚨 Failed to stat object: %s, error: %v", key, err)
return err
}
contentType := stat.ContentType
if !strings.HasPrefix(contentType, "image/") {
log.Printf("⏭️ Skipping non-image file: %s", key)
return nil
}
if processed := stat.UserMetadata["X-Amz-Meta-Processed"]; processed == "true" {
log.Printf("♻️ Already processed: %s", key)
return nil
}
if processed := stat.UserMetadata["Processed"]; processed == "true" {
log.Printf("♻️ Already processed: %s", key)
return nil
}
object, err := client.GetObject(ctx, bucket, key, minio.GetObjectOptions{})
if err != nil {
log.Printf("🚨 Failed to get object: %s, error: %v", key, err)
return err
}
defer object.Close()
sourceBuffer := new(bytes.Buffer)
_, err = sourceBuffer.ReadFrom(object)
if err != nil {
log.Printf("🚨 Failed to read object: %s, error: %v", key, err)
return err
}
img := bimg.NewImage(sourceBuffer.Bytes())
fileName := filepath.Base(key)
filePath := path.Dir(key)
meta := map[string]string{
"Processed": "true",
}
// Generate optimized JPEG: just convert without resizing, keep quality high
jpegBuf, err := img.Process(bimg.Options{
Quality: 90,
Type: bimg.JPEG,
StripMetadata: true,
NoAutoRotate: false,
})
if err != nil {
return fmt.Errorf("failed to generate optimized jpeg: %w", err)
}
// Generate optimized WebP: just convert without resizing, keep quality high
webpBuf, err := img.Process(bimg.Options{
Quality: 90,
Type: bimg.WEBP,
StripMetadata: true,
NoAutoRotate: false,
})
if err != nil {
return fmt.Errorf("failed to generate webp: %w", err)
}
// Generate thumbnail: resize to 400px width
thumbBuf, err := img.Process(bimg.Options{
Width: 400,
Quality: 85,
Type: bimg.JPEG,
StripMetadata: true,
NoAutoRotate: false,
})
if err != nil {
return fmt.Errorf("failed to generate thumbnail: %w", err)
}
// Upload optimized JPEG
err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "optimized", fileName), "image/jpeg", jpegBuf, meta)
if err != nil {
return fmt.Errorf("failed to upload optimized jpeg: %w", err)
}
// Upload WebP
webpName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) + ".webp"
err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "webp", webpName), "image/webp", webpBuf, meta)
if err != nil {
return fmt.Errorf("failed to upload webp image: %w", err)
}
// Upload thumbnail
err = UploadToMinIO(ctx, client, bucket, path.Join(filePath, "thumbs", fileName), "image/jpeg", thumbBuf, meta)
if err != nil {
return fmt.Errorf("failed to upload thumbnail: %w", err)
}
// Reupload original with processed=true metadata (unchanged)
err = UploadToMinIO(ctx, client, bucket, key, contentType, sourceBuffer.Bytes(), meta)
if err != nil {
return fmt.Errorf("failed to reupload original image: %w", err)
}
log.Printf("✅ Image processed: %s", key)
return nil
}

46
rabbitmq.go Normal file
View File

@ -0,0 +1,46 @@
// rabbitmq.go
package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func ConnectRabbitMQ(cfg Config) (*amqp091.Connection, *amqp091.Channel, error) {
conn, err := amqp091.Dial(cfg.RabbitURL)
if err != nil {
return nil, nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
err = ch.Qos(1, 0, false) // Prefetch 1
if err != nil {
return nil, nil, err
}
_, err = ch.QueueDeclare(
cfg.QueueName,
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp091.Table{
"x-dead-letter-exchange": "optimize.images.dlx",
"x-dead-letter-routing-key": "file.uploaded.failed",
"x-delivery-limit": int32(3),
"x-queue-type": "quorum",
},
)
if err != nil {
return nil, nil, err
}
log.Printf("🎧 Listening to queue: %s", cfg.QueueName)
return conn, ch, nil
}

20
utils.go Normal file
View File

@ -0,0 +1,20 @@
// utils.go
package main
import (
"bytes"
"io"
"log"
"time"
)
func SleepWithCountdown(seconds int) {
for i := seconds; i > 0; i-- {
log.Printf("⏳ Retrying in %d seconds...", i)
time.Sleep(1 * time.Second)
}
}
func NewBytesReader(data []byte) io.ReadSeeker {
return bytes.NewReader(data)
}