From 166ca3092c14ed5a89351aea2d0ee9c29988ff0f Mon Sep 17 00:00:00 2001 From: dailz Date: Mon, 20 Apr 2026 10:38:41 +0800 Subject: [PATCH] feat(service): add task defaults, job status, and cluster helpers Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- internal/service/cluster_service.go | 9 ++++ internal/service/job_service.go | 68 +++++++++++++++++++++++ internal/service/task_service.go | 83 +++++++++++++++++++++++++---- 3 files changed, 151 insertions(+), 9 deletions(-) diff --git a/internal/service/cluster_service.go b/internal/service/cluster_service.go index 31684d1..962af08 100644 --- a/internal/service/cluster_service.go +++ b/internal/service/cluster_service.go @@ -26,6 +26,8 @@ func derefInt32(i *int32) int32 { return *i } +func int32Ptr(i int32) *int32 { return &i } + func derefInt64(i *int64) int64 { if i == nil { return 0 @@ -33,6 +35,13 @@ func derefInt64(i *int64) int64 { return *i } +func derefInt32ToStr(i *int32) string { + if i == nil { + return "" + } + return strconv.FormatInt(int64(*i), 10) +} + func uint32NoValString(v *slurm.Uint32NoVal) string { if v == nil { return "" diff --git a/internal/service/job_service.go b/internal/service/job_service.go index 9beb771..994b0e5 100644 --- a/internal/service/job_service.go +++ b/internal/service/job_service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "gcy_hpc_server/internal/model" @@ -49,6 +50,73 @@ func (s *JobService) SubmitJob(ctx context.Context, req *model.SubmitJobRequest) "HOME=/root", } + if req.MemoryPerNode != nil { + jobDesc.MemoryPerNode = &slurm.Uint64NoVal{Number: req.MemoryPerNode} + } + if req.MemoryPerCpu != nil { + jobDesc.MemoryPerCpu = &slurm.Uint64NoVal{Number: req.MemoryPerCpu} + } + if req.Nodes != nil { + jobDesc.Nodes = req.Nodes + } + if req.Tasks != nil { + jobDesc.Tasks = req.Tasks + } + if req.CpusPerTask != nil { + jobDesc.CpusPerTask = req.CpusPerTask + } + if req.Constraints != nil { + jobDesc.Constraints = req.Constraints + } + if req.Reservation != nil { + jobDesc.Reservation = req.Reservation + } + if req.Account != nil { + jobDesc.Account = req.Account + } + if req.Nice != nil { + jobDesc.Nice = req.Nice + } + if req.MailType != nil { + jobDesc.MailType = strings.Split(*req.MailType, ",") + } + if req.MailUser != nil { + jobDesc.MailUser = req.MailUser + } + if req.StandardOutput != nil { + jobDesc.StandardOutput = req.StandardOutput + } + if req.StandardError != nil { + jobDesc.StandardError = req.StandardError + } + if req.StandardInput != nil { + jobDesc.StandardInput = req.StandardInput + } + if req.RequiredNodes != nil { + jobDesc.RequiredNodes = strings.Split(*req.RequiredNodes, ",") + } + if req.ExcludedNodes != nil { + jobDesc.ExcludedNodes = strings.Split(*req.ExcludedNodes, ",") + } + if req.BeginTime != nil { + jobDesc.BeginTime = &slurm.Uint64NoVal{Number: req.BeginTime} + } + if req.Deadline != nil { + jobDesc.Deadline = req.Deadline + } + if req.Array != nil { + jobDesc.Array = req.Array + } + if req.Dependency != nil { + jobDesc.Dependency = req.Dependency + } + if req.Requeue != nil { + jobDesc.Requeue = req.Requeue + } + if req.KillOnNodeFail != nil { + jobDesc.KillOnNodeFail = req.KillOnNodeFail + } + submitReq := &slurm.JobSubmitReq{ Script: &script, Job: jobDesc, diff --git a/internal/service/task_service.go b/internal/service/task_service.go index 707a85f..258c0aa 100644 --- a/internal/service/task_service.go +++ b/internal/service/task_service.go @@ -122,13 +122,40 @@ func (s *TaskService) CreateTask(ctx context.Context, req *model.CreateTaskReque // 8. Create task record task := &model.Task{ - TaskName: taskName, - AppID: app.ID, - AppName: app.Name, - Status: model.TaskStatusSubmitted, - Values: valuesJSON, - InputFileIDs: fileIDsJSON, - SubmittedAt: time.Now(), + TaskName: taskName, + AppID: app.ID, + AppName: app.Name, + Status: model.TaskStatusSubmitted, + Values: valuesJSON, + InputFileIDs: fileIDsJSON, + SubmittedAt: time.Now(), + Partition: derefStr(req.Partition), + Cpus: req.Cpus, + MemoryPerNode: req.MemoryPerNode, + MemoryPerCpu: req.MemoryPerCpu, + TimeLimit: req.TimeLimit, + QOS: req.QOS, + JobName: req.JobName, + Nodes: req.Nodes, + Tasks: req.Tasks, + CpusPerTask: req.CpusPerTask, + Constraints: req.Constraints, + Reservation: req.Reservation, + Account: req.Account, + Nice: req.Nice, + MailType: req.MailType, + MailUser: req.MailUser, + StandardOutput: req.StandardOutput, + StandardError: req.StandardError, + StandardInput: req.StandardInput, + RequiredNodes: req.RequiredNodes, + ExcludedNodes: req.ExcludedNodes, + BeginTime: req.BeginTime, + Deadline: req.Deadline, + Array: req.Array, + Dependency: req.Dependency, + Requeue: req.Requeue, + KillOnNodeFail: req.KillOnNodeFail, } taskID, err := s.taskStore.Create(ctx, task) @@ -309,6 +336,17 @@ func (s *TaskService) ProcessTask(ctx context.Context, taskID int64) error { } } + // 注入默认调度参数(仅在内存中,不持久化到数据库) + if task.TimeLimit == nil { + task.TimeLimit = int32Ptr(10080) // 168 小时 + } + if task.StandardOutput == nil { + task.StandardOutput = strToPtrOrNil(filepath.Join(workDir, "slurm-%j.out")) + } + if task.StandardError == nil { + task.StandardError = strToPtrOrNil(filepath.Join(workDir, "slurm-%j.err")) + } + // 17. Render script rendered := RenderScript(app.ScriptTemplate, params, values) s.logger.Info("rendered script", @@ -319,8 +357,35 @@ func (s *TaskService) ProcessTask(ctx context.Context, taskID int64) error { // 18. Submit to Slurm jobResp, err := s.jobSvc.SubmitJob(ctx, &model.SubmitJobRequest{ - Script: rendered, - WorkDir: workDir, + Script: rendered, + WorkDir: workDir, + Partition: task.Partition, + CPUs: derefInt32(task.Cpus), + TimeLimit: derefInt32ToStr(task.TimeLimit), + QOS: derefStr(task.QOS), + JobName: derefStr(task.JobName), + MemoryPerNode: task.MemoryPerNode, + MemoryPerCpu: task.MemoryPerCpu, + Nodes: task.Nodes, + Tasks: task.Tasks, + CpusPerTask: task.CpusPerTask, + Constraints: task.Constraints, + Reservation: task.Reservation, + Account: task.Account, + Nice: task.Nice, + MailType: task.MailType, + MailUser: task.MailUser, + StandardOutput: task.StandardOutput, + StandardError: task.StandardError, + StandardInput: task.StandardInput, + RequiredNodes: task.RequiredNodes, + ExcludedNodes: task.ExcludedNodes, + BeginTime: task.BeginTime, + Deadline: task.Deadline, + Array: task.Array, + Dependency: task.Dependency, + Requeue: task.Requeue, + KillOnNodeFail: task.KillOnNodeFail, }) if err != nil { return fail(model.TaskStepSubmitting, fmt.Sprintf("submit job: %v", err))