Files
hpc/internal/service/cluster_service.go

357 lines
8.4 KiB
Go

package service
import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"time"

	"gcy_hpc_server/internal/model"
	"gcy_hpc_server/internal/slurm"

	"go.uber.org/zap"
)
// derefStr returns the value s points to, or the empty string when s is nil.
func derefStr(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}
// derefInt32 returns the value i points to, or 0 when i is nil.
func derefInt32(i *int32) int32 {
	var out int32
	if i != nil {
		out = *i
	}
	return out
}
// derefInt64 returns the value i points to, or 0 when i is nil.
func derefInt64(i *int64) int64 {
	var out int64
	if i != nil {
		out = *i
	}
	return out
}
// uint32NoValString renders a Slurm Uint32NoVal as text.
//
// It returns "UNLIMITED" when the Infinite flag is set and true. Otherwise it
// returns the decimal form of Number when Number is present. It returns ""
// for a nil value or when neither field is usable. Infinite takes precedence
// over Number.
func uint32NoValString(v *slurm.Uint32NoVal) string {
	switch {
	case v == nil:
		return ""
	case v.Infinite != nil && *v.Infinite:
		return "UNLIMITED"
	case v.Number != nil:
		return strconv.FormatInt(*v.Number, 10)
	default:
		return ""
	}
}
// derefUint64NoValInt64 extracts the Number pointer from a Slurm Uint64NoVal.
// It returns nil when v itself is nil or carries no Number. The returned
// pointer aliases v.Number; it is not a copy.
func derefUint64NoValInt64(v *slurm.Uint64NoVal) *int64 {
	if v == nil || v.Number == nil {
		return nil
	}
	return v.Number
}
// derefCSVString joins the entries of a Slurm CSVString with "," separators
// and no surrounding whitespace.
//
// A nil pointer or an empty list yields "". strings.Join already returns ""
// for a zero-length slice, so only the nil pointer needs an explicit guard.
// strings.Join sizes its buffer once, which avoids the quadratic copying of
// repeated `+=` concatenation.
func derefCSVString(cs *slurm.CSVString) string {
	if cs == nil {
		return ""
	}
	return strings.Join(*cs, ",")
}
// ClusterService queries the Slurm API through a slurm.Client for nodes,
// partitions, and diagnostics. It converts node and partition results into
// the model package's response types and logs each call with zap.
type ClusterService struct {
// client issues the Slurm API calls (Nodes, Partitions, Diag sub-clients).
client *slurm.Client
// logger receives per-call debug traces (request, response/error, duration)
// and an error entry when a call fails.
logger *zap.Logger
}
// NewClusterService builds a ClusterService around the given Slurm client and
// logger. Neither argument is validated here; callers must supply non-nil
// values before invoking any method that uses them.
func NewClusterService(client *slurm.Client, logger *zap.Logger) *ClusterService {
	svc := &ClusterService{
		logger: logger,
		client: client,
	}
	return svc
}
func (s *ClusterService) GetNodes(ctx context.Context) ([]model.NodeResponse, error) {
s.logger.Debug("slurm API request",
zap.String("operation", "GetNodes"),
)
start := time.Now()
resp, _, err := s.client.Nodes.GetNodes(ctx, nil)
took := time.Since(start)
if err != nil {
s.logger.Debug("slurm API error response",
zap.String("operation", "GetNodes"),
zap.Duration("took", took),
zap.Error(err),
)
s.logger.Error("failed to get nodes", zap.Error(err))
return nil, fmt.Errorf("get nodes: %w", err)
}
s.logger.Debug("slurm API response",
zap.String("operation", "GetNodes"),
zap.Duration("took", took),
zap.Any("body", resp),
)
if resp.Nodes == nil {
return nil, nil
}
result := make([]model.NodeResponse, 0, len(*resp.Nodes))
for _, n := range *resp.Nodes {
result = append(result, mapNode(n))
}
return result, nil
}
func (s *ClusterService) GetNode(ctx context.Context, name string) (*model.NodeResponse, error) {
s.logger.Debug("slurm API request",
zap.String("operation", "GetNode"),
zap.String("node_name", name),
)
start := time.Now()
resp, _, err := s.client.Nodes.GetNode(ctx, name, nil)
took := time.Since(start)
if err != nil {
s.logger.Debug("slurm API error response",
zap.String("operation", "GetNode"),
zap.String("node_name", name),
zap.Duration("took", took),
zap.Error(err),
)
s.logger.Error("failed to get node", zap.String("name", name), zap.Error(err))
return nil, fmt.Errorf("get node %s: %w", name, err)
}
s.logger.Debug("slurm API response",
zap.String("operation", "GetNode"),
zap.String("node_name", name),
zap.Duration("took", took),
zap.Any("body", resp),
)
if resp.Nodes == nil || len(*resp.Nodes) == 0 {
return nil, nil
}
n := (*resp.Nodes)[0]
mapped := mapNode(n)
return &mapped, nil
}
func (s *ClusterService) GetPartitions(ctx context.Context) ([]model.PartitionResponse, error) {
s.logger.Debug("slurm API request",
zap.String("operation", "GetPartitions"),
)
start := time.Now()
resp, _, err := s.client.Partitions.GetPartitions(ctx, nil)
took := time.Since(start)
if err != nil {
s.logger.Debug("slurm API error response",
zap.String("operation", "GetPartitions"),
zap.Duration("took", took),
zap.Error(err),
)
s.logger.Error("failed to get partitions", zap.Error(err))
return nil, fmt.Errorf("get partitions: %w", err)
}
s.logger.Debug("slurm API response",
zap.String("operation", "GetPartitions"),
zap.Duration("took", took),
zap.Any("body", resp),
)
if resp.Partitions == nil {
return nil, nil
}
result := make([]model.PartitionResponse, 0, len(*resp.Partitions))
for _, pi := range *resp.Partitions {
result = append(result, mapPartition(pi))
}
return result, nil
}
func (s *ClusterService) GetPartition(ctx context.Context, name string) (*model.PartitionResponse, error) {
s.logger.Debug("slurm API request",
zap.String("operation", "GetPartition"),
zap.String("partition_name", name),
)
start := time.Now()
resp, _, err := s.client.Partitions.GetPartition(ctx, name, nil)
took := time.Since(start)
if err != nil {
s.logger.Debug("slurm API error response",
zap.String("operation", "GetPartition"),
zap.String("partition_name", name),
zap.Duration("took", took),
zap.Error(err),
)
s.logger.Error("failed to get partition", zap.String("name", name), zap.Error(err))
return nil, fmt.Errorf("get partition %s: %w", name, err)
}
s.logger.Debug("slurm API response",
zap.String("operation", "GetPartition"),
zap.String("partition_name", name),
zap.Duration("took", took),
zap.Any("body", resp),
)
if resp.Partitions == nil || len(*resp.Partitions) == 0 {
return nil, nil
}
p := (*resp.Partitions)[0]
mapped := mapPartition(p)
return &mapped, nil
}
func (s *ClusterService) GetDiag(ctx context.Context) (*slurm.OpenapiDiagResp, error) {
s.logger.Debug("slurm API request",
zap.String("operation", "GetDiag"),
)
start := time.Now()
resp, _, err := s.client.Diag.GetDiag(ctx)
took := time.Since(start)
if err != nil {
s.logger.Debug("slurm API error response",
zap.String("operation", "GetDiag"),
zap.Duration("took", took),
zap.Error(err),
)
s.logger.Error("failed to get diag", zap.Error(err))
return nil, fmt.Errorf("get diag: %w", err)
}
s.logger.Debug("slurm API response",
zap.String("operation", "GetDiag"),
zap.Duration("took", took),
zap.Any("body", resp),
)
return resp, nil
}
// mapNode converts a slurm.Node into the API-facing model.NodeResponse.
//
// Pointer fields are dereferenced to their zero value when nil. FreeMem stays
// a pointer, so "unknown" is preserved. Feature lists are flattened to
// comma-separated strings.
func mapNode(n slurm.Node) model.NodeResponse {
	var out model.NodeResponse

	// Identity and addressing.
	out.Name = derefStr(n.Name)
	out.Address = derefStr(n.Address)
	out.Hostname = derefStr(n.Hostname)
	out.State = n.State
	out.Weight = n.Weight

	// CPU topology and load.
	out.CPUs = derefInt32(n.Cpus)
	out.AllocCpus = n.AllocCpus
	out.Cores = n.Cores
	out.Sockets = n.Sockets
	out.Threads = n.Threads
	out.CpuLoad = n.CpuLoad

	// Memory.
	out.RealMemory = derefInt64(n.RealMemory)
	out.AllocMemory = derefInt64(n.AllocMemory)
	out.FreeMem = derefUint64NoValInt64(n.FreeMem)

	// Platform.
	out.Arch = derefStr(n.Architecture)
	out.OS = derefStr(n.OperatingSystem)

	// Generic resources.
	out.Gres = derefStr(n.Gres)
	out.GresUsed = derefStr(n.GresUsed)

	// Administrative reason for the current state.
	out.Reason = derefStr(n.Reason)
	out.ReasonSetByUser = derefStr(n.ReasonSetByUser)

	// Node features.
	out.Features = derefCSVString(n.Features)
	out.ActiveFeatures = derefCSVString(n.ActiveFeatures)

	return out
}
// mapPartition converts a slurm.PartitionInfo into the API-facing
// model.PartitionResponse.
//
// Every optional sub-struct (Partition, Nodes, CPUs, Maximums, Minimums,
// Defaults, Priority, QOS, Accounts) is checked for nil once. Fields sourced
// from an absent sub-struct keep their zero value. Time limits are rendered
// via uint32NoValString ("UNLIMITED", a number, or ""). State is shared with
// pi, not copied.
func mapPartition(pi slurm.PartitionInfo) model.PartitionResponse {
	out := model.PartitionResponse{
		Name:      derefStr(pi.Name),
		GraceTime: pi.GraceTime,
	}

	if part := pi.Partition; part != nil {
		out.State = part.State
		// The partition is flagged as default when its state list contains
		// "DEFAULT".
		// NOTE(review): confirm this Slurm API version reports the default
		// partition through partition.state rather than a separate flag.
		for _, st := range part.State {
			if st == "DEFAULT" {
				out.Default = true
				break
			}
		}
	}

	if nodes := pi.Nodes; nodes != nil {
		out.Nodes = derefStr(nodes.Configured)
		out.TotalNodes = derefInt32(nodes.Total)
	}

	if cpus := pi.CPUs; cpus != nil {
		out.TotalCPUs = derefInt32(cpus.Total)
	}

	if limits := pi.Maximums; limits != nil {
		out.MaxTime = uint32NoValString(limits.Time)
		out.MaxNodes = mapUint32NoValToInt32(limits.Nodes)
		out.MaxCPUsPerNode = limits.CpusPerNode
	}

	if mins := pi.Minimums; mins != nil {
		out.MinNodes = mins.Nodes
	}

	if defs := pi.Defaults; defs != nil {
		out.DefaultTime = uint32NoValString(defs.Time)
	}

	if prio := pi.Priority; prio != nil {
		out.Priority = prio.JobFactor
	}

	if qos := pi.QOS; qos != nil {
		out.QOSAllowed = derefStr(qos.Allowed)
		out.QOSDeny = derefStr(qos.Deny)
		out.QOSAssigned = derefStr(qos.Assigned)
	}

	if accts := pi.Accounts; accts != nil {
		out.AccountsAllowed = derefStr(accts.Allowed)
		out.AccountsDeny = derefStr(accts.Deny)
	}

	return out
}