Files
hpc/cmd/server/integration_e2e_test.go
dailz b9b2f0d9b4 feat(testutil): add MockSlurm, MockMinIO, TestEnv and 37 integration tests
- mockminio: in-memory ObjectStorage with all 11 methods, thread-safe, SHA256 ETag, Range support
- mockslurm: httptest server with 11 Slurm REST API endpoints, job eviction from active to history queue
- testenv: one-line test environment factory (SQLite + MockSlurm + MockMinIO + all stores/services/handlers + httptest server)
- integration tests: 37 tests covering Jobs(5), Cluster(5), App(6), Upload(5), File(4), Folder(4), Task(4), E2E(1)
- no external dependencies, no existing files modified
2026-04-16 13:23:27 +08:00

203 lines
7.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"
"gcy_hpc_server/internal/testutil/testenv"
)
// Wire-format mirrors of the API payloads used by the end-to-end test.
// In this file they are only ever decoded into (via json.Unmarshal).
type (
	// e2eResponse mirrors the unified API response envelope: a success flag,
	// an optional raw data payload (decoded later into a concrete type by the
	// caller), and an optional error message.
	e2eResponse struct {
		Success bool            `json:"success"`
		Data    json.RawMessage `json:"data,omitempty"`
		Error   string          `json:"error,omitempty"`
	}

	// e2eTaskCreatedData mirrors the data payload returned by
	// POST /api/v1/tasks.
	e2eTaskCreatedData struct {
		ID int64 `json:"id"`
	}

	// e2eTaskItem mirrors one task entry inside the task list payload.
	e2eTaskItem struct {
		ID           int64  `json:"id"`
		TaskName     string `json:"task_name"`
		Status       string `json:"status"`
		WorkDir      string `json:"work_dir"`
		ErrorMessage string `json:"error_message"`
	}

	// e2eTaskListData mirrors the data payload of the task list endpoint.
	e2eTaskListData struct {
		Items []e2eTaskItem `json:"items"`
		Total int64         `json:"total"`
	}
)
// e2eSendRequest issues method+path against the test environment through
// env.DoRequest. An empty body string is sent as a nil reader (no request
// body); any other string is used as the request body. The caller owns the
// returned response and must close its body (e2eParseResponse does so).
func e2eSendRequest(env *testenv.TestEnv, method, path string, body string) *http.Response {
	if body == "" {
		return env.DoRequest(method, path, nil)
	}
	return env.DoRequest(method, path, strings.NewReader(body))
}
// e2eParseResponse reads and closes resp.Body, decodes it as the unified API
// envelope (e2eResponse), and returns the HTTP status code with the decoded
// envelope.
//
// It has no *testing.T, so it panics if the body cannot be read or is not
// valid JSON; the unmarshal panic message includes the raw body to aid
// debugging.
func e2eParseResponse(resp *http.Response) (int, e2eResponse) {
	// Deferred so the body is released even when ReadAll fails and we panic.
	defer resp.Body.Close()

	b, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(fmt.Sprintf("e2eParseResponse read: %v", err))
	}
	var result e2eResponse
	if err := json.Unmarshal(b, &result); err != nil {
		panic(fmt.Sprintf("e2eParseResponse unmarshal: %v (body: %s)", err, string(b)))
	}
	return resp.StatusCode, result
}
// TestIntegration_E2E_CompleteWorkflow drives one task through its full
// lifecycle against the in-process test environment:
// create app → upload file → submit task → queued → running → completed.
//
// Slurm state changes are simulated with env.MockSlurm.SetJobState, and
// env.MakeTaskStale is called after each change so the poller refreshes the
// task immediately instead of waiting for its normal refresh interval.
// Progress is logged step by step (log text is in Chinese).
func TestIntegration_E2E_CompleteWorkflow(t *testing.T) {
	// Upper bound for observing each asynchronous status transition.
	const statusTimeout = 5 * time.Second

	t.Log("========== E2E 全链路测试开始 ==========")
	t.Log("")
	env := testenv.NewTestEnv(t)
	t.Log("✓ 测试环境创建完成 (SQLite + MockSlurm + MockMinIO + Router + Poller)")
	t.Log("")

	// Step 1: Create an application with a script template and one parameter.
	t.Log("【步骤 1】创建应用")
	appID, err := env.CreateApp("e2e-app", "#!/bin/bash\necho {{.np}}",
		json.RawMessage(`[{"name":"np","type":"string","default":"1"}]`))
	if err != nil {
		t.Fatalf("step 1 create app: %v", err)
	}
	t.Logf(" → 应用创建成功, appID=%d, 脚本模板='#!/bin/bash echo {{.np}}', 参数=[np]", appID)
	t.Log("")

	// Step 2: Upload the input file. The error is checked so an upload
	// failure is reported here rather than surfacing later as a confusing
	// task-submission or processing failure.
	t.Log("【步骤 2】上传输入文件")
	fileID, err := env.UploadTestData("input.txt", []byte("test input data"))
	if err != nil {
		t.Fatalf("step 2 upload input file: %v", err)
	}
	t.Logf(" → 文件上传成功, fileID=%d, 内容='test input data' (存入 MockMinIO + SQLite)", fileID)
	t.Log("")

	// Step 3: Submit the task through the HTTP API.
	t.Log("【步骤 3】通过 HTTP API 提交任务")
	body := fmt.Sprintf(
		`{"app_id": %d, "task_name": "e2e-task", "values": {"np": "4"}, "file_ids": [%d]}`,
		appID, fileID,
	)
	t.Logf(" → POST /api/v1/tasks body=%s", body)
	resp := e2eSendRequest(env, http.MethodPost, "/api/v1/tasks", body)
	status, result := e2eParseResponse(resp)
	if status != http.StatusCreated {
		t.Fatalf("step 3 submit task: status=%d, success=%v, error=%q", status, result.Success, result.Error)
	}
	var created e2eTaskCreatedData
	if err := json.Unmarshal(result.Data, &created); err != nil {
		t.Fatalf("step 3 parse task id: %v", err)
	}
	taskID := created.ID
	if taskID <= 0 {
		t.Fatalf("step 3: expected positive task id, got %d", taskID)
	}
	t.Logf(" → HTTP 201 Created, taskID=%d", taskID)
	t.Log("")

	// Step 4: Wait for the background processor to submit the job ("queued").
	t.Log("【步骤 4】等待 TaskProcessor 异步提交到 MockSlurm")
	t.Log(" → 后台流程: submitted → preparing → downloading → ready → queued")
	if err := env.WaitForTaskStatus(taskID, "queued", statusTimeout); err != nil {
		// Best-effort diagnostic only; an empty status means "not found via API".
		taskStatus, _ := e2eFetchTaskStatus(env, taskID)
		t.Fatalf("step 4 wait for queued: %v (current status via API: %q)", err, taskStatus)
	}
	t.Log(" → 任务状态变为 'queued' (TaskProcessor 已提交到 Slurm)")
	t.Log("")

	// Step 5: Look up the Slurm job ID recorded for the task.
	t.Log("【步骤 5】查询数据库获取 Slurm Job ID")
	slurmJobID, err := env.GetTaskSlurmJobID(taskID)
	if err != nil {
		t.Fatalf("step 5 get slurm job id: %v", err)
	}
	t.Logf(" → slurmJobID=%d (MockSlurm 中的作业号)", slurmJobID)
	t.Log("")

	// Step 6: Simulate the job starting and wait for the task to become "running".
	t.Log("【步骤 6】模拟 Slurm: 作业开始运行")
	t.Logf(" → MockSlurm.SetJobState(%d, 'RUNNING')", slurmJobID)
	env.MockSlurm.SetJobState(slurmJobID, "RUNNING")
	t.Logf(" → MakeTaskStale(%d) — 绕过 30s 等待,让 poller 立即刷新", taskID)
	if err := env.MakeTaskStale(taskID); err != nil {
		t.Fatalf("step 6 make task stale: %v", err)
	}
	if err := env.WaitForTaskStatus(taskID, "running", statusTimeout); err != nil {
		taskStatus, _ := e2eFetchTaskStatus(env, taskID)
		t.Fatalf("step 6 wait for running: %v (current status via API: %q)", err, taskStatus)
	}
	t.Log(" → 任务状态变为 'running'")
	t.Log("")

	// Step 7: Simulate completion. Per the log text below, the mock moves the
	// job from its active set to its history set, so the poller must resolve
	// the final state via the history path.
	t.Log("【步骤 7】模拟 Slurm: 作业运行完成")
	t.Logf(" → MockSlurm.SetJobState(%d, 'COMPLETED') — 作业从 activeJobs 淘汰到 historyJobs", slurmJobID)
	env.MockSlurm.SetJobState(slurmJobID, "COMPLETED")
	t.Log(" → MakeTaskStale + WaitForTaskStatus...")
	if err := env.MakeTaskStale(taskID); err != nil {
		t.Fatalf("step 7 make task stale: %v", err)
	}
	if err := env.WaitForTaskStatus(taskID, "completed", statusTimeout); err != nil {
		taskStatus, _ := e2eFetchTaskStatus(env, taskID)
		t.Fatalf("step 7 wait for completed: %v (current status via API: %q)", err, taskStatus)
	}
	t.Log(" → 任务状态变为 'completed' (通过 SlurmDB 历史回退路径获取)")
	t.Log("")

	// Step 8: Verify the final state through GET /api/v1/tasks.
	t.Log("【步骤 8】通过 HTTP API 验证最终状态")
	finalStatus, finalItem := e2eFetchTaskStatus(env, taskID)
	if finalStatus != "completed" {
		t.Fatalf("step 8: expected status completed, got %q (error: %q)", finalStatus, finalItem.ErrorMessage)
	}
	t.Log(" → GET /api/v1/tasks 返回 status='completed'")
	t.Logf(" → task_name='%s', work_dir='%s'", finalItem.TaskName, finalItem.WorkDir)
	t.Logf(" → MockSlurm activeJobs=%d, historyJobs=%d",
		len(env.MockSlurm.GetAllActiveJobs()), len(env.MockSlurm.GetAllHistoryJobs()))
	t.Log("")

	// Step 9: Verify the task was assigned a working directory. Only
	// non-emptiness of work_dir is checked; its contents are not inspected.
	t.Log("【步骤 9】验证工作目录")
	if finalItem.WorkDir == "" {
		t.Fatal("step 9: expected non-empty work_dir")
	}
	t.Logf(" → work_dir='%s' (非空TaskProcessor 已创建)", finalItem.WorkDir)
	t.Log("")
	t.Log("========== E2E 全链路测试通过 ✓ ==========")
}
// e2eFetchTaskStatus looks up taskID in the response of GET /api/v1/tasks and
// returns its status together with the full list entry.
//
// It returns ("", zero e2eTaskItem) when the response data cannot be decoded
// as a task list or when the task is not present in the returned items.
// NOTE(review): only the items returned by a parameterless GET are searched;
// if the endpoint paginates, a task outside the default page is reported as
// not found.
func e2eFetchTaskStatus(env *testenv.TestEnv, taskID int64) (string, e2eTaskItem) {
	_, envelope := e2eParseResponse(e2eSendRequest(env, http.MethodGet, "/api/v1/tasks", ""))

	var page e2eTaskListData
	if json.Unmarshal(envelope.Data, &page) == nil {
		for i := range page.Items {
			if page.Items[i].ID == taskID {
				return page.Items[i].Status, page.Items[i]
			}
		}
	}
	return "", e2eTaskItem{}
}