// Files: hpc/cmd/client/main.go
// Snapshot: 2026-04-16 13:24:21 +08:00 (663 lines, 17 KiB, Go)

package main
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// ── API response types ───────────────────────────────────────────────
// apiResponse is the JSON envelope decoded from API replies.
type apiResponse struct {
	// Success is the server's own verdict on the call; doRequest turns a
	// false value into an error.
	Success bool `json:"success"`
	// Data is the command-specific payload, kept raw so that each caller can
	// unmarshal it into the concrete type it expects.
	Data json.RawMessage `json:"data"`
	// Error is the server-supplied failure message; it is omitted from the
	// JSON encoding when empty.
	Error string `json:"error,omitempty"`
}
// sessionResponse is the payload describing a chunked-upload session, as
// decoded from the "data" field of an upload-init reply.
type sessionResponse struct {
	ID          int64  `json:"id"`           // session identifier used in chunk/complete URLs
	FileName    string `json:"file_name"`    // name of the file being uploaded
	FileSize    int64  `json:"file_size"`    // total file size in bytes
	ChunkSize   int64  `json:"chunk_size"`   // size of each chunk in bytes
	TotalChunks int    `json:"total_chunks"` // number of chunks the session expects
	SHA256      string `json:"sha256"`       // hex digest of the whole file
	Status      string `json:"status"`       // session state string reported by the server
	// UploadedChunks lists chunk indices reported by the server.
	// NOTE(review): presumably the chunks already received — confirm against
	// the server implementation.
	UploadedChunks []int `json:"uploaded_chunks"`
}
// fileResponse is the payload describing a stored file, as returned by the
// upload-complete, instant-upload and file-listing replies.
type fileResponse struct {
	ID        int64  `json:"id"`         // file identifier
	Name      string `json:"name"`       // file name
	Size      int64  `json:"size"`       // size in bytes
	MimeType  string `json:"mime_type"`  // MIME type string reported by the server
	SHA256    string `json:"sha256"`     // hex digest of the content
	CreatedAt string `json:"created_at"` // creation timestamp, kept as the raw string
}
// folderResponse is the payload describing a folder, as returned by the
// folder-creation and folder-listing replies.
type folderResponse struct {
	ID   int64  `json:"id"`   // folder identifier
	Name string `json:"name"` // folder name
	// ParentID is nil when the JSON carries no parent_id; the listing prints
	// such folders as "<root>".
	ParentID  *int64 `json:"parent_id,omitempty"`
	Path      string `json:"path"`       // folder path string reported by the server
	CreatedAt string `json:"created_at"` // creation timestamp, kept as the raw string
}
// listFilesResponse is the paginated payload of the file-listing reply.
type listFilesResponse struct {
	Files    []fileResponse `json:"files"`     // files on the returned page
	Total    int64          `json:"total"`     // total count reported by the server
	Page     int            `json:"page"`      // page number reported by the server
	PageSize int            `json:"page_size"` // page size reported by the server
}
// ── Helpers ──────────────────────────────────────────────────────────
// formatSize renders a byte count for display. Values of at least 1024 bytes
// are scaled to the largest fitting binary unit (KB, MB or GB, each a factor
// of 1024) and printed with two decimals; anything smaller, including
// negative values, is printed as a plain integer followed by "B".
func formatSize(b int64) string {
	// Ordered from largest to smallest so the first match is the best fit.
	scales := []struct {
		floor  int64
		suffix string
	}{
		{1 << 30, "GB"},
		{1 << 20, "MB"},
		{1 << 10, "KB"},
	}
	for _, s := range scales {
		if b >= s.floor {
			return fmt.Sprintf("%.2f %s", float64(b)/float64(s.floor), s.suffix)
		}
	}
	return fmt.Sprintf("%d B", b)
}
// doRequest performs one HTTP call against server+path using the default
// HTTP client and decodes the JSON envelope of the reply.
//
// body may be nil; contentType is only set as a header when non-empty.
//
// It returns an error when the request cannot be built or sent, when the
// body cannot be read, when the status code is 400 or above (the raw body is
// included in the message), when the body is not a valid envelope, or when
// the envelope's Success flag is false. In that last case the decoded
// envelope is returned together with the error.
func doRequest(server, method, path string, body io.Reader, contentType string) (*apiResponse, error) {
	req, err := http.NewRequest(method, server+path, body)
	if err != nil {
		return nil, fmt.Errorf("creating request: %w", err)
	}
	if len(contentType) > 0 {
		req.Header.Set("Content-Type", contentType)
	}

	httpResp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("sending request: %w", err)
	}
	defer httpResp.Body.Close()

	payload, err := io.ReadAll(httpResp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response: %w", err)
	}
	if status := httpResp.StatusCode; status >= 400 {
		return nil, fmt.Errorf("HTTP %d: %s", status, payload)
	}

	envelope := new(apiResponse)
	if err := json.Unmarshal(payload, envelope); err != nil {
		return nil, fmt.Errorf("parsing response: %w\nbody: %s", err, payload)
	}
	if envelope.Success {
		return envelope, nil
	}
	return envelope, fmt.Errorf("API error: %s", envelope.Error)
}
// computeSHA256 streams the file at path through SHA-256 and returns the
// digest as a lowercase hex string. Errors from opening or reading the file
// are returned unchanged, with an empty string.
func computeSHA256(path string) (string, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()

	hasher := sha256.New()
	_, err = io.Copy(hasher, file)
	if err != nil {
		return "", err
	}

	digest := hasher.Sum(nil)
	return hex.EncodeToString(digest), nil
}
// ── Commands ─────────────────────────────────────────────────────────
// cmdMkdir implements the "mkdir" command: it creates a folder named args[0]
// by POSTing to /api/v1/files/folders and prints the created folder.
//
// The optional "-parent <id>" flag places the folder under an existing
// parent. The flag is recognised at any position after the name, and a
// "-parent" without a value is reported as an error rather than silently
// creating the folder without a parent. Other extra arguments are ignored.
//
// Any failure is printed to stderr and terminates the process with status 1.
func cmdMkdir(server string, args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "Usage: client -server <addr> mkdir <name> [-parent <id>]")
		os.Exit(1)
	}
	name := args[0]

	var parentID *int64
	for i := 1; i < len(args); i++ {
		if args[i] != "-parent" {
			continue
		}
		if i+1 >= len(args) {
			fmt.Fprintln(os.Stderr, "missing value for -parent")
			os.Exit(1)
		}
		i++ // consume the flag's value
		v, err := strconv.ParseInt(args[i], 10, 64)
		if err != nil {
			fmt.Fprintf(os.Stderr, "invalid parent id: %s\n", args[i])
			os.Exit(1)
		}
		parentID = &v
	}

	payload := map[string]interface{}{"name": name}
	if parentID != nil {
		payload["parent_id"] = *parentID
	}
	body, err := json.Marshal(payload)
	if err != nil {
		fmt.Fprintf(os.Stderr, "encoding request: %v\n", err)
		os.Exit(1)
	}

	resp, err := doRequest(server, http.MethodPost, "/api/v1/files/folders", bytes.NewReader(body), "application/json")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	var folder folderResponse
	if err := json.Unmarshal(resp.Data, &folder); err != nil {
		fmt.Fprintf(os.Stderr, "parsing folder response: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("Folder created: id=%d name=%q path=%q\n", folder.ID, folder.Name, folder.Path)
}
// cmdListFolders implements the "ls-folders" command: it fetches folders
// from /api/v1/files/folders and prints them as a table.
//
// The optional "-parent <id>" flag adds a parent_id query parameter. The id
// must be an integer, matching the validation in mkdir; because only the
// parsed number is placed in the URL, no query escaping is needed. A
// "-parent" without a value is reported as an error. If the flag is given
// more than once, the last value wins.
//
// Any failure is printed to stderr and terminates the process with status 1.
func cmdListFolders(server string, args []string) {
	path := "/api/v1/files/folders"
	for i := 0; i < len(args); i++ {
		if args[i] != "-parent" {
			continue
		}
		if i+1 >= len(args) {
			fmt.Fprintln(os.Stderr, "missing value for -parent")
			os.Exit(1)
		}
		i++ // consume the flag's value
		parentID, err := strconv.ParseInt(args[i], 10, 64)
		if err != nil {
			fmt.Fprintf(os.Stderr, "invalid parent id: %s\n", args[i])
			os.Exit(1)
		}
		path = "/api/v1/files/folders?parent_id=" + strconv.FormatInt(parentID, 10)
	}

	resp, err := doRequest(server, http.MethodGet, path, nil, "")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	var folders []folderResponse
	if err := json.Unmarshal(resp.Data, &folders); err != nil {
		fmt.Fprintf(os.Stderr, "parsing folders response: %v\n", err)
		os.Exit(1)
	}
	if len(folders) == 0 {
		fmt.Println("No folders found.")
		return
	}

	fmt.Printf("%-8s %-30s %-10s %s\n", "ID", "Name", "ParentID", "Path")
	fmt.Println(strings.Repeat("-", 80))
	for _, f := range folders {
		// Folders without a parent are shown with a placeholder.
		pid := "<root>"
		if f.ParentID != nil {
			pid = strconv.FormatInt(*f.ParentID, 10)
		}
		fmt.Printf("%-8d %-30s %-10s %s\n", f.ID, f.Name, pid, f.Path)
	}
}
// cmdDeleteFolder implements the "delete-folder" command: it issues a DELETE
// for /api/v1/files/folders/<id>, where <id> is args[0] used verbatim, and
// prints a confirmation. A missing argument or a failed request is reported
// on stderr and terminates the process with status 1.
func cmdDeleteFolder(server string, args []string) {
	if len(args) < 1 {
		fmt.Fprintln(os.Stderr, "Usage: client -server <addr> delete-folder <id>")
		os.Exit(1)
	}

	folderID := args[0]
	endpoint := "/api/v1/files/folders/" + folderID
	if _, err := doRequest(server, http.MethodDelete, endpoint, nil, ""); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("Folder %s deleted.\n", folderID)
}
func cmdUpload(server string, args []string) {
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "Usage: client -server <addr> upload <file> [-folder <id>] [-chunk-size <bytes>]")
os.Exit(1)
}
filePath := args[0]
var folderID *int64
chunkSize := int64(8 * 1024 * 1024) // 8 MB default
for i := 1; i+1 < len(args); i += 2 {
switch args[i] {
case "-folder":
v, err := strconv.ParseInt(args[i+1], 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "invalid folder id: %s\n", args[i+1])
os.Exit(1)
}
folderID = &v
case "-chunk-size":
v, err := strconv.ParseInt(args[i+1], 10, 64)
if err != nil || v <= 0 {
fmt.Fprintf(os.Stderr, "invalid chunk size: %s\n", args[i+1])
os.Exit(1)
}
chunkSize = v
}
}
start := time.Now()
// Open file and get size
f, err := os.Open(filePath)
if err != nil {
fmt.Fprintf(os.Stderr, "cannot open file: %v\n", err)
os.Exit(1)
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
fmt.Fprintf(os.Stderr, "cannot stat file: %v\n", err)
os.Exit(1)
}
fileSize := fi.Size()
fileName := filepath.Base(filePath)
// Compute SHA256
fmt.Printf("Computing SHA256 for %s (%s)...\n", fileName, formatSize(fileSize))
sha, err := computeSHA256(filePath)
if err != nil {
fmt.Fprintf(os.Stderr, "sha256 error: %v\n", err)
os.Exit(1)
}
fmt.Printf("SHA256: %s\n", sha)
// Init upload
payload := map[string]interface{}{
"file_name": fileName,
"file_size": fileSize,
"sha256": sha,
"chunk_size": chunkSize,
}
if folderID != nil {
payload["folder_id"] = *folderID
}
body, _ := json.Marshal(payload)
resp, err := doRequest(server, http.MethodPost, "/api/v1/files/uploads", bytes.NewReader(body), "application/json")
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
// Check for dedup (秒传) — response is a file, not a session
var sess sessionResponse
if err := json.Unmarshal(resp.Data, &sess); err == nil && sess.Status != "" && sess.TotalChunks > 0 {
// It's a session, proceed with chunk uploads
} else {
// Try to parse as file (dedup hit)
var fr fileResponse
if err := json.Unmarshal(resp.Data, &fr); err == nil && fr.Name != "" {
elapsed := time.Since(start)
fmt.Printf("⚡ 秒传 (instant upload)! File already exists.\n")
fmt.Printf(" ID: %d\n", fr.ID)
fmt.Printf(" Name: %s\n", fr.Name)
fmt.Printf(" Size: %s\n", formatSize(fr.Size))
fmt.Printf(" SHA256: %s\n", fr.SHA256)
fmt.Printf(" Elapsed: %s\n", elapsed.Truncate(time.Millisecond))
return
}
// If neither, just try the session path anyway
if err := json.Unmarshal(resp.Data, &sess); err != nil {
fmt.Fprintf(os.Stderr, "unexpected response: %s\n", string(resp.Data))
os.Exit(1)
}
}
sessionID := sess.ID
totalChunks := sess.TotalChunks
fmt.Printf("Upload session created: id=%d total_chunks=%d chunk_size=%s\n",
sessionID, totalChunks, formatSize(chunkSize))
// Upload chunks (4 concurrent workers)
const uploadWorkers = 4
type chunkResult struct {
index int
err error
}
work := make(chan int, totalChunks)
results := make(chan chunkResult, totalChunks)
for i := 0; i < totalChunks; i++ {
work <- i
}
close(work)
var uploaded int64 // atomic counter for progress
var wg sync.WaitGroup
for w := 0; w < uploadWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for idx := range work {
offset := int64(idx) * chunkSize
remaining := fileSize - offset
thisChunkSize := chunkSize
if remaining < chunkSize {
thisChunkSize = remaining
}
sectionReader := io.NewSectionReader(f, offset, thisChunkSize)
var buf bytes.Buffer
writer := multipart.NewWriter(&buf)
part, err := writer.CreateFormFile("chunk", fileName)
if err != nil {
results <- chunkResult{index: idx, err: err}
continue
}
if _, err := io.Copy(part, sectionReader); err != nil {
results <- chunkResult{index: idx, err: err}
continue
}
writer.Close()
chunkURL := fmt.Sprintf("%s/api/v1/files/uploads/%d/chunks/%d", server, sessionID, idx)
chunkReq, err := http.NewRequest(http.MethodPut, chunkURL, &buf)
if err != nil {
results <- chunkResult{index: idx, err: err}
continue
}
chunkReq.Header.Set("Content-Type", writer.FormDataContentType())
chunkResp, err := http.DefaultClient.Do(chunkReq)
if err != nil {
results <- chunkResult{index: idx, err: err}
continue
}
raw, _ := io.ReadAll(chunkResp.Body)
chunkResp.Body.Close()
if chunkResp.StatusCode >= 400 {
results <- chunkResult{index: idx, err: fmt.Errorf("HTTP %d: %s", chunkResp.StatusCode, string(raw))}
continue
}
var chunkAPIResp apiResponse
if err := json.Unmarshal(raw, &chunkAPIResp); err == nil && !chunkAPIResp.Success {
results <- chunkResult{index: idx, err: fmt.Errorf("%s", chunkAPIResp.Error)}
continue
}
results <- chunkResult{index: idx, err: nil}
}
}()
}
go func() {
wg.Wait()
close(results)
}()
var firstErr error
for res := range results {
if res.err != nil {
if firstErr == nil {
firstErr = res.err
}
fmt.Fprintf(os.Stderr, "chunk %d failed: %v\n", res.index, res.err)
}
done := atomic.AddInt64(&uploaded, 1)
pct := float64(done) / float64(totalChunks) * 100
doneBytes := done * chunkSize
if doneBytes > fileSize {
doneBytes = fileSize
}
fmt.Printf("\rUploading: %.1f%% (%s / %s)", pct, formatSize(doneBytes), formatSize(fileSize))
}
fmt.Println()
if firstErr != nil {
fmt.Fprintf(os.Stderr, "upload failed: %v\n", firstErr)
os.Exit(1)
}
// Complete upload
completeURL := fmt.Sprintf("/api/v1/files/uploads/%d/complete", sessionID)
resp, err = doRequest(server, http.MethodPost, completeURL, nil, "")
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
var result fileResponse
if err := json.Unmarshal(resp.Data, &result); err != nil {
fmt.Fprintf(os.Stderr, "parsing complete response: %v\n", err)
os.Exit(1)
}
elapsed := time.Since(start)
speed := float64(fileSize) / elapsed.Seconds()
fmt.Println("✓ Upload complete!")
fmt.Printf(" ID: %d\n", result.ID)
fmt.Printf(" Name: %s\n", result.Name)
fmt.Printf(" Size: %s\n", formatSize(result.Size))
fmt.Printf(" SHA256: %s\n", result.SHA256)
fmt.Printf(" Elapsed: %s\n", elapsed.Truncate(time.Millisecond))
fmt.Printf(" Speed: %s/s\n", formatSize(int64(speed)))
}
// dispositionFilename extracts a safe local file name from a
// Content-Disposition header value. It takes the text after the first
// "filename=", honouring an optional quoted form and stopping an unquoted
// value at the next ';', then keeps only the final path element so that a
// server-supplied name cannot point outside the current directory. It returns
// "" when no usable name is present.
func dispositionFilename(cd string) string {
	const key = "filename="
	idx := strings.Index(cd, key)
	if idx == -1 {
		return ""
	}
	value := strings.TrimSpace(cd[idx+len(key):])
	if strings.HasPrefix(value, `"`) {
		value = value[1:]
		if end := strings.Index(value, `"`); end != -1 {
			value = value[:end]
		}
	} else if end := strings.Index(value, ";"); end != -1 {
		value = strings.TrimSpace(value[:end])
	}
	if value == "" {
		return ""
	}
	name := filepath.Base(value)
	if name == "." || name == ".." || name == string(filepath.Separator) {
		return ""
	}
	return name
}

