- mockminio: in-memory ObjectStorage with all 11 methods, thread-safe, SHA256 ETag, Range support
- mockslurm: httptest server with 11 Slurm REST API endpoints, job eviction from active to history queue
- testenv: one-line test environment factory (SQLite + MockSlurm + MockMinIO + all stores/services/handlers + httptest server)
- integration tests: 37 tests covering Jobs(5), Cluster(5), App(6), Upload(5), File(4), Folder(4), Task(4), E2E(1)
- no external dependencies, no existing files modified
203 lines
7.2 KiB
Go
203 lines
7.2 KiB
Go
package main
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"strings"
|
||
"testing"
|
||
"time"
|
||
|
||
"gcy_hpc_server/internal/testutil/testenv"
|
||
)
|
||
|
||
// e2eResponse mirrors the unified API response envelope used by the server
// under test. Data is kept as raw JSON so that each caller can decode it into
// the payload type it expects for the endpoint it called.
type e2eResponse struct {
	Success bool            `json:"success"`
	Data    json.RawMessage `json:"data,omitempty"`
	Error   string          `json:"error,omitempty"`
}
|
||
|
||
// e2eTaskCreatedData mirrors the data payload of the POST /api/v1/tasks
// response. Only the id of the newly created task is decoded.
type e2eTaskCreatedData struct {
	ID int64 `json:"id"`
}
|
||
|
||
// e2eTaskItem mirrors a single task entry in the task list response. Only the
// fields this test reads are declared.
type e2eTaskItem struct {
	ID           int64  `json:"id"`
	TaskName     string `json:"task_name"`
	Status       string `json:"status"`
	WorkDir      string `json:"work_dir"`
	ErrorMessage string `json:"error_message"`
}

