feat(app): wire file storage DI, cleanup worker, and integration tests
Add DI wiring with graceful MinIO fallback, background cleanup worker for expired sessions and leaked multipart uploads, and end-to-end integration tests. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
||||
"gcy_hpc_server/internal/server"
|
||||
"gcy_hpc_server/internal/service"
|
||||
"gcy_hpc_server/internal/slurm"
|
||||
"gcy_hpc_server/internal/storage"
|
||||
"gcy_hpc_server/internal/store"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@@ -22,10 +23,11 @@ import (
|
||||
)
|
||||
|
||||
type App struct {
|
||||
cfg *config.Config
|
||||
logger *zap.Logger
|
||||
db *gorm.DB
|
||||
server *http.Server
|
||||
cfg *config.Config
|
||||
logger *zap.Logger
|
||||
db *gorm.DB
|
||||
server *http.Server
|
||||
cancelCleanup context.CancelFunc
|
||||
}
|
||||
|
||||
// NewApp initializes all application dependencies: DB, Slurm client, services, handlers, router.
|
||||
@@ -41,13 +43,14 @@ func NewApp(cfg *config.Config, logger *zap.Logger) (*App, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
srv := initHTTPServer(cfg, gormDB, slurmClient, logger)
|
||||
srv, cancelCleanup := initHTTPServer(cfg, gormDB, slurmClient, logger)
|
||||
|
||||
return &App{
|
||||
cfg: cfg,
|
||||
logger: logger,
|
||||
db: gormDB,
|
||||
server: srv,
|
||||
cfg: cfg,
|
||||
logger: logger,
|
||||
db: gormDB,
|
||||
server: srv,
|
||||
cancelCleanup: cancelCleanup,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -83,6 +86,10 @@ func (a *App) Run() error {
|
||||
func (a *App) Close() error {
|
||||
var errs []error
|
||||
|
||||
if a.cancelCleanup != nil {
|
||||
a.cancelCleanup()
|
||||
}
|
||||
|
||||
if a.server != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
@@ -138,7 +145,7 @@ func initSlurmClient(cfg *config.Config) (*slurm.Client, error) {
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func initHTTPServer(cfg *config.Config, db *gorm.DB, slurmClient *slurm.Client, logger *zap.Logger) *http.Server {
|
||||
func initHTTPServer(cfg *config.Config, db *gorm.DB, slurmClient *slurm.Client, logger *zap.Logger) (*http.Server, context.CancelFunc) {
|
||||
jobSvc := service.NewJobService(slurmClient, logger)
|
||||
clusterSvc := service.NewClusterService(slurmClient, logger)
|
||||
jobH := handler.NewJobHandler(jobSvc, logger)
|
||||
@@ -148,12 +155,49 @@ func initHTTPServer(cfg *config.Config, db *gorm.DB, slurmClient *slurm.Client,
|
||||
appSvc := service.NewApplicationService(appStore, jobSvc, cfg.WorkDirBase, logger)
|
||||
appH := handler.NewApplicationHandler(appSvc, logger)
|
||||
|
||||
router := server.NewRouter(jobH, clusterH, appH, logger)
|
||||
// File storage initialization
|
||||
minioClient, err := storage.NewMinioClient(cfg.Minio)
|
||||
if err != nil {
|
||||
logger.Warn("failed to initialize MinIO client, file storage disabled", zap.Error(err))
|
||||
}
|
||||
|
||||
var uploadH *handler.UploadHandler
|
||||
var fileH *handler.FileHandler
|
||||
var folderH *handler.FolderHandler
|
||||
|
||||
if minioClient != nil {
|
||||
blobStore := store.NewBlobStore(db)
|
||||
fileStore := store.NewFileStore(db)
|
||||
folderStore := store.NewFolderStore(db)
|
||||
uploadStore := store.NewUploadStore(db)
|
||||
|
||||
uploadSvc := service.NewUploadService(minioClient, blobStore, fileStore, uploadStore, cfg.Minio, db, logger)
|
||||
folderSvc := service.NewFolderService(folderStore, fileStore, logger)
|
||||
fileSvc := service.NewFileService(minioClient, blobStore, fileStore, cfg.Minio.Bucket, db, logger)
|
||||
|
||||
uploadH = handler.NewUploadHandler(uploadSvc, logger)
|
||||
fileH = handler.NewFileHandler(fileSvc, logger)
|
||||
folderH = handler.NewFolderHandler(folderSvc, logger)
|
||||
|
||||
cleanupCtx, cancelCleanup := context.WithCancel(context.Background())
|
||||
go startCleanupWorker(cleanupCtx, uploadStore, minioClient, cfg.Minio.Bucket, logger)
|
||||
|
||||
router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, logger)
|
||||
|
||||
addr := ":" + cfg.ServerPort
|
||||
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: router,
|
||||
}, cancelCleanup
|
||||
}
|
||||
|
||||
router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, logger)
|
||||
|
||||
addr := ":" + cfg.ServerPort
|
||||
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: router,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
83
internal/app/cleanup.go
Normal file
83
internal/app/cleanup.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gcy_hpc_server/internal/storage"
|
||||
"gcy_hpc_server/internal/store"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// startCleanupWorker runs a background goroutine that periodically cleans up:
|
||||
// 1. Expired upload sessions (mark → delete MinIO chunks → delete DB records)
|
||||
// 2. Leaked multipart uploads from failed ComposeObject calls
|
||||
func startCleanupWorker(ctx context.Context, uploadStore *store.UploadStore, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
|
||||
ticker := time.NewTicker(1 * time.Hour)
|
||||
defer ticker.Stop()
|
||||
|
||||
cleanupExpiredSessions(ctx, uploadStore, objStorage, bucket, logger)
|
||||
cleanupLeakedMultipartUploads(ctx, objStorage, bucket, logger)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("cleanup worker stopped")
|
||||
return
|
||||
case <-ticker.C:
|
||||
cleanupExpiredSessions(ctx, uploadStore, objStorage, bucket, logger)
|
||||
cleanupLeakedMultipartUploads(ctx, objStorage, bucket, logger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupExpiredSessions performs three-phase cleanup of expired upload sessions:
|
||||
// Phase 1: Find and mark expired sessions
|
||||
// Phase 2: Delete MinIO temp chunks for each session
|
||||
// Phase 3: Delete DB records (session + chunks)
|
||||
func cleanupExpiredSessions(ctx context.Context, uploadStore *store.UploadStore, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
|
||||
sessions, err := uploadStore.ListExpiredSessions(ctx)
|
||||
if err != nil {
|
||||
logger.Error("failed to list expired sessions", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if len(sessions) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("cleaning up expired sessions", zap.Int("count", len(sessions)))
|
||||
|
||||
for i := range sessions {
|
||||
session := &sessions[i]
|
||||
|
||||
if err := uploadStore.UpdateSessionStatus(ctx, session.ID, "expired"); err != nil {
|
||||
logger.Error("failed to mark session expired", zap.Int64("session_id", session.ID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
objects, err := objStorage.ListObjects(ctx, bucket, session.MinioPrefix, true)
|
||||
if err != nil {
|
||||
logger.Error("failed to list session objects", zap.Int64("session_id", session.ID), zap.Error(err))
|
||||
} else if len(objects) > 0 {
|
||||
keys := make([]string, len(objects))
|
||||
for j, obj := range objects {
|
||||
keys[j] = obj.Key
|
||||
}
|
||||
if err := objStorage.RemoveObjects(ctx, bucket, keys, storage.RemoveObjectsOptions{}); err != nil {
|
||||
logger.Error("failed to remove session objects", zap.Int64("session_id", session.ID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := uploadStore.DeleteSession(ctx, session.ID); err != nil {
|
||||
logger.Error("failed to delete session", zap.Int64("session_id", session.ID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func cleanupLeakedMultipartUploads(ctx context.Context, objStorage storage.ObjectStorage, bucket string, logger *zap.Logger) {
|
||||
if err := objStorage.RemoveIncompleteUpload(ctx, bucket, "uploads/"); err != nil {
|
||||
logger.Error("failed to cleanup leaked multipart uploads", zap.Error(err))
|
||||
}
|
||||
}
|
||||
266
internal/app/cleanup_test.go
Normal file
266
internal/app/cleanup_test.go
Normal file
@@ -0,0 +1,266 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gcy_hpc_server/internal/model"
|
||||
"gcy_hpc_server/internal/storage"
|
||||
"gcy_hpc_server/internal/store"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// mockCleanupStorage implements ObjectStorage for cleanup tests.
|
||||
type mockCleanupStorage struct {
|
||||
listObjectsFn func(ctx context.Context, bucket, prefix string, recursive bool) ([]storage.ObjectInfo, error)
|
||||
removeObjectsFn func(ctx context.Context, bucket string, keys []string, opts storage.RemoveObjectsOptions) error
|
||||
removeIncompleteFn func(ctx context.Context, bucket, object string) error
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) PutObject(_ context.Context, _ string, _ string, _ io.Reader, _ int64, _ storage.PutObjectOptions) (storage.UploadInfo, error) {
|
||||
return storage.UploadInfo{}, nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) GetObject(_ context.Context, _ string, _ string, _ storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) {
|
||||
return nil, storage.ObjectInfo{}, nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) ComposeObject(_ context.Context, _ string, _ string, _ []string) (storage.UploadInfo, error) {
|
||||
return storage.UploadInfo{}, nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) AbortMultipartUpload(_ context.Context, _ string, _ string, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) RemoveIncompleteUpload(_ context.Context, _ string, _ string) error {
|
||||
if m.removeIncompleteFn != nil {
|
||||
return m.removeIncompleteFn(context.Background(), "", "")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) RemoveObject(_ context.Context, _ string, _ string, _ storage.RemoveObjectOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) ListObjects(ctx context.Context, bucket string, prefix string, recursive bool) ([]storage.ObjectInfo, error) {
|
||||
if m.listObjectsFn != nil {
|
||||
return m.listObjectsFn(ctx, bucket, prefix, recursive)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) RemoveObjects(ctx context.Context, bucket string, keys []string, opts storage.RemoveObjectsOptions) error {
|
||||
if m.removeObjectsFn != nil {
|
||||
return m.removeObjectsFn(ctx, bucket, keys, opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) BucketExists(_ context.Context, _ string) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) MakeBucket(_ context.Context, _ string, _ storage.MakeBucketOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockCleanupStorage) StatObject(_ context.Context, _ string, _ string, _ storage.StatObjectOptions) (storage.ObjectInfo, error) {
|
||||
return storage.ObjectInfo{}, nil
|
||||
}
|
||||
|
||||
func setupCleanupTestDB(t *testing.T) (*gorm.DB, *store.UploadStore) {
|
||||
t.Helper()
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
|
||||
if err != nil {
|
||||
t.Fatalf("open sqlite: %v", err)
|
||||
}
|
||||
if err := db.AutoMigrate(&model.UploadSession{}, &model.UploadChunk{}); err != nil {
|
||||
t.Fatalf("migrate: %v", err)
|
||||
}
|
||||
return db, store.NewUploadStore(db)
|
||||
}
|
||||
|
||||
func TestCleanupExpiredSessions(t *testing.T) {
|
||||
_, uploadStore := setupCleanupTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
past := time.Now().Add(-1 * time.Hour)
|
||||
err := uploadStore.CreateSession(ctx, &model.UploadSession{
|
||||
FileName: "expired.bin",
|
||||
FileSize: 1024,
|
||||
ChunkSize: 16 << 20,
|
||||
TotalChunks: 1,
|
||||
SHA256: "expired_hash",
|
||||
Status: "pending",
|
||||
MinioPrefix: "uploads/99/",
|
||||
ExpiresAt: past,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create session: %v", err)
|
||||
}
|
||||
|
||||
var listedPrefix string
|
||||
var removedKeys []string
|
||||
|
||||
mockStore := &mockCleanupStorage{
|
||||
listObjectsFn: func(_ context.Context, _, prefix string, _ bool) ([]storage.ObjectInfo, error) {
|
||||
listedPrefix = prefix
|
||||
return []storage.ObjectInfo{{Key: "uploads/99/chunk_00000", Size: 100}}, nil
|
||||
},
|
||||
removeObjectsFn: func(_ context.Context, _ string, keys []string, _ storage.RemoveObjectsOptions) error {
|
||||
removedKeys = keys
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
cleanupExpiredSessions(ctx, uploadStore, mockStore, "test-bucket", zap.NewNop())
|
||||
|
||||
if listedPrefix != "uploads/99/" {
|
||||
t.Errorf("listed prefix = %q, want %q", listedPrefix, "uploads/99/")
|
||||
}
|
||||
if len(removedKeys) != 1 || removedKeys[0] != "uploads/99/chunk_00000" {
|
||||
t.Errorf("removed keys = %v, want [uploads/99/chunk_00000]", removedKeys)
|
||||
}
|
||||
|
||||
session, err := uploadStore.GetSession(ctx, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("get session: %v", err)
|
||||
}
|
||||
if session != nil {
|
||||
t.Error("session should be deleted after cleanup")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupExpiredSessions_Empty(t *testing.T) {
|
||||
_, uploadStore := setupCleanupTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
called := false
|
||||
mockStore := &mockCleanupStorage{
|
||||
listObjectsFn: func(_ context.Context, _, _ string, _ bool) ([]storage.ObjectInfo, error) {
|
||||
called = true
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
|
||||
cleanupExpiredSessions(ctx, uploadStore, mockStore, "test-bucket", zap.NewNop())
|
||||
|
||||
if called {
|
||||
t.Error("ListObjects should not be called when no expired sessions exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupExpiredSessions_CompletedNotCleaned(t *testing.T) {
|
||||
_, uploadStore := setupCleanupTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
past := time.Now().Add(-1 * time.Hour)
|
||||
err := uploadStore.CreateSession(ctx, &model.UploadSession{
|
||||
FileName: "completed.bin",
|
||||
FileSize: 1024,
|
||||
ChunkSize: 16 << 20,
|
||||
TotalChunks: 1,
|
||||
SHA256: "completed_hash",
|
||||
Status: "completed",
|
||||
MinioPrefix: "uploads/100/",
|
||||
ExpiresAt: past,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create session: %v", err)
|
||||
}
|
||||
|
||||
called := false
|
||||
mockStore := &mockCleanupStorage{
|
||||
listObjectsFn: func(_ context.Context, _, _ string, _ bool) ([]storage.ObjectInfo, error) {
|
||||
called = true
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
|
||||
cleanupExpiredSessions(ctx, uploadStore, mockStore, "test-bucket", zap.NewNop())
|
||||
|
||||
if called {
|
||||
t.Error("ListObjects should not be called for completed sessions")
|
||||
}
|
||||
|
||||
session, _ := uploadStore.GetSession(ctx, 1)
|
||||
if session == nil {
|
||||
t.Error("completed session should not be deleted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupWorker_StopsOnContextCancel(t *testing.T) {
|
||||
_, uploadStore := setupCleanupTestDB(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
mockStore := &mockCleanupStorage{}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
startCleanupWorker(ctx, uploadStore, mockStore, "test-bucket", zap.NewNop())
|
||||
close(done)
|
||||
}()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("worker did not stop after context cancel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupExpiredSessions_NoObjects(t *testing.T) {
|
||||
_, uploadStore := setupCleanupTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
past := time.Now().Add(-1 * time.Hour)
|
||||
err := uploadStore.CreateSession(ctx, &model.UploadSession{
|
||||
FileName: "empty.bin",
|
||||
FileSize: 1024,
|
||||
ChunkSize: 16 << 20,
|
||||
TotalChunks: 1,
|
||||
SHA256: "empty_hash",
|
||||
Status: "pending",
|
||||
MinioPrefix: "uploads/200/",
|
||||
ExpiresAt: past,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create session: %v", err)
|
||||
}
|
||||
|
||||
listCalled := false
|
||||
removeCalled := false
|
||||
mockStore := &mockCleanupStorage{
|
||||
listObjectsFn: func(_ context.Context, _, _ string, _ bool) ([]storage.ObjectInfo, error) {
|
||||
listCalled = true
|
||||
return nil, nil
|
||||
},
|
||||
removeObjectsFn: func(_ context.Context, _ string, _ []string, _ storage.RemoveObjectsOptions) error {
|
||||
removeCalled = true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
cleanupExpiredSessions(ctx, uploadStore, mockStore, "test-bucket", zap.NewNop())
|
||||
|
||||
if !listCalled {
|
||||
t.Error("ListObjects should be called")
|
||||
}
|
||||
if removeCalled {
|
||||
t.Error("RemoveObjects should not be called when no objects found")
|
||||
}
|
||||
|
||||
session, _ := uploadStore.GetSession(ctx, 1)
|
||||
if session != nil {
|
||||
t.Error("session should be deleted even when no objects found")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user