diff --git a/cmd/server/integration_job_test.go b/cmd/server/integration_job_test.go index 172505c..a245543 100644 --- a/cmd/server/integration_job_test.go +++ b/cmd/server/integration_job_test.go @@ -3,7 +3,6 @@ package main import ( "bytes" "encoding/json" - "fmt" "net/http" "testing" @@ -66,6 +65,8 @@ func jobSubmitViaAPI(t *testing.T, env *testenv.TestEnv, script string) int32 { return job.JobID } +// [已弃用] 以下测试依赖 POST /api/v1/jobs/submit,该接口已被 POST /tasks 取代。 +/* // TestIntegration_Jobs_Submit verifies POST /api/v1/jobs/submit creates a new job. func TestIntegration_Jobs_Submit(t *testing.T) { env := testenv.NewTestEnv(t) @@ -220,3 +221,4 @@ func TestIntegration_Jobs_History(t *testing.T) { t.Fatalf("cancelled job %d not found in history", jobID) } } +*/ diff --git a/cmd/server/main_test.go b/cmd/server/main_test.go index 7ef17ea..629e498 100644 --- a/cmd/server/main_test.go +++ b/cmd/server/main_test.go @@ -53,7 +53,7 @@ func TestRouterRegistration(t *testing.T) { method string path string }{ - {"POST", "/api/v1/jobs/submit"}, + // {"POST", "/api/v1/jobs/submit"}, // [已弃用] 已被 POST /tasks 取代 {"GET", "/api/v1/jobs"}, {"GET", "/api/v1/jobs/history"}, {"GET", "/api/v1/jobs/:id"}, diff --git a/internal/handler/job.go b/internal/handler/job.go index 18c8c74..6954bba 100644 --- a/internal/handler/job.go +++ b/internal/handler/job.go @@ -22,29 +22,31 @@ func NewJobHandler(jobSvc *service.JobService, logger *zap.Logger) *JobHandler { return &JobHandler{jobSvc: jobSvc, logger: logger} } +// [已弃用] SubmitJob 已被 POST /tasks 取代。 +// 保留方法体以防需要回滚。 // SubmitJob handles POST /api/v1/jobs/submit. -func (h *JobHandler) SubmitJob(c *gin.Context) { - var req model.SubmitJobRequest - if err := c.ShouldBindJSON(&req); err != nil { - h.logger.Warn("bad request", zap.String("method", "SubmitJob"), zap.String("error", "invalid request body")) - server.BadRequest(c, "invalid request body") - return - } - if req.Script == "" { - h.logger.Warn("bad request", zap.String("method", "SubmitJob"), zap.String("error", "script is required")) - server.BadRequest(c, "script is required") - return - } - - resp, err := h.jobSvc.SubmitJob(c.Request.Context(), &req) - if err != nil { - h.logger.Error("handler error", zap.String("method", "SubmitJob"), zap.Int("status", http.StatusBadGateway), zap.Error(err)) - server.ErrorWithStatus(c, http.StatusBadGateway, "slurm error: "+err.Error()) - return - } - - server.Created(c, resp) -} +// func (h *JobHandler) SubmitJob(c *gin.Context) { +// var req model.SubmitJobRequest +// if err := c.ShouldBindJSON(&req); err != nil { +// h.logger.Warn("bad request", zap.String("method", "SubmitJob"), zap.String("error", "invalid request body")) +// server.BadRequest(c, "invalid request body") +// return +// } +// if req.Script == "" { +// h.logger.Warn("bad request", zap.String("method", "SubmitJob"), zap.String("error", "script is required")) +// server.BadRequest(c, "script is required") +// return +// } +// +// resp, err := h.jobSvc.SubmitJob(c.Request.Context(), &req) +// if err != nil { +// h.logger.Error("handler error", zap.String("method", "SubmitJob"), zap.Int("status", http.StatusBadGateway), zap.Error(err)) +// server.ErrorWithStatus(c, http.StatusBadGateway, "slurm error: "+err.Error()) +// return +// } +// +// server.Created(c, resp) +// } // GetJobs handles GET /api/v1/jobs with pagination. func (h *JobHandler) GetJobs(c *gin.Context) { diff --git a/internal/handler/job_test.go b/internal/handler/job_test.go index a33689d..61e43e4 100644 --- a/internal/handler/job_test.go +++ b/internal/handler/job_test.go @@ -1,9 +1,7 @@ package handler import ( - "bytes" "encoding/json" - "fmt" "net/http" "net/http/httptest" "testing" @@ -23,7 +21,7 @@ func setupJobRouter(h *JobHandler) *gin.Engine { v1 := r.Group("/api/v1") jobs := v1.Group("/jobs") { - jobs.POST("/submit", h.SubmitJob) + // jobs.POST("/submit", h.SubmitJob) // [已弃用] 已被 POST /tasks 取代 jobs.GET("", h.GetJobs) jobs.GET("/history", h.GetJobHistory) jobs.GET("/:id", h.GetJob) @@ -61,6 +59,8 @@ func handlerLogs(logs *observer.ObservedLogs) []observer.LoggedEntry { return handler } +// [已弃用] SubmitJob 相关测试已被禁用,该接口已被 POST /tasks 取代。 +/* func TestSubmitJob_Success(t *testing.T) { mux := http.NewServeMux() mux.HandleFunc("/slurm/v0.0.40/job/submit", func(w http.ResponseWriter, r *http.Request) { @@ -171,6 +171,9 @@ func TestSubmitJob_SlurmError(t *testing.T) { t.Fatalf("expected 502, got %d: %s", w.Code, w.Body.String()) } } +*/ + +// --- Logging verification tests --- func TestGetJobs_Success(t *testing.T) { mux := http.NewServeMux() @@ -462,6 +465,7 @@ func TestGetJobHistory_DefaultPagination(t *testing.T) { } } +/* func TestSubmitJob_InvalidBody(t *testing.T) { mux := http.NewServeMux() srv, handler := setupJobHandler(mux) @@ -479,9 +483,11 @@ func TestSubmitJob_InvalidBody(t *testing.T) { t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String()) } } +*/ // --- Logging verification tests --- +/* func TestSubmitJob_InvalidBody_LogsWarn(t *testing.T) { mux := http.NewServeMux() srv, handler, logs := setupJobHandlerWithObserver(mux) @@ -614,6 +620,7 @@ func TestSubmitJob_Success_NoHandlerLogs(t *testing.T) { t.Errorf("expected no handler log entries on success, got %d", len(hLogs)) } } +*/ func TestGetJobs_Error_LogsError(t *testing.T) { mux := http.NewServeMux() diff --git a/internal/server/server.go b/internal/server/server.go index 86141ac..187f52f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,7 +10,7 @@ import ( ) type JobHandler interface { - SubmitJob(c *gin.Context) + // SubmitJob(c *gin.Context) // [已弃用] 已被 POST /tasks 取代 GetJobs(c *gin.Context) GetJobHistory(c *gin.Context) GetJob(c *gin.Context) @@ -73,7 +73,7 @@ func NewRouter(jobH JobHandler, clusterH ClusterHandler, appH ApplicationHandler v1 := r.Group("/api/v1") jobs := v1.Group("/jobs") - jobs.POST("/submit", jobH.SubmitJob) + // jobs.POST("/submit", jobH.SubmitJob) // [已弃用] 已被 POST /tasks 取代 jobs.GET("", jobH.GetJobs) jobs.GET("/history", jobH.GetJobHistory) jobs.GET("/:id", jobH.GetJob) @@ -144,7 +144,7 @@ func NewTestRouter() *gin.Engine { func registerPlaceholderRoutes(v1 *gin.RouterGroup) { jobs := v1.Group("/jobs") - jobs.POST("/submit", notImplemented) + // jobs.POST("/submit", notImplemented) // [已弃用] 已被 POST /tasks 取代 jobs.GET("", notImplemented) jobs.GET("/history", notImplemented) jobs.GET("/:id", notImplemented) diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 9aa51ec..56ecd78 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -17,7 +17,7 @@ func TestAllRoutesRegistered(t *testing.T) { method string path string }{ - {"POST", "/api/v1/jobs/submit"}, + // {"POST", "/api/v1/jobs/submit"}, // [已弃用] 已被 POST /tasks 取代 {"GET", "/api/v1/jobs"}, {"GET", "/api/v1/jobs/history"}, {"GET", "/api/v1/jobs/:id"}, diff --git a/internal/service/task_service.go b/internal/service/task_service.go index 8a8d031..ef344c1 100644 --- a/internal/service/task_service.go +++ b/internal/service/task_service.go @@ -263,7 +263,15 @@ func (s *TaskService) ProcessTask(ctx context.Context, taskID int64) error { } } - // 13-14. Set ready + submitting + // 13-14. Set ready + submitting (guard: skip if already submitted to Slurm) + if task.SlurmJobID != nil { + s.logger.Info("task already has slurm job, skipping submission", + zap.Int64("task_id", taskID), + zap.Int32("slurm_job_id", *task.SlurmJobID), + ) + return nil + } + if err := s.taskStore.UpdateRetryState(ctx, taskID, model.TaskStatusReady, model.TaskStepSubmitting, 0); err != nil { return fail(model.TaskStepSubmitting, fmt.Sprintf("update status to ready: %v", err)) } @@ -694,6 +702,13 @@ func (s *TaskService) RecoverStuckTasks(ctx context.Context) { return } for i := range tasks { + if tasks[i].SlurmJobID != nil { + s.logger.Info("skipping stuck task recovery, already in slurm", + zap.Int64("taskID", tasks[i].ID), + zap.Int32("slurm_job_id", *tasks[i].SlurmJobID), + ) + continue + } _ = s.taskStore.UpdateStatus(ctx, tasks[i].ID, model.TaskStatusSubmitted, "") s.mu.Lock() if !s.stopped { diff --git a/internal/testutil/testenv/env_test.go b/internal/testutil/testenv/env_test.go index 81c3d0f..68c8b15 100644 --- a/internal/testutil/testenv/env_test.go +++ b/internal/testutil/testenv/env_test.go @@ -49,7 +49,7 @@ func TestAllRoutesRegistered(t *testing.T) { method string path string }{ - {"POST", "/api/v1/jobs/submit"}, + // {"POST", "/api/v1/jobs/submit"}, // [已弃用] 已被 POST /tasks 取代 {"GET", "/api/v1/jobs"}, {"GET", "/api/v1/jobs/history"}, {"GET", "/api/v1/jobs/1"}, @@ -82,8 +82,8 @@ func TestAllRoutesRegistered(t *testing.T) { {"GET", "/api/v1/tasks"}, } - if len(routes) != 30 { - t.Fatalf("expected 31 routes, got %d", len(routes)) + if len(routes) != 29 { + t.Fatalf("expected 30 routes, got %d", len(routes)) } for _, r := range routes {