Add DI wiring with graceful MinIO fallback, background cleanup worker for expired sessions and leaked multipart uploads, and end-to-end integration tests. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
84 lines
2.8 KiB
Go
84 lines
2.8 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"gcy_hpc_server/internal/storage"
|
|
"gcy_hpc_server/internal/store"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// startCleanupWorker runs a background goroutine that periodically cleans up:
|
|
// 1. Expired upload sessions (mark → delete MinIO chunks → delete DB records)
|
|
// 2. Leaked multipart uploads from failed ComposeObject calls
|
|
func startCleanupWorker(ctx context.Context, uploadStore *store.UploadStore, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
defer ticker.Stop()
|
|
|
|
cleanupExpiredSessions(ctx, uploadStore, objStorage, bucket, logger)
|
|
cleanupLeakedMultipartUploads(ctx, objStorage, bucket, logger)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info("cleanup worker stopped")
|
|
return
|
|
case <-ticker.C:
|
|
cleanupExpiredSessions(ctx, uploadStore, objStorage, bucket, logger)
|
|
cleanupLeakedMultipartUploads(ctx, objStorage, bucket, logger)
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupExpiredSessions performs three-phase cleanup of expired upload sessions:
|
|
// Phase 1: Find and mark expired sessions
|
|
// Phase 2: Delete MinIO temp chunks for each session
|
|
// Phase 3: Delete DB records (session + chunks)
|
|
func cleanupExpiredSessions(ctx context.Context, uploadStore *store.UploadStore, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
|
|
sessions, err := uploadStore.ListExpiredSessions(ctx)
|
|
if err != nil {
|
|
logger.Error("failed to list expired sessions", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if len(sessions) == 0 {
|
|
return
|
|
}
|
|
|
|
logger.Info("cleaning up expired sessions", zap.Int("count", len(sessions)))
|
|
|
|
for i := range sessions {
|
|
session := &sessions[i]
|
|
|
|
if err := uploadStore.UpdateSessionStatus(ctx, session.ID, "expired"); err != nil {
|
|
logger.Error("failed to mark session expired", zap.Int64("session_id", session.ID), zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
objects, err := objStorage.ListObjects(ctx, bucket, session.MinioPrefix, true)
|
|
if err != nil {
|
|
logger.Error("failed to list session objects", zap.Int64("session_id", session.ID), zap.Error(err))
|
|
} else if len(objects) > 0 {
|
|
keys := make([]string, len(objects))
|
|
for j, obj := range objects {
|
|
keys[j] = obj.Key
|
|
}
|
|
if err := objStorage.RemoveObjects(ctx, bucket, keys, storage.RemoveObjectsOptions{}); err != nil {
|
|
logger.Error("failed to remove session objects", zap.Int64("session_id", session.ID), zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if err := uploadStore.DeleteSession(ctx, session.ID); err != nil {
|
|
logger.Error("failed to delete session", zap.Int64("session_id", session.ID), zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
func cleanupLeakedMultipartUploads(ctx context.Context, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
|
|
if err := objStorage.RemoveIncompleteUpload(ctx, bucket, "uploads/"); err != nil {
|
|
logger.Error("failed to cleanup leaked multipart uploads", zap.Error(err))
|
|
}
|
|
}
|