Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
316 lines
9.1 KiB
Go
316 lines
9.1 KiB
Go
package mockserver
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"gcy_hpc_server/internal/config"
|
|
"gcy_hpc_server/internal/handler"
|
|
"gcy_hpc_server/internal/model"
|
|
"gcy_hpc_server/internal/server"
|
|
"gcy_hpc_server/internal/service"
|
|
"gcy_hpc_server/internal/slurm"
|
|
"gcy_hpc_server/internal/store"
|
|
"gcy_hpc_server/internal/testutil/mockminio"
|
|
"gcy_hpc_server/internal/testutil/mockslurm"
|
|
|
|
"go.uber.org/zap"
|
|
"gorm.io/driver/sqlite"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// serverConfig holds the settings that Option functions mutate before New
// assembles the Server.
type serverConfig struct {
	port int // TCP port for the HTTP listener
}

// Option configures a Server during construction.
type Option func(*serverConfig)

// WithPort sets the listen port (default 8080).
func WithPort(port int) Option {
	return func(cfg *serverConfig) {
		cfg.port = port
	}
}
|
|
|
|
// Server is a standalone mock server that wires all dependencies using
|
|
// in-memory SQLite, MockSlurm, and MockMinIO — suitable for integration
|
|
// testing or development without external infrastructure.
|
|
type Server struct {
|
|
httpServer *http.Server
|
|
mockSlurmSrv *httptest.Server
|
|
mockSlurm *mockslurm.MockSlurm
|
|
db *gorm.DB
|
|
workDir string
|
|
logger *zap.Logger
|
|
|
|
taskSvc *service.TaskService
|
|
appSvc *service.ApplicationService
|
|
}
|
|
|
|
// New creates a fully wired mock server. All dependencies are initialized
// in-memory. Call Close() to release resources, or Run() to start serving.
//
// The task processor is started before New returns, so a successfully
// constructed Server must always be released with Close. If construction
// fails, every resource acquired up to that point (database handle, temp
// work directory, mock Slurm server) is released before the error is
// returned.
func New(opts ...Option) (*Server, error) {
	cfg := &serverConfig{port: 8080}
	for _, o := range opts {
		o(cfg)
	}

	// 1. Logger
	logger := zap.NewExample()

	// 2. SQLite in-memory DB + AutoMigrate. The timestamp makes the
	// shared-cache database name unique per Server instance.
	dbName := fmt.Sprintf("file:mockserver-%d?mode=memory&cache=shared", time.Now().UnixNano())
	db, err := gorm.Open(sqlite.Open(dbName), &gorm.Config{})
	if err != nil {
		return nil, fmt.Errorf("open sqlite: %w", err)
	}
	sqlDB, err := db.DB()
	if err != nil {
		return nil, fmt.Errorf("get underlying sql.DB: %w", err)
	}
	// NOTE(review): presumably a single connection avoids SQLite lock
	// contention on the shared in-memory database — confirm.
	sqlDB.SetMaxOpenConns(1)
	if err := db.AutoMigrate(
		&model.Application{},
		&model.FileBlob{},
		&model.File{},
		&model.Folder{},
		&model.UploadSession{},
		&model.UploadChunk{},
		&model.Task{},
	); err != nil {
		_ = sqlDB.Close() // best-effort cleanup; the migrate error is the one to report
		return nil, fmt.Errorf("auto-migrate: %w", err)
	}

	// 3. Temp work directory
	workDir, err := os.MkdirTemp("", "mockserver-*")
	if err != nil {
		_ = sqlDB.Close() // best-effort cleanup
		return nil, fmt.Errorf("create temp dir: %w", err)
	}

	// 4. MockSlurm
	mockSlurmSrv, mockSlurm := mockslurm.NewMockSlurmServer()

	// 5. MockMinIO
	mockMinIO := mockminio.NewInMemoryStorage()

	// 6. Stores
	appStore := store.NewApplicationStore(db)
	taskStore := store.NewTaskStore(db)
	fileStore := store.NewFileStore(db)
	blobStore := store.NewBlobStore(db)
	uploadStore := store.NewUploadStore(db)
	folderStore := store.NewFolderStore(db)

	// 7. Slurm client, pointed at the mock Slurm HTTP server.
	slurmClient, err := slurm.NewClientWithOpts(mockSlurmSrv.URL, slurm.WithHTTPClient(mockSlurmSrv.Client()))
	if err != nil {
		// Release everything acquired so far, in reverse order; each step is
		// best-effort because the client error is the one worth reporting.
		mockSlurmSrv.Close()
		_ = os.RemoveAll(workDir)
		_ = sqlDB.Close()
		return nil, fmt.Errorf("create slurm client: %w", err)
	}

	// 8. MinioConfig
	minioCfg := config.MinioConfig{
		Bucket:       "files",
		ChunkSize:    16 << 20,
		MaxFileSize:  50 << 30,
		MinChunkSize: 5 << 20,
		SessionTTL:   48,
	}

	// 9. Services (dependency order)
	jobSvc := service.NewJobService(slurmClient, logger)
	clusterSvc := service.NewClusterService(slurmClient, logger)
	folderSvc := service.NewFolderService(folderStore, fileStore, logger)
	stagingSvc := service.NewFileStagingService(fileStore, blobStore, mockMinIO, minioCfg.Bucket, logger)
	taskSvc := service.NewTaskService(taskStore, appStore, fileStore, blobStore, stagingSvc, jobSvc, workDir, logger)
	appSvc := service.NewApplicationService(appStore, jobSvc, workDir, logger)
	uploadSvc := service.NewUploadService(mockMinIO, blobStore, fileStore, uploadStore, minioCfg, db, logger)
	fileSvc := service.NewFileService(mockMinIO, blobStore, fileStore, folderStore, minioCfg.Bucket, db, logger)

	// 10. Handlers
	jobH := handler.NewJobHandler(jobSvc, logger)
	clusterH := handler.NewClusterHandler(clusterSvc, logger)
	appH := handler.NewApplicationHandler(appSvc, logger)
	uploadH := handler.NewUploadHandler(uploadSvc, logger)
	fileH := handler.NewFileHandler(fileSvc, logger)
	folderH := handler.NewFolderHandler(folderSvc, logger)
	taskH := handler.NewTaskHandler(taskSvc, logger)

	// 11. Task processor. Nothing below can fail, so there is no error path
	// that would need to stop it again; Close is responsible for that.
	taskSvc.StartProcessor(context.Background())

	// 12. Router
	router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, taskH, logger)

	// 13. HTTP server. ReadHeaderTimeout bounds how long a client may take to
	// send request headers (Slowloris protection) without limiting large
	// upload/download bodies.
	httpServer := &http.Server{
		Addr:              fmt.Sprintf(":%d", cfg.port),
		Handler:           router,
		ReadHeaderTimeout: 10 * time.Second,
	}

	return &Server{
		httpServer:   httpServer,
		mockSlurmSrv: mockSlurmSrv,
		mockSlurm:    mockSlurm,
		db:           db,
		workDir:      workDir,
		logger:       logger,
		taskSvc:      taskSvc,
		appSvc:       appSvc,
	}, nil
}
|
|
|
|
// Close releases all resources: HTTP server, task processor, mock slurm,
// database, temp directory. Cleanup continues past individual failures and
// every failure is returned combined via errors.Join.
//
// Order matters. The HTTP server is drained first (up to 5s) so no in-flight
// request can hand work to the task processor after it has stopped. The task
// processor is then stopped before the mock Slurm server and database it was
// wired to in New are closed.
//
// A repeated Close (for example Run's error path followed by a caller's
// deferred Close) does not stop the task processor a second time.
func (s *Server) Close() error {
	var errs []error

	if s.httpServer != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err := s.httpServer.Shutdown(ctx)
		cancel()
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			errs = append(errs, fmt.Errorf("shutdown http server: %w", err))
		}
	}

	if s.taskSvc != nil {
		s.taskSvc.StopProcessor()
		// Drop the reference so a second Close skips StopProcessor.
		s.taskSvc = nil
	}

	if s.mockSlurmSrv != nil {
		s.mockSlurmSrv.Close()
	}

	if s.db != nil {
		sqlDB, err := s.db.DB()
		if err != nil {
			errs = append(errs, fmt.Errorf("get underlying sql.DB: %w", err))
		} else if err := sqlDB.Close(); err != nil {
			errs = append(errs, fmt.Errorf("close database: %w", err))
		}
	}

	if s.workDir != "" {
		if err := os.RemoveAll(s.workDir); err != nil {
			errs = append(errs, fmt.Errorf("remove work dir: %w", err))
		}
	}

	return errors.Join(errs...)
}
|
|
|
|
// simulateJobProgress periodically advances mock Slurm job states
// and syncs the corresponding Task statuses, until ctx is cancelled.
//
// Jobs are polled every 2s. Transition: PENDING → RUNNING once more than 3s
// have passed since submission, then RUNNING → COMPLETED once the job has been
// running for more than 5s. Because states are only checked on ticks, each
// transition may lag its threshold by up to one tick interval.
func (s *Server) simulateJobProgress(ctx context.Context) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, job := range s.mockSlurm.GetAllActiveJobs() {
				switch s.mockSlurm.GetJobState(job.JobID) {
				case "PENDING":
					if time.Since(job.SubmitTime) > 3*time.Second {
						s.advanceMockJob(ctx, job.JobID, "RUNNING", "mock: job PENDING → RUNNING")
					}
				case "RUNNING":
					if job.StartTime != nil && time.Since(*job.StartTime) > 5*time.Second {
						s.advanceMockJob(ctx, job.JobID, "COMPLETED", "mock: job RUNNING → COMPLETED")
					}
				}
			}
		}
	}
}

