diff --git a/cmd/mockserver/main.go b/cmd/mockserver/main.go new file mode 100644 index 0000000..08b8516 --- /dev/null +++ b/cmd/mockserver/main.go @@ -0,0 +1,131 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "strconv" + "strings" + + "gcy_hpc_server/internal/mockserver" + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/slurm" +) + +func main() { + defaultPort := 8080 + if v := os.Getenv("MOCK_PORT"); v != "" { + if p, err := strconv.Atoi(v); err == nil { + defaultPort = p + } + } + + port := flag.Int("port", defaultPort, "listen port") + seed := flag.Bool("seed", true, "inject seed data") + flag.Parse() + + srv, err := mockserver.New(mockserver.WithPort(*port)) + if err != nil { + log.Fatalf("failed to create server: %v", err) + } + + if *seed { + injectSeedData(srv) + } + + if err := srv.Run(); err != nil { + log.Fatalf("server error: %v", err) + } +} + +func injectSeedData(srv *mockserver.Server) { + client := srv.MockSlurmHTTPClient() + baseURL := srv.MockSlurmURL() + + type jobDef struct { + name string + partition string + states []string + } + + jobs := []jobDef{ + {"gpu-training-job", "gpu", nil}, + {"data-processing", "normal", []string{"RUNNING"}}, + {"benchmark-test", "normal", []string{"RUNNING"}}, + {"model-evaluation", "gpu", []string{"RUNNING", "COMPLETED"}}, + {"failed-script", "normal", []string{"RUNNING", "FAILED"}}, + {"cancelled-job", "gpu", []string{"RUNNING", "CANCELLED"}}, + } + + mockSlurm := srv.MockSlurm() + for _, j := range jobs { + id, err := submitJob(client, baseURL, j.name, j.partition) + if err != nil { + log.Printf("warning: failed to submit job %q: %v", j.name, err) + continue + } + for _, state := range j.states { + mockSlurm.SetJobState(id, state) + } + } + + appSvc := srv.ApplicationService() + ctx := context.Background() + + apps := []model.CreateApplicationRequest{ + { + Name: "图像分类训练", + Description: "基于ResNet的图像分类模型训练", + ScriptTemplate: "#!/bin/bash\n#SBATCH --job-name={{.Name}}\necho 'Running {{.Name}}'", + }, + { + Name: "数据处理流水线", + Description: "ETL数据处理脚本", + ScriptTemplate: "#!/bin/bash\necho 'Processing data'", + }, + } + for _, app := range apps { + if _, err := appSvc.CreateApplication(ctx, &app); err != nil { + log.Printf("warning: failed to create application %q: %v", app.Name, err) + } + } + + log.Println("Seeded 6 jobs (1 PENDING, 2 RUNNING, 1 COMPLETED, 1 FAILED, 1 CANCELLED) and 2 applications") +} + +func submitJob(client *http.Client, baseURL, name, partition string) (int32, error) { + body := fmt.Sprintf( + `{"script":"#!/bin/bash\necho %s","job":{"name":"%s","partition":"%s"}}`, + name, name, partition, + ) + req, err := http.NewRequest("POST", baseURL+"/slurm/v0.0.40/job/submit", strings.NewReader(body)) + if err != nil { + return 0, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return 0, fmt.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, b) + } + + var result slurm.OpenapiJobSubmitResponse + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return 0, fmt.Errorf("decode response: %w", err) + } + if result.JobID == nil { + return 0, fmt.Errorf("no job_id in response") + } + return *result.JobID, nil +} diff --git a/internal/mockserver/server.go b/internal/mockserver/server.go new file mode 100644 index 0000000..9db22e3 --- /dev/null +++ b/internal/mockserver/server.go @@ -0,0 +1,242 @@ +package mockserver + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "os" + "os/signal" + "syscall" + "time" + + "gcy_hpc_server/internal/config" + "gcy_hpc_server/internal/handler" + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/server" + "gcy_hpc_server/internal/service" + "gcy_hpc_server/internal/slurm" + "gcy_hpc_server/internal/store" + "gcy_hpc_server/internal/testutil/mockminio" + "gcy_hpc_server/internal/testutil/mockslurm" + + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +type serverConfig struct { + port int +} + +// Option configures a Server during construction. +type Option func(*serverConfig) + +// WithPort sets the listen port (default 8080). +func WithPort(port int) Option { + return func(c *serverConfig) { c.port = port } +} + +// Server is a standalone mock server that wires all dependencies using +// in-memory SQLite, MockSlurm, and MockMinIO — suitable for integration +// testing or development without external infrastructure. +type Server struct { + httpServer *http.Server + mockSlurmSrv *httptest.Server + mockSlurm *mockslurm.MockSlurm + db *gorm.DB + workDir string + logger *zap.Logger + + taskSvc *service.TaskService + appSvc *service.ApplicationService +} + +// New creates a fully wired mock server. All dependencies are initialized +// in-memory. Call Close() to release resources, or Run() to start serving. +func New(opts ...Option) (*Server, error) { + cfg := &serverConfig{port: 8080} + for _, o := range opts { + o(cfg) + } + + // 1. Logger + logger := zap.NewExample() + + // 2. SQLite in-memory DB + AutoMigrate + dbName := fmt.Sprintf("file:mockserver-%d?mode=memory&cache=shared", time.Now().UnixNano()) + db, err := gorm.Open(sqlite.Open(dbName), &gorm.Config{}) + if err != nil { + return nil, fmt.Errorf("open sqlite: %w", err) + } + sqlDB, err := db.DB() + if err != nil { + return nil, fmt.Errorf("get underlying sql.DB: %w", err) + } + sqlDB.SetMaxOpenConns(1) + if err := db.AutoMigrate( + &model.Application{}, + &model.FileBlob{}, + &model.File{}, + &model.Folder{}, + &model.UploadSession{}, + &model.UploadChunk{}, + &model.Task{}, + ); err != nil { + return nil, fmt.Errorf("auto-migrate: %w", err) + } + + // 3. Temp work directory + workDir, err := os.MkdirTemp("", "mockserver-*") + if err != nil { + return nil, fmt.Errorf("create temp dir: %w", err) + } + + // 4. MockSlurm + mockSlurmSrv, mockSlurm := mockslurm.NewMockSlurmServer() + + // 5. MockMinIO + mockMinIO := mockminio.NewInMemoryStorage() + + // 6. Stores + appStore := store.NewApplicationStore(db) + taskStore := store.NewTaskStore(db) + fileStore := store.NewFileStore(db) + blobStore := store.NewBlobStore(db) + uploadStore := store.NewUploadStore(db) + folderStore := store.NewFolderStore(db) + + // 7. Slurm client + slurmClient, err := slurm.NewClientWithOpts(mockSlurmSrv.URL, slurm.WithHTTPClient(mockSlurmSrv.Client())) + if err != nil { + os.RemoveAll(workDir) + mockSlurmSrv.Close() + return nil, fmt.Errorf("create slurm client: %w", err) + } + + // 8. MinioConfig + minioCfg := config.MinioConfig{ + Bucket: "files", + ChunkSize: 16 << 20, + MaxFileSize: 50 << 30, + MinChunkSize: 5 << 20, + SessionTTL: 48, + } + + // 9. Services (dependency order) + jobSvc := service.NewJobService(slurmClient, logger) + clusterSvc := service.NewClusterService(slurmClient, logger) + folderSvc := service.NewFolderService(folderStore, fileStore, logger) + stagingSvc := service.NewFileStagingService(fileStore, blobStore, mockMinIO, minioCfg.Bucket, logger) + taskSvc := service.NewTaskService(taskStore, appStore, fileStore, blobStore, stagingSvc, jobSvc, workDir, logger) + appSvc := service.NewApplicationService(appStore, jobSvc, workDir, logger) + uploadSvc := service.NewUploadService(mockMinIO, blobStore, fileStore, uploadStore, minioCfg, db, logger) + fileSvc := service.NewFileService(mockMinIO, blobStore, fileStore, folderStore, minioCfg.Bucket, db, logger) + + // 10. Handlers + jobH := handler.NewJobHandler(jobSvc, logger) + clusterH := handler.NewClusterHandler(clusterSvc, logger) + appH := handler.NewApplicationHandler(appSvc, logger) + uploadH := handler.NewUploadHandler(uploadSvc, logger) + fileH := handler.NewFileHandler(fileSvc, logger) + folderH := handler.NewFolderHandler(folderSvc, logger) + taskH := handler.NewTaskHandler(taskSvc, logger) + + // 11. Router + router := server.NewRouter(jobH, clusterH, appH, uploadH, fileH, folderH, taskH, logger) + + // 12. HTTP server + httpServer := &http.Server{ + Addr: fmt.Sprintf(":%d", cfg.port), + Handler: router, + } + + return &Server{ + httpServer: httpServer, + mockSlurmSrv: mockSlurmSrv, + mockSlurm: mockSlurm, + db: db, + workDir: workDir, + logger: logger, + taskSvc: taskSvc, + appSvc: appSvc, + }, nil +} + +// Close releases all resources: HTTP server, mock slurm, database, temp directory. +func (s *Server) Close() error { + var errs []error + + if s.taskSvc != nil { + s.taskSvc.StopProcessor() + } + + if s.httpServer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.httpServer.Shutdown(ctx); err != nil && err != http.ErrServerClosed { + errs = append(errs, fmt.Errorf("shutdown http server: %w", err)) + } + } + + if s.mockSlurmSrv != nil { + s.mockSlurmSrv.Close() + } + + if s.db != nil { + sqlDB, err := s.db.DB() + if err != nil { + errs = append(errs, fmt.Errorf("get underlying sql.DB: %w", err)) + } else if err := sqlDB.Close(); err != nil { + errs = append(errs, fmt.Errorf("close database: %w", err)) + } + } + + if s.workDir != "" { + if err := os.RemoveAll(s.workDir); err != nil { + errs = append(errs, fmt.Errorf("remove work dir: %w", err)) + } + } + + return errors.Join(errs...) +} + +// Run starts the HTTP server and blocks until a shutdown signal (SIGINT/SIGTERM) +// or a server error occurs. +func (s *Server) Run() error { + errCh := make(chan error, 1) + go func() { + s.logger.Info("starting server", zap.String("addr", s.httpServer.Addr)) + if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errCh <- fmt.Errorf("server listen: %w", err) + } + }() + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + select { + case err := <-errCh: + s.logger.Error("server exited unexpectedly", zap.Error(err)) + _ = s.Close() + return err + case sig := <-quit: + s.logger.Info("received shutdown signal", zap.String("signal", sig.String())) + } + + s.logger.Info("shutting down server...") + return s.Close() +} + +// MockSlurm returns the MockSlurm controller for configuring slurm responses. +func (s *Server) MockSlurm() *mockslurm.MockSlurm { return s.mockSlurm } + +// MockSlurmURL returns the URL of the mock slurm HTTP server. +func (s *Server) MockSlurmURL() string { return s.mockSlurmSrv.URL } + +// MockSlurmHTTPClient returns the HTTP client wired to the mock slurm server. +func (s *Server) MockSlurmHTTPClient() *http.Client { return s.mockSlurmSrv.Client() } + +// ApplicationService returns the wired ApplicationService. +func (s *Server) ApplicationService() *service.ApplicationService { return s.appSvc }