fix(service): correct CPU/memory mapping and add TRES/memory_used extraction

- Map CPUs to CpusPerTask (not MinimumCpus) for consistent SlurmDBD history

- Add Set:true to memory Uint64NoVal on submission

- Filter number=0 in mapUint64NoValToInt64 to avoid false zeros

- Extract peak memory from Steps.Tres.Requested.Max across all steps

- Add formatTresList, parseGresDetail, extractMemoryFromSteps helpers

- Update mapJobInfo and mapSlurmdbJob with new field mappings

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
dailz
2026-04-20 17:10:19 +08:00
parent d79656c728
commit d9ca9233b3
3 changed files with 125 additions and 15 deletions

View File

@@ -37,7 +37,7 @@ func (s *JobService) SubmitJob(ctx context.Context, req *model.SubmitJobRequest)
jobDesc.CurrentWorkingDirectory = &req.WorkDir
}
if req.CPUs > 0 {
jobDesc.MinimumCpus = slurm.Ptr(req.CPUs)
jobDesc.CpusPerTask = slurm.Ptr(req.CPUs)
}
if req.TimeLimit != "" {
if mins, err := strconv.ParseInt(req.TimeLimit, 10, 64); err == nil {
@@ -51,10 +51,10 @@ func (s *JobService) SubmitJob(ctx context.Context, req *model.SubmitJobRequest)
}
if req.MemoryPerNode != nil {
jobDesc.MemoryPerNode = &slurm.Uint64NoVal{Number: req.MemoryPerNode}
jobDesc.MemoryPerNode = &slurm.Uint64NoVal{Set: slurm.Ptr(true), Number: req.MemoryPerNode}
}
if req.MemoryPerCpu != nil {
jobDesc.MemoryPerCpu = &slurm.Uint64NoVal{Number: req.MemoryPerCpu}
jobDesc.MemoryPerCpu = &slurm.Uint64NoVal{Set: slurm.Ptr(true), Number: req.MemoryPerCpu}
}
if req.Nodes != nil {
jobDesc.Nodes = req.Nodes
@@ -461,6 +461,88 @@ func mapUint32NoValToInt32(v *slurm.Uint32NoVal) *int32 {
return nil
}
func mapUint16NoValToInt32(v *slurm.Uint16NoVal) *int32 {
if v != nil && v.Number != nil {
n := int32(*v.Number)
return &n
}
return nil
}
func mapUint64NoValToInt64(v *slurm.Uint64NoVal) *int64 {
if v != nil && v.Number != nil && *v.Number != 0 {
return v.Number
}
return nil
}
// formatTresList serializes a TresList to Slurm-style TRES string.
// Format: "type=count,type:name=count" (e.g. "billing=4,cpu=4,mem=16384M").
func formatTresList(tl slurm.TresList) string {
if len(tl) == 0 {
return ""
}
parts := make([]string, 0, len(tl))
for _, t := range tl {
if t.Type == nil || t.Count == nil {
continue
}
key := *t.Type
if t.Name != nil && *t.Name != "" {
key += ":" + *t.Name
}
parts = append(parts, fmt.Sprintf("%s=%d", key, *t.Count))
}
return strings.Join(parts, ",")
}
// parseGresDetail parses a comma-separated GRES string into a slice.
func parseGresDetail(s string) []string {
if s == "" {
return nil
}
parts := strings.Split(s, ",")
result := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
result = append(result, p)
}
}
if len(result) == 0 {
return nil
}
return result
}
// extractMemoryFromSteps extracts peak memory usage (in MB) from a SlurmDBD
// job's step TRES data. Despite the confusing naming, Slurm stores actual
// resource consumption (CPU time, memory RSS) in Tres.Requested.Max, not in
// Tres.Consumed (which holds I/O output data like disk/network).
// It scans ALL steps and returns the maximum mem value found.
// Returns nil if no memory data is available.
func extractMemoryFromSteps(steps slurm.StepList) *int64 {
var peakMB int64
for _, step := range steps {
if step.Tres == nil || step.Tres.Requested == nil {
continue
}
for _, t := range step.Tres.Requested.Max {
if t.Type != nil && *t.Type == "mem" && t.Count != nil {
// Slurm reports memory in bytes, convert to MB
mb := *t.Count / (1024 * 1024)
if mb > peakMB {
peakMB = mb
}
}
}
}
if peakMB > 0 {
return &peakMB
}
return nil
}
// mapJobInfo maps SDK JobInfo to API JobResponse.
func mapJobInfo(ji *slurm.JobInfo) model.JobResponse {
resp := model.JobResponse{}
@@ -485,6 +567,14 @@ func mapJobInfo(ji *slurm.JobInfo) model.JobResponse {
resp.Tasks = mapUint32NoValToInt32(ji.Tasks)
resp.NodeCount = mapUint32NoValToInt32(ji.NodeCount)
resp.BatchHost = derefStr(ji.BatchHost)
resp.CpusPerTask = mapUint16NoValToInt32(ji.CpusPerTask)
resp.MemoryPerCpu = mapUint64NoValToInt64(ji.MemoryPerCpu)
resp.MemoryPerNode = mapUint64NoValToInt64(ji.MemoryPerNode)
resp.TresReqStr = derefStr(ji.TresReqStr)
resp.TresAllocStr = derefStr(ji.TresAllocStr)
if ji.GresDetail != nil {
resp.GresDetail = []string(ji.GresDetail)
}
if ji.SubmitTime != nil && ji.SubmitTime.Number != nil {
resp.SubmitTime = ji.SubmitTime.Number
}
@@ -555,10 +645,22 @@ func mapSlurmdbJob(j *slurm.Job) model.JobResponse {
}
if j.Required != nil {
resp.Cpus = j.Required.CPUs
resp.CpusPerTask = nil // SlurmDBD Job doesn't expose CpusPerTask directly
resp.MemoryPerCpu = mapUint64NoValToInt64(j.Required.MemoryPerCpu)
resp.MemoryPerNode = mapUint64NoValToInt64(j.Required.MemoryPerNode)
}
if j.AllocationNodes != nil {
resp.NodeCount = j.AllocationNodes
}
if j.Tres != nil {
resp.TresAllocated = formatTresList(j.Tres.Allocated)
resp.TresRequested = formatTresList(j.Tres.Requested)
}
if j.Time != nil {
resp.Elapsed = j.Time.Elapsed
}
resp.MemoryUsed = extractMemoryFromSteps(j.Steps)
resp.GresDetail = parseGresDetail(derefStr(j.UsedGres))
resp.WorkDir = derefStr(j.WorkingDirectory)
return resp
}

View File

@@ -78,8 +78,8 @@ func TestSubmitJob_WithOptionalFields(t *testing.T) {
if body.Job.Partition != nil {
t.Error("expected partition nil for empty string")
}
if body.Job.MinimumCpus != nil {
t.Error("expected minimum_cpus nil when CPUs=0")
if body.Job.CpusPerTask != nil {
t.Error("expected cpus_per_task nil when CPUs=0")
}
jobID := int32(456)
@@ -885,18 +885,22 @@ func TestSubmitJob_AllSchedulingFields(t *testing.T) {
if j.Name == nil || *j.Name != "full-test" {
t.Errorf("Name mismatch: %v", j.Name)
}
if j.MinimumCpus == nil || *j.MinimumCpus != int32(8) {
t.Errorf("MinimumCpus mismatch: %v", j.MinimumCpus)
// CPUs=8 maps to CpusPerTask, then overridden by explicit CpusPerTask=2
if j.CpusPerTask == nil || *j.CpusPerTask != cpusPerTask {
t.Errorf("CpusPerTask mismatch: got %v, want %d (explicit CpusPerTask overrides CPUs)", j.CpusPerTask, cpusPerTask)
}
if j.MinimumCpus != nil {
t.Errorf("MinimumCpus should be nil, got %v", j.MinimumCpus)
}
// --- 22 new scheduling fields ---
// MemoryPerNode → *Uint64NoVal
if j.MemoryPerNode == nil || j.MemoryPerNode.Number == nil || *j.MemoryPerNode.Number != memoryPerNode {
if j.MemoryPerNode == nil || j.MemoryPerNode.Set == nil || !*j.MemoryPerNode.Set || j.MemoryPerNode.Number == nil || *j.MemoryPerNode.Number != memoryPerNode {
t.Errorf("MemoryPerNode mismatch: %v", j.MemoryPerNode)
}
// MemoryPerCpu → *Uint64NoVal
if j.MemoryPerCpu == nil || j.MemoryPerCpu.Number == nil || *j.MemoryPerCpu.Number != memoryPerCpu {
if j.MemoryPerCpu == nil || j.MemoryPerCpu.Set == nil || !*j.MemoryPerCpu.Set || j.MemoryPerCpu.Number == nil || *j.MemoryPerCpu.Number != memoryPerCpu {
t.Errorf("MemoryPerCpu mismatch: %v", j.MemoryPerCpu)
}
// Nodes → *string
@@ -1151,10 +1155,10 @@ func TestSubmitJob_MemoryBothSet(t *testing.T) {
j := body.Job
// Both memory fields should be mapped independently
if j.MemoryPerNode == nil || j.MemoryPerNode.Number == nil || *j.MemoryPerNode.Number != memoryPerNode {
if j.MemoryPerNode == nil || j.MemoryPerNode.Set == nil || !*j.MemoryPerNode.Set || j.MemoryPerNode.Number == nil || *j.MemoryPerNode.Number != memoryPerNode {
t.Errorf("MemoryPerNode mismatch: %v", j.MemoryPerNode)
}
if j.MemoryPerCpu == nil || j.MemoryPerCpu.Number == nil || *j.MemoryPerCpu.Number != memoryPerCpu {
if j.MemoryPerCpu == nil || j.MemoryPerCpu.Set == nil || !*j.MemoryPerCpu.Set || j.MemoryPerCpu.Number == nil || *j.MemoryPerCpu.Number != memoryPerCpu {
t.Errorf("MemoryPerCpu mismatch: %v", j.MemoryPerCpu)
}

View File

@@ -1048,8 +1048,12 @@ func TestProcessTask_SchedulingParams(t *testing.T) {
if j.Partition == nil || *j.Partition != "gpu" {
t.Errorf("Partition = %v, want %q", j.Partition, "gpu")
}
if j.MinimumCpus == nil || *j.MinimumCpus != int32(8) {
t.Errorf("MinimumCpus = %v, want 8", j.MinimumCpus)
// CPUs=8 maps to CpusPerTask, then overridden by explicit CpusPerTask=2
if j.CpusPerTask == nil || *j.CpusPerTask != int32(2) {
t.Errorf("CpusPerTask = %v, want 2 (explicit CpusPerTask overrides CPUs)", j.CpusPerTask)
}
if j.MinimumCpus != nil {
t.Errorf("MinimumCpus should be nil, got %v", j.MinimumCpus)
}
if j.TimeLimit == nil || j.TimeLimit.Number == nil || *j.TimeLimit.Number != int64(60) {
t.Errorf("TimeLimit = %v, want 60", j.TimeLimit)
@@ -1175,8 +1179,8 @@ func TestProcessTask_PartialSchedulingParams(t *testing.T) {
t.Errorf("Partition = %v, want %q", j.Partition, "debug")
}
if j.MinimumCpus != nil {
t.Errorf("MinimumCpus = %v, want nil (no cpus set)", j.MinimumCpus)
if j.CpusPerTask != nil {
t.Errorf("CpusPerTask = %v, want nil (no cpus set)", j.CpusPerTask)
}
if j.TimeLimit == nil {
t.Errorf("TimeLimit = nil, want non-nil (default should be injected)")