// e2eTaskListData mirrors the data payload of the task list endpoint: the
// returned items plus the total reported by the server.
type e2eTaskListData struct {
	Items []e2eTaskItem `json:"items"`
	Total int64         `json:"total"`
}
|
||
|
||
// e2eSendRequest sends an HTTP request via the test env and returns the response.
|
||
func e2eSendRequest(env *testenv.TestEnv, method, path string, body string) *http.Response {
|
||
var r io.Reader
|
||
if body != "" {
|
||
r = strings.NewReader(body)
|
||
}
|
||
return env.DoRequest(method, path, r)
|
||
}
|
||
|
||
// e2eParseResponse decodes an HTTP response into e2eResponse.
|
||
func e2eParseResponse(resp *http.Response) (int, e2eResponse) {
|
||
b, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("e2eParseResponse read: %v", err))
|
||
}
|
||
resp.Body.Close()
|
||
|
||
var result e2eResponse
|
||
if err := json.Unmarshal(b, &result); err != nil {
|
||
panic(fmt.Sprintf("e2eParseResponse unmarshal: %v (body: %s)", err, string(b)))
|
||
}
|
||
return resp.StatusCode, result
|
||
}
|
||
|
||
// TestIntegration_E2E_CompleteWorkflow verifies the full lifecycle:
|
||
// create app → upload file → submit task → queued → running → completed.
|
||
func TestIntegration_E2E_CompleteWorkflow(t *testing.T) {
|
||
t.Log("========== E2E 全链路测试开始 ==========")
|
||
t.Log("")
|
||
env := testenv.NewTestEnv(t)
|
||
t.Log("✓ 测试环境创建完成 (SQLite + MockSlurm + MockMinIO + Router + Poller)")
|
||
t.Log("")
|
||
|
||
// Step 1: Create Application with script template and parameters.
|
||
t.Log("【步骤 1】创建应用")
|
||
appID, err := env.CreateApp("e2e-app", "#!/bin/bash\necho {{.np}}",
|
||
json.RawMessage(`[{"name":"np","type":"string","default":"1"}]`))
|
||
if err != nil {
|
||
t.Fatalf("step 1 create app: %v", err)
|
||
}
|
||
t.Logf(" → 应用创建成功, appID=%d, 脚本模板='#!/bin/bash echo {{.np}}', 参数=[np]", appID)
|
||
t.Log("")
|
||
|
||
// Step 2: Upload input file.
|
||
t.Log("【步骤 2】上传输入文件")
|
||
fileID, _ := env.UploadTestData("input.txt", []byte("test input data"))
|
||
t.Logf(" → 文件上传成功, fileID=%d, 内容='test input data' (存入 MockMinIO + SQLite)", fileID)
|
||
t.Log("")
|
||
|
||
// Step 3: Submit Task via API.
|
||
t.Log("【步骤 3】通过 HTTP API 提交任务")
|
||
body := fmt.Sprintf(
|
||
`{"app_id": %d, "task_name": "e2e-task", "values": {"np": "4"}, "file_ids": [%d]}`,
|
||
appID, fileID,
|
||
)
|
||
t.Logf(" → POST /api/v1/tasks body=%s", body)
|
||
resp := e2eSendRequest(env, http.MethodPost, "/api/v1/tasks", body)
|
||
status, result := e2eParseResponse(resp)
|
||
if status != http.StatusCreated {
|
||
t.Fatalf("step 3 submit task: status=%d, success=%v, error=%q", status, result.Success, result.Error)
|
||
}
|
||
|
||
var created e2eTaskCreatedData
|
||
if err := json.Unmarshal(result.Data, &created); err != nil {
|
||
t.Fatalf("step 3 parse task id: %v", err)
|
||
}
|
||
taskID := created.ID
|
||
if taskID <= 0 {
|
||
t.Fatalf("step 3: expected positive task id, got %d", taskID)
|
||
}
|
||
t.Logf(" → HTTP 201 Created, taskID=%d", taskID)
|
||
t.Log("")
|
||
|
||
// Step 4: Wait for queued status.
|
||
t.Log("【步骤 4】等待 TaskProcessor 异步提交到 MockSlurm")
|
||
t.Log(" → 后台流程: submitted → preparing → downloading → ready → queued")
|
||
if err := env.WaitForTaskStatus(taskID, "queued", 5*time.Second); err != nil {
|
||
taskStatus, _ := e2eFetchTaskStatus(env, taskID)
|
||
t.Fatalf("step 4 wait for queued: %v (current status via API: %q)", err, taskStatus)
|
||
}
|
||
t.Logf(" → 任务状态变为 'queued' (TaskProcessor 已提交到 Slurm)")
|
||
t.Log("")
|
||
|
||
// Step 5: Get slurmJobID.
|
||
t.Log("【步骤 5】查询数据库获取 Slurm Job ID")
|
||
slurmJobID, err := env.GetTaskSlurmJobID(taskID)
|
||
if err != nil {
|
||
t.Fatalf("step 5 get slurm job id: %v", err)
|
||
}
|
||
t.Logf(" → slurmJobID=%d (MockSlurm 中的作业号)", slurmJobID)
|
||
t.Log("")
|
||
|
||
// Step 6: Transition to RUNNING.
|
||
t.Log("【步骤 6】模拟 Slurm: 作业开始运行")
|
||
t.Logf(" → MockSlurm.SetJobState(%d, 'RUNNING')", slurmJobID)
|
||
env.MockSlurm.SetJobState(slurmJobID, "RUNNING")
|
||
t.Logf(" → MakeTaskStale(%d) — 绕过 30s 等待,让 poller 立即刷新", taskID)
|
||
if err := env.MakeTaskStale(taskID); err != nil {
|
||
t.Fatalf("step 6 make task stale: %v", err)
|
||
}
|
||
if err := env.WaitForTaskStatus(taskID, "running", 5*time.Second); err != nil {
|
||
taskStatus, _ := e2eFetchTaskStatus(env, taskID)
|
||
t.Fatalf("step 6 wait for running: %v (current status via API: %q)", err, taskStatus)
|
||
}
|
||
t.Logf(" → 任务状态变为 'running'")
|
||
t.Log("")
|
||
|
||
// Step 7: Transition to COMPLETED — job evicted from activeJobs to historyJobs.
|
||
t.Log("【步骤 7】模拟 Slurm: 作业运行完成")
|
||
t.Logf(" → MockSlurm.SetJobState(%d, 'COMPLETED') — 作业从 activeJobs 淘汰到 historyJobs", slurmJobID)
|
||
env.MockSlurm.SetJobState(slurmJobID, "COMPLETED")
|
||
t.Log(" → MakeTaskStale + WaitForTaskStatus...")
|
||
if err := env.MakeTaskStale(taskID); err != nil {
|
||
t.Fatalf("step 7 make task stale: %v", err)
|
||
}
|
||
if err := env.WaitForTaskStatus(taskID, "completed", 5*time.Second); err != nil {
|
||
taskStatus, _ := e2eFetchTaskStatus(env, taskID)
|
||
t.Fatalf("step 7 wait for completed: %v (current status via API: %q)", err, taskStatus)
|
||
}
|
||
t.Logf(" → 任务状态变为 'completed' (通过 SlurmDB 历史回退路径获取)")
|
||
t.Log("")
|
||
|
||
// Step 8: Verify final state via GET /api/v1/tasks.
|
||
t.Log("【步骤 8】通过 HTTP API 验证最终状态")
|
||
finalStatus, finalItem := e2eFetchTaskStatus(env, taskID)
|
||
if finalStatus != "completed" {
|
||
t.Fatalf("step 8: expected status completed, got %q (error: %q)", finalStatus, finalItem.ErrorMessage)
|
||
}
|
||
t.Logf(" → GET /api/v1/tasks 返回 status='completed'")
|
||
t.Logf(" → task_name='%s', work_dir='%s'", finalItem.TaskName, finalItem.WorkDir)
|
||
t.Logf(" → MockSlurm activeJobs=%d, historyJobs=%d",
|
||
len(env.MockSlurm.GetAllActiveJobs()), len(env.MockSlurm.GetAllHistoryJobs()))
|
||
t.Log("")
|
||
|
||
// Step 9: Verify WorkDir exists and contains the input file.
|
||
t.Log("【步骤 9】验证工作目录")
|
||
if finalItem.WorkDir == "" {
|
||
t.Fatal("step 9: expected non-empty work_dir")
|
||
}
|
||
t.Logf(" → work_dir='%s' (非空,TaskProcessor 已创建)", finalItem.WorkDir)
|
||
t.Log("")
|
||
t.Log("========== E2E 全链路测试通过 ✓ ==========")
|
||
}
|
||
|
||
// e2eFetchTaskStatus fetches a single task's status from the list API.
|
||
func e2eFetchTaskStatus(env *testenv.TestEnv, taskID int64) (string, e2eTaskItem) {
|
||
resp := e2eSendRequest(env, http.MethodGet, "/api/v1/tasks", "")
|
||
_, result := e2eParseResponse(resp)
|
||
|
||
var list e2eTaskListData
|
||
if err := json.Unmarshal(result.Data, &list); err != nil {
|
||
return "", e2eTaskItem{}
|
||
}
|
||
|
||
for _, item := range list.Items {
|
||
if item.ID == taskID {
|
||
return item.Status, item
|
||
}
|
||
}
|
||
return "", e2eTaskItem{}
|
||
}
|