// advanceMockJob sets jobID to newState in the mock Slurm, logs msg, and
// propagates the new state to matching Tasks. A sync failure is logged rather
// than returned so one bad row cannot stop the simulator; failures caused by
// ctx being cancelled (shutdown) are not logged.
func (s *Server) advanceMockJob(ctx context.Context, jobID int32, newState, msg string) {
	s.mockSlurm.SetJobState(jobID, newState)
	s.logger.Info(msg, zap.Int32("job_id", jobID))
	if err := s.syncTaskStatusFromSlurm(ctx, jobID); err != nil && ctx.Err() == nil {
		s.logger.Warn("mock: sync task status failed", zap.Int32("job_id", jobID), zap.Error(err))
	}
}
|
|
|
|
var slurmStateToTaskStatus = map[string]string{
|
|
"PENDING": model.TaskStatusQueued,
|
|
"RUNNING": model.TaskStatusRunning,
|
|
"COMPLETED": model.TaskStatusCompleted,
|
|
"FAILED": model.TaskStatusFailed,
|
|
"CANCELLED": model.TaskStatusFailed,
|
|
"TIMEOUT": model.TaskStatusFailed,
|
|
"NODE_FAIL": model.TaskStatusFailed,
|
|
}
|
|
|
|
// syncTaskStatusFromSlurm copies the current mock Slurm state of slurmJobID
// onto every Task row whose slurm_job_id matches, translated through
// slurmStateToTaskStatus. Slurm states without a mapping are ignored (nil is
// returned without touching the database).
//
// Database work runs under ctx, so cancellation aborts it. A failed update
// for one task does not prevent the others from being updated; all update
// failures are returned combined via errors.Join.
func (s *Server) syncTaskStatusFromSlurm(ctx context.Context, slurmJobID int32) error {
	// Resolve the target status first so unmapped states skip the query.
	taskStatus, ok := slurmStateToTaskStatus[s.mockSlurm.GetJobState(slurmJobID)]
	if !ok {
		return nil
	}

	db := s.db.WithContext(ctx)

	var tasks []model.Task
	if err := db.Where("slurm_job_id = ?", slurmJobID).Find(&tasks).Error; err != nil {
		return fmt.Errorf("find tasks for slurm job %d: %w", slurmJobID, err)
	}

	var errs []error
	for i := range tasks {
		t := &tasks[i] // index to avoid copying each Task
		if t.Status == taskStatus {
			continue
		}
		s.logger.Info("mock: syncing task status",
			zap.Int64("task_id", t.ID),
			zap.String("old", t.Status),
			zap.String("new", taskStatus),
		)
		if err := db.Model(&model.Task{}).Where("id = ?", t.ID).Update("status", taskStatus).Error; err != nil {
			errs = append(errs, fmt.Errorf("update status of task %d: %w", t.ID, err))
		}
	}
	return errors.Join(errs...)
}
|
|
|
|
// Run starts the HTTP server and blocks until a shutdown signal (SIGINT/SIGTERM)
// or a server error occurs.
//
// Alongside the HTTP server it runs the mock job-progress simulator. On either
// exit path the simulator is stopped and waited for before Close tears down
// the database and mock Slurm it reads. A signal-driven shutdown returns
// Close's error; a listen failure returns that failure (Close is still called,
// best-effort).
func (s *Server) Run() error {
	// Subscribe before starting anything so a signal arriving during startup
	// is caught here, and restore default signal handling when Run returns.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(quit)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	simDone := make(chan struct{})
	go func() {
		defer close(simDone)
		s.simulateJobProgress(ctx)
	}()

	// stopSimulator must run before Close: the simulator uses s.db and
	// s.mockSlurm, both of which Close releases.
	stopSimulator := func() {
		cancel()
		<-simDone
	}

	errCh := make(chan error, 1)
	go func() {
		s.logger.Info("starting server", zap.String("addr", s.httpServer.Addr))
		if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			errCh <- fmt.Errorf("server listen: %w", err)
		}
	}()

	select {
	case err := <-errCh:
		s.logger.Error("server exited unexpectedly", zap.Error(err))
		stopSimulator()
		_ = s.Close() // best-effort; the listen error is the one to report
		return err
	case sig := <-quit:
		s.logger.Info("received shutdown signal", zap.String("signal", sig.String()))
	}

	s.logger.Info("shutting down server...")
	stopSimulator()
	return s.Close()
}
|
|
|
|
// MockSlurm returns the MockSlurm controller for configuring slurm responses.
|
|
func (s *Server) MockSlurm() *mockslurm.MockSlurm { return s.mockSlurm }
|
|
|
|
// MockSlurmURL returns the URL of the mock slurm HTTP server.
|
|
func (s *Server) MockSlurmURL() string { return s.mockSlurmSrv.URL }
|
|
|
|
// MockSlurmHTTPClient returns an *http.Client configured to talk to the mock
// slurm server, as supplied by the underlying httptest.Server.
func (s *Server) MockSlurmHTTPClient() *http.Client {
	return s.mockSlurmSrv.Client()
}
|
|
|
|
// ApplicationService returns the wired ApplicationService.
|
|
func (s *Server) ApplicationService() *service.ApplicationService { return s.appSvc }
|