feat(service): add task defaults, job status, and cluster helpers
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -26,6 +26,8 @@ func derefInt32(i *int32) int32 {
|
||||
return *i
|
||||
}
|
||||
|
||||
func int32Ptr(i int32) *int32 { return &i }
|
||||
|
||||
func derefInt64(i *int64) int64 {
|
||||
if i == nil {
|
||||
return 0
|
||||
@@ -33,6 +35,13 @@ func derefInt64(i *int64) int64 {
|
||||
return *i
|
||||
}
|
||||
|
||||
func derefInt32ToStr(i *int32) string {
|
||||
if i == nil {
|
||||
return ""
|
||||
}
|
||||
return strconv.FormatInt(int64(*i), 10)
|
||||
}
|
||||
|
||||
func uint32NoValString(v *slurm.Uint32NoVal) string {
|
||||
if v == nil {
|
||||
return ""
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gcy_hpc_server/internal/model"
|
||||
@@ -49,6 +50,73 @@ func (s *JobService) SubmitJob(ctx context.Context, req *model.SubmitJobRequest)
|
||||
"HOME=/root",
|
||||
}
|
||||
|
||||
if req.MemoryPerNode != nil {
|
||||
jobDesc.MemoryPerNode = &slurm.Uint64NoVal{Number: req.MemoryPerNode}
|
||||
}
|
||||
if req.MemoryPerCpu != nil {
|
||||
jobDesc.MemoryPerCpu = &slurm.Uint64NoVal{Number: req.MemoryPerCpu}
|
||||
}
|
||||
if req.Nodes != nil {
|
||||
jobDesc.Nodes = req.Nodes
|
||||
}
|
||||
if req.Tasks != nil {
|
||||
jobDesc.Tasks = req.Tasks
|
||||
}
|
||||
if req.CpusPerTask != nil {
|
||||
jobDesc.CpusPerTask = req.CpusPerTask
|
||||
}
|
||||
if req.Constraints != nil {
|
||||
jobDesc.Constraints = req.Constraints
|
||||
}
|
||||
if req.Reservation != nil {
|
||||
jobDesc.Reservation = req.Reservation
|
||||
}
|
||||
if req.Account != nil {
|
||||
jobDesc.Account = req.Account
|
||||
}
|
||||
if req.Nice != nil {
|
||||
jobDesc.Nice = req.Nice
|
||||
}
|
||||
if req.MailType != nil {
|
||||
jobDesc.MailType = strings.Split(*req.MailType, ",")
|
||||
}
|
||||
if req.MailUser != nil {
|
||||
jobDesc.MailUser = req.MailUser
|
||||
}
|
||||
if req.StandardOutput != nil {
|
||||
jobDesc.StandardOutput = req.StandardOutput
|
||||
}
|
||||
if req.StandardError != nil {
|
||||
jobDesc.StandardError = req.StandardError
|
||||
}
|
||||
if req.StandardInput != nil {
|
||||
jobDesc.StandardInput = req.StandardInput
|
||||
}
|
||||
if req.RequiredNodes != nil {
|
||||
jobDesc.RequiredNodes = strings.Split(*req.RequiredNodes, ",")
|
||||
}
|
||||
if req.ExcludedNodes != nil {
|
||||
jobDesc.ExcludedNodes = strings.Split(*req.ExcludedNodes, ",")
|
||||
}
|
||||
if req.BeginTime != nil {
|
||||
jobDesc.BeginTime = &slurm.Uint64NoVal{Number: req.BeginTime}
|
||||
}
|
||||
if req.Deadline != nil {
|
||||
jobDesc.Deadline = req.Deadline
|
||||
}
|
||||
if req.Array != nil {
|
||||
jobDesc.Array = req.Array
|
||||
}
|
||||
if req.Dependency != nil {
|
||||
jobDesc.Dependency = req.Dependency
|
||||
}
|
||||
if req.Requeue != nil {
|
||||
jobDesc.Requeue = req.Requeue
|
||||
}
|
||||
if req.KillOnNodeFail != nil {
|
||||
jobDesc.KillOnNodeFail = req.KillOnNodeFail
|
||||
}
|
||||
|
||||
submitReq := &slurm.JobSubmitReq{
|
||||
Script: &script,
|
||||
Job: jobDesc,
|
||||
|
||||
@@ -122,13 +122,40 @@ func (s *TaskService) CreateTask(ctx context.Context, req *model.CreateTaskReque
|
||||
|
||||
// 8. Create task record
|
||||
task := &model.Task{
|
||||
TaskName: taskName,
|
||||
AppID: app.ID,
|
||||
AppName: app.Name,
|
||||
Status: model.TaskStatusSubmitted,
|
||||
Values: valuesJSON,
|
||||
InputFileIDs: fileIDsJSON,
|
||||
SubmittedAt: time.Now(),
|
||||
TaskName: taskName,
|
||||
AppID: app.ID,
|
||||
AppName: app.Name,
|
||||
Status: model.TaskStatusSubmitted,
|
||||
Values: valuesJSON,
|
||||
InputFileIDs: fileIDsJSON,
|
||||
SubmittedAt: time.Now(),
|
||||
Partition: derefStr(req.Partition),
|
||||
Cpus: req.Cpus,
|
||||
MemoryPerNode: req.MemoryPerNode,
|
||||
MemoryPerCpu: req.MemoryPerCpu,
|
||||
TimeLimit: req.TimeLimit,
|
||||
QOS: req.QOS,
|
||||
JobName: req.JobName,
|
||||
Nodes: req.Nodes,
|
||||
Tasks: req.Tasks,
|
||||
CpusPerTask: req.CpusPerTask,
|
||||
Constraints: req.Constraints,
|
||||
Reservation: req.Reservation,
|
||||
Account: req.Account,
|
||||
Nice: req.Nice,
|
||||
MailType: req.MailType,
|
||||
MailUser: req.MailUser,
|
||||
StandardOutput: req.StandardOutput,
|
||||
StandardError: req.StandardError,
|
||||
StandardInput: req.StandardInput,
|
||||
RequiredNodes: req.RequiredNodes,
|
||||
ExcludedNodes: req.ExcludedNodes,
|
||||
BeginTime: req.BeginTime,
|
||||
Deadline: req.Deadline,
|
||||
Array: req.Array,
|
||||
Dependency: req.Dependency,
|
||||
Requeue: req.Requeue,
|
||||
KillOnNodeFail: req.KillOnNodeFail,
|
||||
}
|
||||
|
||||
taskID, err := s.taskStore.Create(ctx, task)
|
||||
@@ -309,6 +336,17 @@ func (s *TaskService) ProcessTask(ctx context.Context, taskID int64) error {
|
||||
}
|
||||
}
|
||||
|
||||
// 注入默认调度参数(仅在内存中,不持久化到数据库)
|
||||
if task.TimeLimit == nil {
|
||||
task.TimeLimit = int32Ptr(10080) // 168 小时
|
||||
}
|
||||
if task.StandardOutput == nil {
|
||||
task.StandardOutput = strToPtrOrNil(filepath.Join(workDir, "slurm-%j.out"))
|
||||
}
|
||||
if task.StandardError == nil {
|
||||
task.StandardError = strToPtrOrNil(filepath.Join(workDir, "slurm-%j.err"))
|
||||
}
|
||||
|
||||
// 17. Render script
|
||||
rendered := RenderScript(app.ScriptTemplate, params, values)
|
||||
s.logger.Info("rendered script",
|
||||
@@ -319,8 +357,35 @@ func (s *TaskService) ProcessTask(ctx context.Context, taskID int64) error {
|
||||
|
||||
// 18. Submit to Slurm
|
||||
jobResp, err := s.jobSvc.SubmitJob(ctx, &model.SubmitJobRequest{
|
||||
Script: rendered,
|
||||
WorkDir: workDir,
|
||||
Script: rendered,
|
||||
WorkDir: workDir,
|
||||
Partition: task.Partition,
|
||||
CPUs: derefInt32(task.Cpus),
|
||||
TimeLimit: derefInt32ToStr(task.TimeLimit),
|
||||
QOS: derefStr(task.QOS),
|
||||
JobName: derefStr(task.JobName),
|
||||
MemoryPerNode: task.MemoryPerNode,
|
||||
MemoryPerCpu: task.MemoryPerCpu,
|
||||
Nodes: task.Nodes,
|
||||
Tasks: task.Tasks,
|
||||
CpusPerTask: task.CpusPerTask,
|
||||
Constraints: task.Constraints,
|
||||
Reservation: task.Reservation,
|
||||
Account: task.Account,
|
||||
Nice: task.Nice,
|
||||
MailType: task.MailType,
|
||||
MailUser: task.MailUser,
|
||||
StandardOutput: task.StandardOutput,
|
||||
StandardError: task.StandardError,
|
||||
StandardInput: task.StandardInput,
|
||||
RequiredNodes: task.RequiredNodes,
|
||||
ExcludedNodes: task.ExcludedNodes,
|
||||
BeginTime: task.BeginTime,
|
||||
Deadline: task.Deadline,
|
||||
Array: task.Array,
|
||||
Dependency: task.Dependency,
|
||||
Requeue: task.Requeue,
|
||||
KillOnNodeFail: task.KillOnNodeFail,
|
||||
})
|
||||
if err != nil {
|
||||
return fail(model.TaskStepSubmitting, fmt.Sprintf("submit job: %v", err))
|
||||
|
||||
Reference in New Issue
Block a user