minio-worker-go/main.go
2025-04-27 23:00:52 +03:00

110 lines
2.1 KiB
Go

// main.go
package main
import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/rabbitmq/amqp091-go"
)
// S3Event is the JSON payload decoded from the body of each queue message:
// a top-level Key plus a list of per-object notification records.
type S3Event struct {
// Key is the object path used by main; main strips a leading "<bucket>/"
// prefix from it to obtain the object key.
Key string `json:"Key"`
// Records holds the notification entries; main only inspects the first one.
Records []struct {
// EventName is the event type; main processes only "s3:ObjectCreated:Put".
EventName string `json:"eventName"`
// S3 identifies the bucket and object the record refers to.
S3 struct {
Bucket struct {
// Name is the bucket name.
Name string `json:"name"`
} `json:"bucket"`
Object struct {
// Key is the object key carried inside the record (main reads the
// top-level Key instead of this field).
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// main connects to RabbitMQ and MinIO, consumes S3 event notifications from
// the configured queue with manual acknowledgements, and runs ProcessImage
// for every "s3:ObjectCreated:Put" event, one goroutine per delivery.
//
// On SIGINT/SIGTERM the shared context is cancelled, no further deliveries
// are dispatched, and main waits for in-flight handlers to settle their
// messages before the deferred channel/connection closes run. (Calling
// os.Exit from the signal handler would kill handlers mid-flight and skip
// every deferred cleanup.)
func main() {
	cfg := LoadConfig()

	rabbitConn, rabbitChannel, err := ConnectRabbitMQ(cfg)
	if err != nil {
		log.Fatalf("🚨 Failed to connect RabbitMQ: %v", err)
	}
	// Deferred calls run last-in-first-out: the channel is closed before the
	// connection.
	defer rabbitConn.Close()
	defer rabbitChannel.Close()

	minioClient, err := ConnectMinIO(cfg)
	if err != nil {
		log.Fatalf("🚨 Failed to connect MinIO: %v", err)
	}

	msgs, err := rabbitChannel.Consume(
		cfg.QueueName,
		"",
		false, // manual ack
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("🚨 Failed to consume RabbitMQ: %v", err)
	}

	// ctx is cancelled when the process receives SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// ack and nack settle a delivery and surface broker errors instead of
	// silently dropping them.
	ack := func(msg amqp091.Delivery) {
		if err := msg.Ack(false); err != nil {
			log.Printf("⚠️ Failed to ack message: %v", err)
		}
	}
	nack := func(msg amqp091.Delivery, requeue bool) {
		if err := msg.Nack(false, requeue); err != nil {
			log.Printf("⚠️ Failed to nack message: %v", err)
		}
	}

	// handle decodes one delivery, processes it, and settles it.
	handle := func(msg amqp091.Delivery) {
		if ctx.Err() != nil {
			// Shutting down: leave the delivery unsettled, as before.
			return
		}

		var event S3Event
		if err := json.Unmarshal(msg.Body, &event); err != nil {
			log.Printf("❌ Invalid message: %v", err)
			nack(msg, false)
			return
		}
		if len(event.Records) == 0 {
			// Valid JSON but nothing to act on; report that explicitly rather
			// than printing a nil error.
			log.Printf("❌ Invalid message: no records")
			nack(msg, false)
			return
		}

		record := event.Records[0]
		if record.EventName != "s3:ObjectCreated:Put" {
			log.Printf("⏭️ Skipping event: %s", record.EventName)
			ack(msg)
			return
		}

		bucket := record.S3.Bucket.Name
		key := strings.TrimPrefix(event.Key, bucket+"/")
		log.Printf("📥 Processing image: %s/%s", bucket, key)

		if err := ProcessImage(ctx, minioClient, bucket, key); err != nil {
			log.Printf("❌ Error processing image: %v", err)
			if ctx.Err() != nil {
				// Shutdown interrupted the work: requeue right away instead of
				// holding up the exit with the retry delay.
				nack(msg, true)
				return
			}
			// NOTE(review): the message is requeued on every failure, so a
			// permanently failing object is retried indefinitely despite the
			// "once" wording below — confirm whether a retry cap or a
			// dead-letter route is wanted.
			log.Println("🔁 Retrying once")
			SleepWithCountdown(5)
			nack(msg, true)
			return
		}
		ack(msg)
	}

	var wg sync.WaitGroup
consume:
	for {
		select {
		case <-ctx.Done():
			log.Println("👋 Shutting down...")
			break consume
		case msg, ok := <-msgs:
			if !ok {
				// The delivery channel was closed (channel or connection gone).
				log.Println("🚨 RabbitMQ delivery channel closed")
				break consume
			}
			wg.Add(1)
			go func(msg amqp091.Delivery) {
				defer wg.Done()
				handle(msg)
			}(msg)
		}
	}

	// Let in-flight handlers ack/nack before the deferred closes tear the
	// channel and connection down.
	wg.Wait()
}