// main.go package main import ( "context" "encoding/json" "log" "os" "os/signal" "strings" "syscall" "github.com/rabbitmq/amqp091-go" ) type S3Event struct { Key string `json:"Key"` Records []struct { EventName string `json:"eventName"` S3 struct { Bucket struct { Name string `json:"name"` } `json:"bucket"` Object struct { Key string `json:"key"` } `json:"object"` } `json:"s3"` } `json:"Records"` } func main() { cfg := LoadConfig() rabbitConn, rabbitChannel, err := ConnectRabbitMQ(cfg) if err != nil { log.Fatalf("🚨 Failed to connect RabbitMQ: %v", err) } defer rabbitConn.Close() defer rabbitChannel.Close() minioClient, err := ConnectMinIO(cfg) if err != nil { log.Fatalf("🚨 Failed to connect MinIO: %v", err) } msgs, err := rabbitChannel.Consume( cfg.QueueName, "", false, // manual ack false, false, false, nil, ) if err != nil { log.Fatalf("🚨 Failed to consume RabbitMQ: %v", err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Graceful shutdown go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) <-c log.Println("👋 Shutting down...") cancel() rabbitConn.Close() os.Exit(0) }() for msg := range msgs { go func(msg amqp091.Delivery) { if ctx.Err() != nil { return } var event S3Event err := json.Unmarshal(msg.Body, &event) if err != nil || len(event.Records) == 0 { log.Printf("❌ Invalid message: %v", err) msg.Nack(false, false) return } record := event.Records[0] if record.EventName != "s3:ObjectCreated:Put" { log.Printf("⏭️ Skipping event: %s", record.EventName) msg.Ack(false) return } bucket := record.S3.Bucket.Name key := strings.TrimPrefix(event.Key, bucket+"/") log.Printf("📥 Processing image: %s/%s", bucket, key) err = ProcessImage(ctx, minioClient, bucket, key) if err != nil { log.Printf("❌ Error processing image: %v", err) log.Println("🔁 Retrying once") SleepWithCountdown(5) msg.Nack(false, true) return } msg.Ack(false) }(msg) } }