package app

import (
	"context"
	"time"

	"gcy_hpc_server/internal/storage"
	"gcy_hpc_server/internal/store"

	"go.uber.org/zap"
)

// startCleanupWorker periodically cleans up:
//  1. Expired upload sessions (mark → delete MinIO chunks → delete DB records)
//  2. Leaked multipart uploads from failed ComposeObject calls
//
// One cleanup cycle runs immediately, then one per hour. The call blocks until
// ctx is cancelled, so it is meant to be invoked in its own goroutine.
func startCleanupWorker(ctx context.Context, uploadStore *store.UploadStore, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
	const interval = time.Hour

	// runCycle performs one full cleanup pass. It is skipped (or cut short
	// between its two steps) once ctx is cancelled, so shutdown does not
	// trigger calls that can only fail with a context error.
	runCycle := func() {
		if ctx.Err() != nil {
			return
		}
		cleanupExpiredSessions(ctx, uploadStore, objStorage, bucket, logger)
		if ctx.Err() != nil {
			return
		}
		cleanupLeakedMultipartUploads(ctx, objStorage, bucket, logger)
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	runCycle()

	for {
		select {
		case <-ctx.Done():
			logger.Info("cleanup worker stopped")
			return
		case <-ticker.C:
			// select picks randomly when both channels are ready; runCycle's
			// own ctx check covers the case where cancellation lost the race.
			runCycle()
		}
	}
}

// cleanupExpiredSessions performs three-phase cleanup of expired upload sessions:
//
//	Phase 1: Find and mark expired sessions
//	Phase 2: Delete MinIO temp chunks for each session
//	Phase 3: Delete DB records (session + chunks)
//
// Every failure is logged rather than returned. A session whose status update
// fails is skipped entirely. Processing stops early if ctx is cancelled.
func cleanupExpiredSessions(ctx context.Context, uploadStore *store.UploadStore, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
	sessions, err := uploadStore.ListExpiredSessions(ctx)
	if err != nil {
		logger.Error("failed to list expired sessions", zap.Error(err))
		return
	}
	if len(sessions) == 0 {
		return
	}

	logger.Info("cleaning up expired sessions", zap.Int("count", len(sessions)))

	for i := range sessions {
		// Stop promptly on shutdown instead of issuing calls for every
		// remaining session that would each fail and log an error.
		if ctx.Err() != nil {
			logger.Info("expired session cleanup interrupted", zap.Int("remaining", len(sessions)-i))
			return
		}

		session := &sessions[i]

		// Phase 1: mark the session before touching its data.
		if err := uploadStore.UpdateSessionStatus(ctx, session.ID, "expired"); err != nil {
			logger.Error("failed to mark session expired", zap.Int64("session_id", session.ID), zap.Error(err))
			continue
		}

		// Phase 2: remove every object stored under the session's prefix.
		objects, err := objStorage.ListObjects(ctx, bucket, session.MinioPrefix, true)
		switch {
		case err != nil:
			logger.Error("failed to list session objects", zap.Int64("session_id", session.ID), zap.Error(err))
		case len(objects) > 0:
			keys := make([]string, len(objects))
			for j, obj := range objects {
				keys[j] = obj.Key
			}
			if err := objStorage.RemoveObjects(ctx, bucket, keys, storage.RemoveObjectsOptions{}); err != nil {
				logger.Error("failed to remove session objects", zap.Int64("session_id", session.ID), zap.Error(err))
			}
		}

		// Phase 3: delete the DB record. This runs even when phase 2 failed.
		// NOTE(review): in that case the record holding MinioPrefix is gone
		// while objects may remain; whether skipping the delete would lead to
		// a retry depends on what ListExpiredSessions returns — confirm
		// before changing.
		if err := uploadStore.DeleteSession(ctx, session.ID); err != nil {
			logger.Error("failed to delete session", zap.Int64("session_id", session.ID), zap.Error(err))
		}
	}
}

// cleanupLeakedMultipartUploads asks the object storage to remove incomplete
// uploads under the "uploads/" prefix of bucket. A failure is logged, not
// returned.
func cleanupLeakedMultipartUploads(ctx context.Context, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
	if err := objStorage.RemoveIncompleteUpload(ctx, bucket, "uploads/"); err != nil {
		logger.Error("failed to cleanup leaked multipart uploads", zap.Error(err))
	}
}