feat(mockserver): update mock server for file upload and task defaults
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -144,6 +144,9 @@ func New(opts ...Option) (*Server, error) {
|
|||||||
taskH := handler.NewTaskHandler(taskSvc, logger)
|
taskH := handler.NewTaskHandler(taskSvc, logger)
|
||||||
|
|
||||||
// 11. Router
|
// 11. Router
|
||||||
|
taskSvc.StartProcessor(context.Background())
|
||||||
|
|
||||||
|
// 12. Router
|
||||||
router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, taskH, logger)
|
router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, taskH, logger)
|
||||||
|
|
||||||
// 12. HTTP server
|
// 12. HTTP server
|
||||||
@@ -202,9 +205,79 @@ func (s *Server) Close() error {
|
|||||||
return errors.Join(errs...)
|
return errors.Join(errs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// simulateJobProgress periodically advances mock Slurm job states
|
||||||
|
// and syncs the corresponding Task statuses.
|
||||||
|
// Transition: PENDING → RUNNING (after 3s) → COMPLETED (after 5s).
|
||||||
|
func (s *Server) simulateJobProgress(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(2 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
for _, job := range s.mockSlurm.GetAllActiveJobs() {
|
||||||
|
switch s.mockSlurm.GetJobState(job.JobID) {
|
||||||
|
case "PENDING":
|
||||||
|
if time.Since(job.SubmitTime) > 3*time.Second {
|
||||||
|
s.mockSlurm.SetJobState(job.JobID, "RUNNING")
|
||||||
|
s.logger.Info("mock: job PENDING → RUNNING", zap.Int32("job_id", job.JobID))
|
||||||
|
_ = s.syncTaskStatusFromSlurm(ctx, job.JobID)
|
||||||
|
}
|
||||||
|
case "RUNNING":
|
||||||
|
if job.StartTime != nil && time.Since(*job.StartTime) > 5*time.Second {
|
||||||
|
s.mockSlurm.SetJobState(job.JobID, "COMPLETED")
|
||||||
|
s.logger.Info("mock: job RUNNING → COMPLETED", zap.Int32("job_id", job.JobID))
|
||||||
|
_ = s.syncTaskStatusFromSlurm(ctx, job.JobID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var slurmStateToTaskStatus = map[string]string{
|
||||||
|
"PENDING": model.TaskStatusQueued,
|
||||||
|
"RUNNING": model.TaskStatusRunning,
|
||||||
|
"COMPLETED": model.TaskStatusCompleted,
|
||||||
|
"FAILED": model.TaskStatusFailed,
|
||||||
|
"CANCELLED": model.TaskStatusFailed,
|
||||||
|
"TIMEOUT": model.TaskStatusFailed,
|
||||||
|
"NODE_FAIL": model.TaskStatusFailed,
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) syncTaskStatusFromSlurm(ctx context.Context, slurmJobID int32) error {
|
||||||
|
var tasks []model.Task
|
||||||
|
if err := s.db.Where("slurm_job_id = ?", slurmJobID).Find(&tasks).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
slurmState := s.mockSlurm.GetJobState(slurmJobID)
|
||||||
|
taskStatus, ok := slurmStateToTaskStatus[slurmState]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, t := range tasks {
|
||||||
|
if t.Status != taskStatus {
|
||||||
|
s.logger.Info("mock: syncing task status",
|
||||||
|
zap.Int64("task_id", t.ID),
|
||||||
|
zap.String("old", t.Status),
|
||||||
|
zap.String("new", taskStatus),
|
||||||
|
)
|
||||||
|
_ = s.db.Model(&model.Task{}).Where("id = ?", t.ID).Update("status", taskStatus).Error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Run starts the HTTP server and blocks until a shutdown signal (SIGINT/SIGTERM)
|
// Run starts the HTTP server and blocks until a shutdown signal (SIGINT/SIGTERM)
|
||||||
// or a server error occurs.
|
// or a server error occurs.
|
||||||
func (s *Server) Run() error {
|
func (s *Server) Run() error {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go s.simulateJobProgress(ctx)
|
||||||
|
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
s.logger.Info("starting server", zap.String("addr", s.httpServer.Addr))
|
s.logger.Info("starting server", zap.String("addr", s.httpServer.Addr))
|
||||||
|
|||||||
Reference in New Issue
Block a user