// cmdDownload implements the "download" command: it GETs
// /api/v1/files/<file_id>/download, where <file_id> is args[0], and streams
// the body to a local file.
//
// The output path is chosen in this order: the "-o <output>" flag, the file
// name from the reply's Content-Disposition header (reduced to its base name),
// and finally "download_<file_id>". A "-o" without a value is reported as an
// error.
//
// Any failure, including an error when closing the output file, is printed to
// stderr and terminates the process with status 1.
func cmdDownload(server string, args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "Usage: client -server <addr> download <file_id> [-o <output>]")
		os.Exit(1)
	}
	fileID := args[0]

	var output string
	for i := 1; i < len(args); i++ {
		if args[i] != "-o" {
			continue
		}
		if i+1 >= len(args) {
			fmt.Fprintln(os.Stderr, "missing value for -o")
			os.Exit(1)
		}
		i++ // consume the flag's value
		output = args[i]
	}

	start := time.Now()
	url := server + "/api/v1/files/" + fileID + "/download"
	resp, err := http.Get(url)
	if err != nil {
		fmt.Fprintf(os.Stderr, "download error: %v\n", err)
		os.Exit(1)
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		body, _ := io.ReadAll(resp.Body) // best effort: the body only enriches the error message
		fmt.Fprintf(os.Stderr, "download failed (HTTP %d): %s\n", resp.StatusCode, string(body))
		os.Exit(1)
	}

	// Determine output filename.
	if output == "" {
		output = dispositionFilename(resp.Header.Get("Content-Disposition"))
		if output == "" {
			output = "download_" + fileID
		}
	}

	outFile, err := os.Create(output)
	if err != nil {
		fmt.Fprintf(os.Stderr, "cannot create output file: %v\n", err)
		os.Exit(1)
	}
	written, err := io.Copy(outFile, resp.Body)
	// Close explicitly and check the result: os.Exit skips deferred calls, and
	// a failed close can mean the data never reached the disk.
	if closeErr := outFile.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "download write error: %v\n", err)
		os.Exit(1)
	}

	elapsed := time.Since(start)
	speed := float64(written) / elapsed.Seconds()
	fmt.Println("✓ Download complete!")
	fmt.Printf(" File: %s\n", output)
	fmt.Printf(" Size: %s\n", formatSize(written))
	fmt.Printf(" Elapsed: %s\n", elapsed.Truncate(time.Millisecond))
	fmt.Printf(" Speed: %s/s\n", formatSize(int64(speed)))
}
// cmdListFiles implements the "ls-files" command: it fetches a page of files
// from /api/v1/files and prints them as a table followed by the paging
// totals.
//
// Flags:
//
//	-folder <id>      sent as folder_id
//	-page <n>         sent as page
//	-page-size <n>    sent as page_size
//
// Each value must be an integer; only the parsed number is placed in the URL,
// so no query escaping is needed. A flag without a value is reported as an
// error. If a flag is repeated, the last value wins.
//
// Names longer than 28 characters are shortened to 25 characters plus "...".
// The cut is made on runes, not bytes, so multi-byte UTF-8 names are never
// split in the middle of a character.
//
// Any failure is printed to stderr and terminates the process with status 1.
func cmdListFiles(server string, args []string) {
	var folderID, page, pageSize string
	for i := 0; i < len(args); i++ {
		var (
			dst   *string
			label string
		)
		flag := args[i]
		switch flag {
		case "-folder":
			dst, label = &folderID, "folder id"
		case "-page":
			dst, label = &page, "page"
		case "-page-size":
			dst, label = &pageSize, "page size"
		default:
			continue
		}
		if i+1 >= len(args) {
			fmt.Fprintf(os.Stderr, "missing value for %s\n", flag)
			os.Exit(1)
		}
		i++ // consume the flag's value
		n, err := strconv.ParseInt(args[i], 10, 64)
		if err != nil {
			fmt.Fprintf(os.Stderr, "invalid %s: %s\n", label, args[i])
			os.Exit(1)
		}
		*dst = strconv.FormatInt(n, 10)
	}

	var query []string
	if folderID != "" {
		query = append(query, "folder_id="+folderID)
	}
	if page != "" {
		query = append(query, "page="+page)
	}
	if pageSize != "" {
		query = append(query, "page_size="+pageSize)
	}
	path := "/api/v1/files"
	if len(query) > 0 {
		path += "?" + strings.Join(query, "&")
	}

	resp, err := doRequest(server, http.MethodGet, path, nil, "")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	var list listFilesResponse
	if err := json.Unmarshal(resp.Data, &list); err != nil {
		fmt.Fprintf(os.Stderr, "parsing files response: %v\n", err)
		os.Exit(1)
	}
	if len(list.Files) == 0 {
		fmt.Println("No files found.")
		return
	}

	fmt.Printf("%-8s %-30s %-12s %-10s %s\n", "ID", "Name", "Size", "MIME", "Created")
	fmt.Println(strings.Repeat("-", 90))
	for _, f := range list.Files {
		name := f.Name
		// fmt pads %-30s by rune count, so truncate by runes as well.
		if runes := []rune(name); len(runes) > 28 {
			name = string(runes[:25]) + "..."
		}
		fmt.Printf("%-8d %-30s %-12s %-10s %s\n", f.ID, name, formatSize(f.Size), f.MimeType, f.CreatedAt)
	}
	fmt.Printf("\nTotal: %d Page: %d PageSize: %d\n", list.Total, list.Page, list.PageSize)
}
// cmdDeleteFile implements the "delete-file" command: it issues a DELETE for
// /api/v1/files/<file_id>, where <file_id> is args[0] used verbatim, and
// prints a confirmation. A missing argument or a failed request is reported
// on stderr and terminates the process with status 1.
func cmdDeleteFile(server string, args []string) {
	if len(args) < 1 {
		fmt.Fprintln(os.Stderr, "Usage: client -server <addr> delete-file <file_id>")
		os.Exit(1)
	}

	target := args[0]
	endpoint := "/api/v1/files/" + target
	if _, err := doRequest(server, http.MethodDelete, endpoint, nil, ""); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("File %s deleted.\n", target)
}
// ── Main ─────────────────────────────────────────────────────────────
// usage writes the command overview to stderr. The text contains no
// formatting verbs, so it is emitted verbatim.
func usage() {
	const helpText = `HPC File Storage Client
Usage: client -server <addr> <command> [args]
Commands:
mkdir <name> [-parent <id>] 创建文件夹
ls-folders [-parent <id>] 列出文件夹
delete-folder <id> 删除文件夹
upload <file> [-folder <id>] [-chunk-size <bytes>] 上传文件(分片)
download <file_id> [-o <output>] 下载文件
ls-files [-folder <id>] [-page <n>] [-page-size <n>] 列出文件
delete-file <file_id> 删除文件
Global flags:
-server <addr> 服务器地址 (默认 http://localhost:8080)
`
	fmt.Fprint(os.Stderr, helpText)
}
// main parses the command line and dispatches to the matching command.
//
// "-server <addr>" may appear anywhere and overrides the default address
// http://localhost:8080; a trailing "-server" with no value is treated as an
// ordinary argument. The first remaining non-empty argument is the command
// name and everything after it is passed to the command untouched. With no
// command, or an unknown one, the usage text is printed and the process exits
// with status 1.
func main() {
	var (
		server  = "http://localhost:8080"
		command string
		cmdArgs []string
	)

	argv := os.Args[1:]
	for i := 0; i < len(argv); i++ {
		arg := argv[i]
		switch {
		case arg == "-server" && i+1 < len(argv):
			i++ // skip over the address that follows the flag
			server = argv[i]
		case command == "":
			command = arg
		default:
			cmdArgs = append(cmdArgs, arg)
		}
	}

	if command == "" {
		usage()
		os.Exit(1)
	}

	handlers := map[string]func(string, []string){
		"mkdir":         cmdMkdir,
		"ls-folders":    cmdListFolders,
		"delete-folder": cmdDeleteFolder,
		"upload":        cmdUpload,
		"download":      cmdDownload,
		"ls-files":      cmdListFiles,
		"delete-file":   cmdDeleteFile,
	}
	run, known := handlers[command]
	if !known {
		fmt.Fprintf(os.Stderr, "Unknown command: %s\n", command)
		usage()
		os.Exit(1)
	}
	run(server, cmdArgs)
}