feat: 添加业务服务层和结构化日志

- JobService: 提交、查询、取消、历史记录,记录关键操作日志

- ClusterService: 节点、分区、诊断查询,记录错误日志

- NewSlurmClient: JWT 认证 HTTP 客户端工厂

- 所有构造函数接受 *zap.Logger 参数实现依赖注入

- 提交/取消成功记录 Info,API 错误记录 Error

- 完整 TDD 测试,使用 zaptest/observer 验证日志输出

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
dailz
2026-04-10 08:39:46 +08:00
parent fbfd5c5f42
commit 4903f7d07f
5 changed files with 1598 additions and 0 deletions

View File

@@ -0,0 +1,703 @@
package service
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gcy_hpc_server/internal/model"
"gcy_hpc_server/internal/slurm"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)
// mockJobServer starts an httptest server backed by handler and returns a
// slurm client pointed at it, together with a function that shuts the server
// down.
//
// It panics if the client cannot be constructed: the helper has no *testing.T
// to report through, and a discarded construction error would otherwise only
// surface later as a confusing failure inside the calling test.
func mockJobServer(handler http.HandlerFunc) (*slurm.Client, func()) {
	srv := httptest.NewServer(handler)
	client, err := slurm.NewClient(srv.URL, srv.Client())
	if err != nil {
		// Do not leave the listener running when setup fails.
		srv.Close()
		panic("mockJobServer: slurm.NewClient: " + err.Error())
	}
	return client, srv.Close
}
// TestSubmitJob verifies that SubmitJob sends a POST to the v0.0.40 submit
// endpoint carrying the script, and returns the job ID from the response.
func TestSubmitJob(t *testing.T) {
	jobID := int32(123)
	client, cleanup := mockJobServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			t.Errorf("expected POST, got %s", r.Method)
		}
		if r.URL.Path != "/slurm/v0.0.40/job/submit" {
			t.Errorf("unexpected path: %s", r.URL.Path)
		}
		var body slurm.JobSubmitReq
		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
			// The handler runs on the server's goroutine, where t.Fatalf
			// must not be called; record the failure and abort the request.
			t.Errorf("decode body: %v", err)
			http.Error(w, "bad request body", http.StatusBadRequest)
			return
		}
		if body.Job == nil || body.Job.Script == nil || *body.Job.Script != "#!/bin/bash\necho hello" {
			t.Errorf("unexpected script in request body")
		}
		resp := slurm.OpenapiJobSubmitResponse{
			Result: &slurm.JobSubmitResponseMsg{
				JobID: &jobID,
			},
		}
		if err := json.NewEncoder(w).Encode(resp); err != nil {
			t.Errorf("encode response: %v", err)
		}
	}))
	defer cleanup()

	svc := NewJobService(client, zap.NewNop())
	resp, err := svc.SubmitJob(context.Background(), &model.SubmitJobRequest{
		Script:    "#!/bin/bash\necho hello",
		Partition: "normal",
		QOS:       "high",
		JobName:   "test-job",
		CPUs:      4,
		TimeLimit: "60",
	})
	if err != nil {
		t.Fatalf("SubmitJob: %v", err)
	}
	if resp.JobID != 123 {
		t.Errorf("expected JobID 123, got %d", resp.JobID)
	}
}
// TestSubmitJob_WithOptionalFields verifies that a request containing only a
// script leaves the optional partition and minimum_cpus fields unset in the
// payload sent to the API.
func TestSubmitJob_WithOptionalFields(t *testing.T) {
	client, cleanup := mockJobServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var body slurm.JobSubmitReq
		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
			// The handler runs on the server's goroutine, where
			// t.Fatal/t.Fatalf must not be called; record the failure and
			// abort the request instead.
			t.Errorf("decode body: %v", err)
			http.Error(w, "bad request body", http.StatusBadRequest)
			return
		}
		if body.Job == nil {
			t.Error("job desc is nil")
			// Return early: the checks below dereference body.Job.
			http.Error(w, "missing job description", http.StatusBadRequest)
			return
		}
		if body.Job.Partition != nil {
			t.Error("expected partition nil for empty string")
		}
		if body.Job.MinimumCpus != nil {
			t.Error("expected minimum_cpus nil when CPUs=0")
		}
		jobID := int32(456)
		resp := slurm.OpenapiJobSubmitResponse{
			Result: &slurm.JobSubmitResponseMsg{JobID: &jobID},
		}
		if err := json.NewEncoder(w).Encode(resp); err != nil {
			t.Errorf("encode response: %v", err)
		}
	}))
	defer cleanup()

	svc := NewJobService(client, zap.NewNop())
	resp, err := svc.SubmitJob(context.Background(), &model.SubmitJobRequest{
		Script: "echo hi",
	})
	if err != nil {
		t.Fatalf("SubmitJob: %v", err)
	}
	if resp.JobID != 456 {
		t.Errorf("expected JobID 456, got %d", resp.JobID)
	}
}
// TestSubmitJob_Error checks that SubmitJob returns an error when the mock
// server answers with HTTP 500.
func TestSubmitJob_Error(t *testing.T) {
	cli, stop := mockJobServer(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(`{"error":"internal"}`))
	})
	defer stop()

	service := NewJobService(cli, zap.NewNop())
	req := &model.SubmitJobRequest{Script: "echo fail"}
	if _, err := service.SubmitJob(context.Background(), req); err == nil {
		t.Fatal("expected error, got nil")
	}
}
func TestGetJobs(t *testing.T) {
jobID := int32(100)
name := "my-job"
partition := "gpu"
ts := int64(1700000000)
nodes := "node01"
client, cleanup := mockJobServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("expected GET, got %s", r.Method)
}
resp := slurm.OpenapiJobInfoResp{
Jobs: slurm.JobInfoMsg{
{
JobID: &jobID,
Name: &name,
JobState: []string{"RUNNING"},
Partition: &partition,
SubmitTime: &slurm.Uint64NoVal{Number: &ts},
StartTime: &slurm.Uint64NoVal{Number: &ts},
EndTime: &slurm.Uint64NoVal{Number: &ts},
Nodes: &nodes,
},
},
}
json.NewEncoder(w).Encode(resp)
}))
defer cleanup()
svc := NewJobService(client, zap.NewNop())
jobs, err := svc.GetJobs(context.Background())
if err != nil {
t.Fatalf("GetJobs: %v", err)
}
if len(jobs) != 1 {
t.Fatalf("expected 1 job, got %d", len(jobs))
}
j := jobs[0]
if j.JobID != 100 {
t.Errorf("expected JobID 100, got %d", j.JobID)
}
if j.Name != "my-job" {
t.Errorf("expected Name my-job, got %s", j.Name)
}
if len(j.State) != 1 || j.State[0] != "RUNNING" {
t.Errorf("expected State [RUNNING], got %v", j.State)
}
if j.Partition != "gpu" {
t.Errorf("expected Partition gpu, got %s", j.Partition)
}
if j.SubmitTime == nil || *j.SubmitTime != ts {
t.Errorf("expected SubmitTime %d, got %v", ts, j.SubmitTime)
}
if j.Nodes != "node01" {
t.Errorf("expected Nodes node01, got %s", j.Nodes)
}
}
// TestGetJob verifies that GetJob returns the single job reported by the
// mock server with its ID mapped.
func TestGetJob(t *testing.T) {
	wantID, wantName := int32(200), "single-job"
	cli, stop := mockJobServer(func(w http.ResponseWriter, _ *http.Request) {
		payload := slurm.OpenapiJobInfoResp{
			Jobs: slurm.JobInfoMsg{
				{JobID: &wantID, Name: &wantName},
			},
		}
		json.NewEncoder(w).Encode(payload)
	})
	defer stop()

	got, err := NewJobService(cli, zap.NewNop()).GetJob(context.Background(), "200")
	switch {
	case err != nil:
		t.Fatalf("GetJob: %v", err)
	case got == nil:
		t.Fatal("expected job, got nil")
	case got.JobID != 200:
		t.Errorf("expected JobID 200, got %d", got.JobID)
	}
}
// TestGetJob_NotFound verifies that an empty job list from the API yields a
// nil job and no error.
func TestGetJob_NotFound(t *testing.T) {
	cli, stop := mockJobServer(func(w http.ResponseWriter, _ *http.Request) {
		json.NewEncoder(w).Encode(slurm.OpenapiJobInfoResp{Jobs: slurm.JobInfoMsg{}})
	})
	defer stop()

	got, err := NewJobService(cli, zap.NewNop()).GetJob(context.Background(), "999")
	switch {
	case err != nil:
		t.Fatalf("GetJob: %v", err)
	case got != nil:
		t.Errorf("expected nil for not found, got %+v", got)
	}
}
// TestCancelJob verifies that CancelJob issues a DELETE and succeeds when the
// server replies with an empty OpenapiResp.
func TestCancelJob(t *testing.T) {
	cli, stop := mockJobServer(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodDelete {
			t.Errorf("expected DELETE, got %s", r.Method)
		}
		json.NewEncoder(w).Encode(slurm.OpenapiResp{})
	})
	defer stop()

	service := NewJobService(cli, zap.NewNop())
	if err := service.CancelJob(context.Background(), "300"); err != nil {
		t.Fatalf("CancelJob: %v", err)
	}
}
// TestCancelJob_Error checks that CancelJob returns an error when the mock
// server answers with HTTP 404.
func TestCancelJob_Error(t *testing.T) {
	cli, stop := mockJobServer(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusNotFound)
		w.Write([]byte(`not found`))
	})
	defer stop()

	service := NewJobService(cli, zap.NewNop())
	if err := service.CancelJob(context.Background(), "999"); err == nil {
		t.Fatal("expected error, got nil")
	}
}
// TestGetJobHistory verifies that GetJobHistory forwards the users filter,
// reports the total number of jobs returned by the API, and returns only the
// first page (two of three jobs) with IDs and state mapped.
func TestGetJobHistory(t *testing.T) {
	ids := [3]int32{10, 20, 30}
	names := [3]string{"hist-1", "hist-2", "hist-3"}
	submitted := [3]int64{1700000000, 1700001000, 1700002000}
	part := "normal"

	cli, stop := mockJobServer(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			t.Errorf("expected GET, got %s", r.Method)
		}
		if users := r.URL.Query().Get("users"); users != "testuser" {
			t.Errorf("expected users=testuser, got %s", users)
		}
		payload := slurm.OpenapiSlurmdbdJobsResp{
			Jobs: slurm.JobList{
				{
					JobID:     &ids[0],
					Name:      &names[0],
					Partition: &part,
					State:     &slurm.JobState{Current: []string{"COMPLETED"}},
					Time:      &slurm.JobTime{Submission: &submitted[0]},
				},
				{
					JobID:     &ids[1],
					Name:      &names[1],
					Partition: &part,
					State:     &slurm.JobState{Current: []string{"FAILED"}},
					Time:      &slurm.JobTime{Submission: &submitted[1]},
				},
				{
					JobID:     &ids[2],
					Name:      &names[2],
					Partition: &part,
					State:     &slurm.JobState{Current: []string{"CANCELLED"}},
					Time:      &slurm.JobTime{Submission: &submitted[2]},
				},
			},
		}
		json.NewEncoder(w).Encode(payload)
	})
	defer stop()

	query := &model.JobHistoryQuery{
		Users:    "testuser",
		Page:     1,
		PageSize: 2,
	}
	page, err := NewJobService(cli, zap.NewNop()).GetJobHistory(context.Background(), query)
	if err != nil {
		t.Fatalf("GetJobHistory: %v", err)
	}

	// Paging metadata.
	if page.Total != 3 {
		t.Errorf("expected Total 3, got %d", page.Total)
	}
	if page.Page != 1 {
		t.Errorf("expected Page 1, got %d", page.Page)
	}
	if page.PageSize != 2 {
		t.Errorf("expected PageSize 2, got %d", page.PageSize)
	}

	// Page contents.
	if n := len(page.Jobs); n != 2 {
		t.Fatalf("expected 2 jobs on page 1, got %d", n)
	}
	first, second := page.Jobs[0], page.Jobs[1]
	if first.JobID != 10 {
		t.Errorf("expected first job ID 10, got %d", first.JobID)
	}
	if second.JobID != 20 {
		t.Errorf("expected second job ID 20, got %d", second.JobID)
	}
	if !(len(first.State) == 1 && first.State[0] == "COMPLETED") {
		t.Errorf("expected state [COMPLETED], got %v", first.State)
	}
}
// TestGetJobHistory_Page2 verifies that requesting page 2 with a page size of
// 1 returns the second of two jobs while Total still reports both.
func TestGetJobHistory_Page2(t *testing.T) {
	firstID, secondID := int32(10), int32(20)
	firstName, secondName := "a", "b"

	cli, stop := mockJobServer(func(w http.ResponseWriter, _ *http.Request) {
		payload := slurm.OpenapiSlurmdbdJobsResp{
			Jobs: slurm.JobList{
				{JobID: &firstID, Name: &firstName},
				{JobID: &secondID, Name: &secondName},
			},
		}
		json.NewEncoder(w).Encode(payload)
	})
	defer stop()

	query := &model.JobHistoryQuery{Page: 2, PageSize: 1}
	page, err := NewJobService(cli, zap.NewNop()).GetJobHistory(context.Background(), query)
	if err != nil {
		t.Fatalf("GetJobHistory: %v", err)
	}
	if page.Total != 2 {
		t.Errorf("expected Total 2, got %d", page.Total)
	}
	if n := len(page.Jobs); n != 1 {
		t.Fatalf("expected 1 job on page 2, got %d", n)
	}
	if id := page.Jobs[0].JobID; id != 20 {
		t.Errorf("expected job ID 20, got %d", id)
	}
}
// TestGetJobHistory_DefaultPagination verifies that a zero-valued query is
// answered with page 1 and a page size of 20.
func TestGetJobHistory_DefaultPagination(t *testing.T) {
	cli, stop := mockJobServer(func(w http.ResponseWriter, _ *http.Request) {
		json.NewEncoder(w).Encode(slurm.OpenapiSlurmdbdJobsResp{Jobs: slurm.JobList{}})
	})
	defer stop()

	service := NewJobService(cli, zap.NewNop())
	page, err := service.GetJobHistory(context.Background(), &model.JobHistoryQuery{})
	if err != nil {
		t.Fatalf("GetJobHistory: %v", err)
	}
	if got := page.Page; got != 1 {
		t.Errorf("expected default page 1, got %d", got)
	}
	if got := page.PageSize; got != 20 {
		t.Errorf("expected default pageSize 20, got %d", got)
	}
}
// TestGetJobHistory_QueryMapping verifies that the account, partition, state,
// job name and start/end time filters of the query are forwarded as URL query
// parameters.
func TestGetJobHistory_QueryMapping(t *testing.T) {
	cli, stop := mockJobServer(func(w http.ResponseWriter, r *http.Request) {
		params := r.URL.Query()
		if got := params.Get("account"); got != "proj1" {
			t.Errorf("expected account=proj1, got %s", got)
		}
		if got := params.Get("partition"); got != "gpu" {
			t.Errorf("expected partition=gpu, got %s", got)
		}
		if got := params.Get("state"); got != "COMPLETED" {
			t.Errorf("expected state=COMPLETED, got %s", got)
		}
		if got := params.Get("job_name"); got != "myjob" {
			t.Errorf("expected job_name=myjob, got %s", got)
		}
		if got := params.Get("start_time"); got != "1700000000" {
			t.Errorf("expected start_time=1700000000, got %s", got)
		}
		if got := params.Get("end_time"); got != "1700099999" {
			t.Errorf("expected end_time=1700099999, got %s", got)
		}
		json.NewEncoder(w).Encode(slurm.OpenapiSlurmdbdJobsResp{Jobs: slurm.JobList{}})
	})
	defer stop()

	query := &model.JobHistoryQuery{
		Users:     "testuser",
		Account:   "proj1",
		Partition: "gpu",
		State:     "COMPLETED",
		JobName:   "myjob",
		StartTime: "1700000000",
		EndTime:   "1700099999",
	}
	service := NewJobService(cli, zap.NewNop())
	if _, err := service.GetJobHistory(context.Background(), query); err != nil {
		t.Fatalf("GetJobHistory: %v", err)
	}
}
// TestGetJobHistory_Error checks that GetJobHistory returns an error when the
// mock server answers with HTTP 500.
func TestGetJobHistory_Error(t *testing.T) {
	cli, stop := mockJobServer(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(`{"error":"db down"}`))
	})
	defer stop()

	service := NewJobService(cli, zap.NewNop())
	if _, err := service.GetJobHistory(context.Background(), &model.JobHistoryQuery{}); err == nil {
		t.Fatal("expected error, got nil")
	}
}
// TestMapJobInfo_ExitCode verifies that mapJobInfo copies the verbose exit
// code's return code into the response's ExitCode.
func TestMapJobInfo_ExitCode(t *testing.T) {
	rc := int64(2)
	verbose := &slurm.ProcessExitCodeVerbose{
		ReturnCode: &slurm.Uint32NoVal{Number: &rc},
	}
	mapped := mapJobInfo(&slurm.JobInfo{ExitCode: verbose})
	if code := mapped.ExitCode; !(code != nil && *code == 2) {
		t.Errorf("expected exit code 2, got %v", mapped.ExitCode)
	}
}
func TestMapSlurmdbJob_NilFields(t *testing.T) {
j := &slurm.Job{}
resp := mapSlurmdbJob(j)
if resp.JobID != 0 {
t.Errorf("expected JobID 0, got %d", resp.JobID)
}
if resp.State != nil {
t.Errorf("expected nil State, got %v", resp.State)
}
if resp.SubmitTime != nil {
t.Errorf("expected nil SubmitTime, got %v", resp.SubmitTime)
}
}
// ---------------------------------------------------------------------------
// Structured logging tests using zaptest/observer
// ---------------------------------------------------------------------------
// newJobServiceWithObserver builds a JobService whose slurm client targets srv
// and whose logger records every entry at Debug level and above into the
// returned ObservedLogs, so tests can assert on the emitted log entries.
//
// It panics if the slurm client cannot be constructed: the helper has no
// *testing.T to report through, and a discarded construction error would
// otherwise only surface later as a confusing failure inside the calling test.
func newJobServiceWithObserver(srv *httptest.Server) (*JobService, *observer.ObservedLogs) {
	client, err := slurm.NewClient(srv.URL, srv.Client())
	if err != nil {
		panic("newJobServiceWithObserver: slurm.NewClient: " + err.Error())
	}
	core, recorded := observer.New(zapcore.DebugLevel)
	return NewJobService(client, zap.New(core)), recorded
}
// TestJobService_SubmitJob_SuccessLog verifies that a successful submit emits
// exactly one Info-level entry carrying the job_name and job_id fields.
func TestJobService_SubmitJob_SuccessLog(t *testing.T) {
	wantID := int32(789)
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		json.NewEncoder(w).Encode(slurm.OpenapiJobSubmitResponse{
			Result: &slurm.JobSubmitResponseMsg{JobID: &wantID},
		})
	}))
	defer server.Close()

	service, logs := newJobServiceWithObserver(server)
	req := &model.SubmitJobRequest{
		Script:  "echo hi",
		JobName: "log-test-job",
	}
	if _, err := service.SubmitJob(context.Background(), req); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	captured := logs.All()
	if n := len(captured); n != 1 {
		t.Fatalf("expected 1 log entry, got %d", n)
	}
	entry := captured[0]
	if entry.Level != zapcore.InfoLevel {
		t.Errorf("expected InfoLevel, got %v", entry.Level)
	}

	ctxFields := entry.ContextMap()
	if name := ctxFields["job_name"]; name != "log-test-job" {
		t.Errorf("expected job_name=log-test-job, got %v", name)
	}
	loggedID, present := ctxFields["job_id"]
	if !present {
		t.Fatal("expected job_id field in log entry")
	}
	// The field may be recorded as either int32 or int64; accept both.
	if !(loggedID == int32(789) || loggedID == int64(789)) {
		t.Errorf("expected job_id=789, got %v (%T)", loggedID, loggedID)
	}
}
// TestJobService_SubmitJob_ErrorLog verifies that a failed submit emits
// exactly one Error-level entry with operation=submit and an error field.
func TestJobService_SubmitJob_ErrorLog(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(`{"error":"internal"}`))
	}))
	defer server.Close()

	service, logs := newJobServiceWithObserver(server)
	if _, err := service.SubmitJob(context.Background(), &model.SubmitJobRequest{Script: "echo fail"}); err == nil {
		t.Fatal("expected error, got nil")
	}

	captured := logs.All()
	if n := len(captured); n != 1 {
		t.Fatalf("expected 1 log entry, got %d", n)
	}
	entry := captured[0]
	if entry.Level != zapcore.ErrorLevel {
		t.Errorf("expected ErrorLevel, got %v", entry.Level)
	}

	ctxFields := entry.ContextMap()
	if op := ctxFields["operation"]; op != "submit" {
		t.Errorf("expected operation=submit, got %v", op)
	}
	if _, present := ctxFields["error"]; !present {
		t.Error("expected error field in log entry")
	}
}
// TestJobService_CancelJob_SuccessLog verifies that a successful cancel emits
// exactly one Info-level entry carrying job_id=555.
func TestJobService_CancelJob_SuccessLog(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		json.NewEncoder(w).Encode(slurm.OpenapiResp{})
	}))
	defer server.Close()

	service, logs := newJobServiceWithObserver(server)
	if err := service.CancelJob(context.Background(), "555"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	captured := logs.All()
	if n := len(captured); n != 1 {
		t.Fatalf("expected 1 log entry, got %d", n)
	}
	entry := captured[0]
	if entry.Level != zapcore.InfoLevel {
		t.Errorf("expected InfoLevel, got %v", entry.Level)
	}
	if id := entry.ContextMap()["job_id"]; id != "555" {
		t.Errorf("expected job_id=555, got %v", id)
	}
}
// TestJobService_CancelJob_ErrorLog verifies that a failed cancel emits
// exactly one Error-level entry with operation=cancel, job_id=999 and an
// error field.
func TestJobService_CancelJob_ErrorLog(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusNotFound)
		w.Write([]byte(`not found`))
	}))
	defer server.Close()

	service, logs := newJobServiceWithObserver(server)
	if err := service.CancelJob(context.Background(), "999"); err == nil {
		t.Fatal("expected error, got nil")
	}

	captured := logs.All()
	if n := len(captured); n != 1 {
		t.Fatalf("expected 1 log entry, got %d", n)
	}
	entry := captured[0]
	if entry.Level != zapcore.ErrorLevel {
		t.Errorf("expected ErrorLevel, got %v", entry.Level)
	}

	ctxFields := entry.ContextMap()
	if op := ctxFields["operation"]; op != "cancel" {
		t.Errorf("expected operation=cancel, got %v", op)
	}
	if id := ctxFields["job_id"]; id != "999" {
		t.Errorf("expected job_id=999, got %v", id)
	}
	if _, present := ctxFields["error"]; !present {
		t.Error("expected error field in log entry")
	}
}
// TestJobService_GetJobs_ErrorLog verifies that a failed GetJobs emits
// exactly one Error-level entry with operation=get_jobs and an error field.
func TestJobService_GetJobs_ErrorLog(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(`{"error":"down"}`))
	}))
	defer server.Close()

	service, logs := newJobServiceWithObserver(server)
	if _, err := service.GetJobs(context.Background()); err == nil {
		t.Fatal("expected error, got nil")
	}

	captured := logs.All()
	if n := len(captured); n != 1 {
		t.Fatalf("expected 1 log entry, got %d", n)
	}
	entry := captured[0]
	if entry.Level != zapcore.ErrorLevel {
		t.Errorf("expected ErrorLevel, got %v", entry.Level)
	}

	ctxFields := entry.ContextMap()
	if op := ctxFields["operation"]; op != "get_jobs" {
		t.Errorf("expected operation=get_jobs, got %v", op)
	}
	if _, present := ctxFields["error"]; !present {
		t.Error("expected error field in log entry")
	}
}
// TestJobService_GetJob_ErrorLog verifies that a failed GetJob emits exactly
// one Error-level entry with operation=get_job, job_id=200 and an error
// field.
func TestJobService_GetJob_ErrorLog(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(`{"error":"down"}`))
	}))
	defer server.Close()

	service, logs := newJobServiceWithObserver(server)
	if _, err := service.GetJob(context.Background(), "200"); err == nil {
		t.Fatal("expected error, got nil")
	}

	captured := logs.All()
	if n := len(captured); n != 1 {
		t.Fatalf("expected 1 log entry, got %d", n)
	}
	entry := captured[0]
	if entry.Level != zapcore.ErrorLevel {
		t.Errorf("expected ErrorLevel, got %v", entry.Level)
	}

	ctxFields := entry.ContextMap()
	if op := ctxFields["operation"]; op != "get_job" {
		t.Errorf("expected operation=get_job, got %v", op)
	}
	if id := ctxFields["job_id"]; id != "200" {
		t.Errorf("expected job_id=200, got %v", id)
	}
	if _, present := ctxFields["error"]; !present {
		t.Error("expected error field in log entry")
	}
}
// TestJobService_GetJobHistory_ErrorLog verifies that a failed GetJobHistory
// emits exactly one Error-level entry with operation=get_job_history and an
// error field.
func TestJobService_GetJobHistory_ErrorLog(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(`{"error":"db down"}`))
	}))
	defer server.Close()

	service, logs := newJobServiceWithObserver(server)
	if _, err := service.GetJobHistory(context.Background(), &model.JobHistoryQuery{}); err == nil {
		t.Fatal("expected error, got nil")
	}

	captured := logs.All()
	if n := len(captured); n != 1 {
		t.Fatalf("expected 1 log entry, got %d", n)
	}
	entry := captured[0]
	if entry.Level != zapcore.ErrorLevel {
		t.Errorf("expected ErrorLevel, got %v", entry.Level)
	}

	ctxFields := entry.ContextMap()
	if op := ctxFields["operation"]; op != "get_job_history" {
		t.Errorf("expected operation=get_job_history, got %v", op)
	}
	if _, present := ctxFields["error"]; !present {
		t.Error("expected error field in log entry")
	}
}