diff --git a/internal/mockserver/server.go b/internal/mockserver/server.go index 9db22e3..f12c9b1 100644 --- a/internal/mockserver/server.go +++ b/internal/mockserver/server.go @@ -144,6 +144,9 @@ func New(opts ...Option) (*Server, error) { taskH := handler.NewTaskHandler(taskSvc, logger) // 11. Router + taskSvc.StartProcessor(context.Background()) + + // 12. Router router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, taskH, logger) // 12. HTTP server @@ -202,9 +205,79 @@ func (s *Server) Close() error { return errors.Join(errs...) } +// simulateJobProgress periodically advances mock Slurm job states +// and syncs the corresponding Task statuses. +// Transition: PENDING → RUNNING (after 3s) → COMPLETED (after 5s). +func (s *Server) simulateJobProgress(ctx context.Context) { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, job := range s.mockSlurm.GetAllActiveJobs() { + switch s.mockSlurm.GetJobState(job.JobID) { + case "PENDING": + if time.Since(job.SubmitTime) > 3*time.Second { + s.mockSlurm.SetJobState(job.JobID, "RUNNING") + s.logger.Info("mock: job PENDING → RUNNING", zap.Int32("job_id", job.JobID)) + _ = s.syncTaskStatusFromSlurm(ctx, job.JobID) + } + case "RUNNING": + if job.StartTime != nil && time.Since(*job.StartTime) > 5*time.Second { + s.mockSlurm.SetJobState(job.JobID, "COMPLETED") + s.logger.Info("mock: job RUNNING → COMPLETED", zap.Int32("job_id", job.JobID)) + _ = s.syncTaskStatusFromSlurm(ctx, job.JobID) + } + } + } + } + } +} + +var slurmStateToTaskStatus = map[string]string{ + "PENDING": model.TaskStatusQueued, + "RUNNING": model.TaskStatusRunning, + "COMPLETED": model.TaskStatusCompleted, + "FAILED": model.TaskStatusFailed, + "CANCELLED": model.TaskStatusFailed, + "TIMEOUT": model.TaskStatusFailed, + "NODE_FAIL": model.TaskStatusFailed, +} + +func (s *Server) syncTaskStatusFromSlurm(ctx context.Context, slurmJobID int32) error { + var tasks []model.Task + if err := s.db.Where("slurm_job_id = ?", slurmJobID).Find(&tasks).Error; err != nil { + return err + } + slurmState := s.mockSlurm.GetJobState(slurmJobID) + taskStatus, ok := slurmStateToTaskStatus[slurmState] + if !ok { + return nil + } + for _, t := range tasks { + if t.Status != taskStatus { + s.logger.Info("mock: syncing task status", + zap.Int64("task_id", t.ID), + zap.String("old", t.Status), + zap.String("new", taskStatus), + ) + _ = s.db.Model(&model.Task{}).Where("id = ?", t.ID).Update("status", taskStatus).Error + } + } + return nil +} + // Run starts the HTTP server and blocks until a shutdown signal (SIGINT/SIGTERM) // or a server error occurs. func (s *Server) Run() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go s.simulateJobProgress(ctx) + errCh := make(chan error, 1) go func() { s.logger.Info("starting server", zap.String("addr", s.httpServer.Addr))