feat(app): add TaskPoller, wire DI, and add task integration tests
This commit is contained in:
@@ -28,6 +28,8 @@ type App struct {
|
||||
db *gorm.DB
|
||||
server *http.Server
|
||||
cancelCleanup context.CancelFunc
|
||||
taskSvc *service.TaskService
|
||||
taskPoller *TaskPoller
|
||||
}
|
||||
|
||||
// NewApp initializes all application dependencies: DB, Slurm client, services, handlers, router.
|
||||
@@ -43,7 +45,7 @@ func NewApp(cfg *config.Config, logger *zap.Logger) (*App, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
srv, cancelCleanup := initHTTPServer(cfg, gormDB, slurmClient, logger)
|
||||
srv, cancelCleanup, taskSvc, taskPoller := initHTTPServer(cfg, gormDB, slurmClient, logger)
|
||||
|
||||
return &App{
|
||||
cfg: cfg,
|
||||
@@ -51,6 +53,8 @@ func NewApp(cfg *config.Config, logger *zap.Logger) (*App, error) {
|
||||
db: gormDB,
|
||||
server: srv,
|
||||
cancelCleanup: cancelCleanup,
|
||||
taskSvc: taskSvc,
|
||||
taskPoller: taskPoller,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -86,6 +90,14 @@ func (a *App) Run() error {
|
||||
func (a *App) Close() error {
|
||||
var errs []error
|
||||
|
||||
if a.taskSvc != nil {
|
||||
a.taskSvc.StopProcessor()
|
||||
}
|
||||
|
||||
if a.taskPoller != nil {
|
||||
a.taskPoller.Stop()
|
||||
}
|
||||
|
||||
if a.cancelCleanup != nil {
|
||||
a.cancelCleanup()
|
||||
}
|
||||
@@ -145,15 +157,15 @@ func initSlurmClient(cfg *config.Config) (*slurm.Client, error) {
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func initHTTPServer(cfg *config.Config, db *gorm.DB, slurmClient *slurm.Client, logger *zap.Logger) (*http.Server, context.CancelFunc) {
|
||||
func initHTTPServer(cfg *config.Config, db *gorm.DB, slurmClient *slurm.Client, logger *zap.Logger) (*http.Server, context.CancelFunc, *service.TaskService, *TaskPoller) {
|
||||
ctx := context.Background()
|
||||
|
||||
jobSvc := service.NewJobService(slurmClient, logger)
|
||||
clusterSvc := service.NewClusterService(slurmClient, logger)
|
||||
jobH := handler.NewJobHandler(jobSvc, logger)
|
||||
clusterH := handler.NewClusterHandler(clusterSvc, logger)
|
||||
|
||||
appStore := store.NewApplicationStore(db)
|
||||
appSvc := service.NewApplicationService(appStore, jobSvc, cfg.WorkDirBase, logger)
|
||||
appH := handler.NewApplicationHandler(appSvc, logger)
|
||||
|
||||
// File storage initialization
|
||||
minioClient, err := storage.NewMinioClient(cfg.Minio)
|
||||
@@ -165,9 +177,12 @@ func initHTTPServer(cfg *config.Config, db *gorm.DB, slurmClient *slurm.Client,
|
||||
var fileH *handler.FileHandler
|
||||
var folderH *handler.FolderHandler
|
||||
|
||||
taskStore := store.NewTaskStore(db)
|
||||
fileStore := store.NewFileStore(db)
|
||||
blobStore := store.NewBlobStore(db)
|
||||
|
||||
var stagingSvc *service.FileStagingService
|
||||
if minioClient != nil {
|
||||
blobStore := store.NewBlobStore(db)
|
||||
fileStore := store.NewFileStore(db)
|
||||
folderStore := store.NewFolderStore(db)
|
||||
uploadStore := store.NewUploadStore(db)
|
||||
|
||||
@@ -179,25 +194,34 @@ func initHTTPServer(cfg *config.Config, db *gorm.DB, slurmClient *slurm.Client,
|
||||
fileH = handler.NewFileHandler(fileSvc, logger)
|
||||
folderH = handler.NewFolderHandler(folderSvc, logger)
|
||||
|
||||
cleanupCtx, cancelCleanup := context.WithCancel(context.Background())
|
||||
go startCleanupWorker(cleanupCtx, uploadStore, minioClient, cfg.Minio.Bucket, logger)
|
||||
|
||||
router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, logger)
|
||||
|
||||
addr := ":" + cfg.ServerPort
|
||||
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: router,
|
||||
}, cancelCleanup
|
||||
stagingSvc = service.NewFileStagingService(fileStore, blobStore, minioClient, cfg.Minio.Bucket, logger)
|
||||
}
|
||||
|
||||
router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, logger)
|
||||
taskSvc := service.NewTaskService(taskStore, appStore, fileStore, blobStore, stagingSvc, jobSvc, cfg.WorkDirBase, logger)
|
||||
taskSvc.StartProcessor(ctx)
|
||||
|
||||
appSvc := service.NewApplicationService(appStore, jobSvc, cfg.WorkDirBase, logger, taskSvc)
|
||||
appH := handler.NewApplicationHandler(appSvc, logger)
|
||||
|
||||
poller := NewTaskPoller(taskSvc, 10*time.Second, logger)
|
||||
poller.Start(ctx)
|
||||
|
||||
taskH := handler.NewTaskHandler(taskSvc, logger)
|
||||
|
||||
var cancelCleanup context.CancelFunc
|
||||
|
||||
if minioClient != nil {
|
||||
cleanupCtx, cancel := context.WithCancel(context.Background())
|
||||
cancelCleanup = cancel
|
||||
go startCleanupWorker(cleanupCtx, store.NewUploadStore(db), minioClient, cfg.Minio.Bucket, logger)
|
||||
}
|
||||
|
||||
router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, taskH, logger)
|
||||
|
||||
addr := ":" + cfg.ServerPort
|
||||
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: router,
|
||||
}, nil
|
||||
}, cancelCleanup, taskSvc, poller
|
||||
}
|
||||
|
||||
61
internal/app/task_poller.go
Normal file
61
internal/app/task_poller.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// TaskPollable defines the interface for refreshing stale task statuses.
|
||||
type TaskPollable interface {
|
||||
RefreshStaleTasks(ctx context.Context) error
|
||||
}
|
||||
|
||||
// TaskPoller periodically polls Slurm for task status updates via TaskPollable.
|
||||
type TaskPoller struct {
|
||||
taskSvc TaskPollable
|
||||
interval time.Duration
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewTaskPoller creates a new TaskPoller with the given service, interval, and logger.
|
||||
func NewTaskPoller(taskSvc TaskPollable, interval time.Duration, logger *zap.Logger) *TaskPoller {
|
||||
return &TaskPoller{
|
||||
taskSvc: taskSvc,
|
||||
interval: interval,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the background goroutine that periodically refreshes stale tasks.
|
||||
func (p *TaskPoller) Start(ctx context.Context) {
|
||||
ctx, p.cancel = context.WithCancel(ctx)
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
ticker := time.NewTicker(p.interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := p.taskSvc.RefreshStaleTasks(ctx); err != nil {
|
||||
p.logger.Error("failed to refresh stale tasks", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop cancels the background goroutine and waits for it to finish.
|
||||
func (p *TaskPoller) Stop() {
|
||||
if p.cancel != nil {
|
||||
p.cancel()
|
||||
}
|
||||
p.wg.Wait()
|
||||
}
|
||||
70
internal/app/task_poller_test.go
Normal file
70
internal/app/task_poller_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type mockTaskPollable struct {
|
||||
refreshFunc func(ctx context.Context) error
|
||||
callCount int
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (m *mockTaskPollable) RefreshStaleTasks(ctx context.Context) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.callCount++
|
||||
if m.refreshFunc != nil {
|
||||
return m.refreshFunc(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockTaskPollable) getCallCount() int {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.callCount
|
||||
}
|
||||
|
||||
func TestTaskPoller_StartStop(t *testing.T) {
|
||||
mock := &mockTaskPollable{}
|
||||
logger := zap.NewNop()
|
||||
poller := NewTaskPoller(mock, 1*time.Second, logger)
|
||||
|
||||
poller.Start(context.Background())
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
poller.Stop()
|
||||
|
||||
// No goroutine leak — Stop() returned means wg.Wait() completed.
|
||||
}
|
||||
|
||||
func TestTaskPoller_RefreshesStaleTasks(t *testing.T) {
|
||||
mock := &mockTaskPollable{}
|
||||
logger := zap.NewNop()
|
||||
poller := NewTaskPoller(mock, 50*time.Millisecond, logger)
|
||||
|
||||
poller.Start(context.Background())
|
||||
defer poller.Stop()
|
||||
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
if count := mock.getCallCount(); count < 1 {
|
||||
t.Errorf("expected RefreshStaleTasks to be called at least once, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskPoller_StopsCleanly(t *testing.T) {
|
||||
mock := &mockTaskPollable{}
|
||||
logger := zap.NewNop()
|
||||
poller := NewTaskPoller(mock, 1*time.Second, logger)
|
||||
|
||||
poller.Start(context.Background())
|
||||
poller.Stop()
|
||||
|
||||
// No panic and WaitGroup is done — Stop returned successfully.
|
||||
}
|
||||
Reference in New Issue
Block a user