From f0847d397890dd1bde024168fcd2e8ee23f0c3c4 Mon Sep 17 00:00:00 2001 From: dailz Date: Wed, 15 Apr 2026 09:23:09 +0800 Subject: [PATCH] feat(service): add upload, download, file, and folder services Add UploadService (dedup, chunk lifecycle, ComposeObject), DownloadService (Range support), FileService (ref counting), FolderService (path validation). Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- internal/service/download_service.go | 98 ++++ internal/service/download_service_test.go | 260 +++++++++ internal/service/file_service.go | 178 ++++++ internal/service/file_service_test.go | 484 +++++++++++++++ internal/service/folder_service.go | 142 +++++ internal/service/folder_service_test.go | 230 ++++++++ internal/service/upload_service.go | 441 ++++++++++++++ internal/service/upload_service_test.go | 678 ++++++++++++++++++++++ 8 files changed, 2511 insertions(+) create mode 100644 internal/service/download_service.go create mode 100644 internal/service/download_service_test.go create mode 100644 internal/service/file_service.go create mode 100644 internal/service/file_service_test.go create mode 100644 internal/service/folder_service.go create mode 100644 internal/service/folder_service_test.go create mode 100644 internal/service/upload_service.go create mode 100644 internal/service/upload_service_test.go diff --git a/internal/service/download_service.go b/internal/service/download_service.go new file mode 100644 index 0000000..69eb7a2 --- /dev/null +++ b/internal/service/download_service.go @@ -0,0 +1,98 @@ +package service + +import ( + "context" + "fmt" + "io" + + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/server" + "gcy_hpc_server/internal/storage" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" +) + +// DownloadService handles file downloads with streaming and Range support. +type DownloadService struct { + storage storage.ObjectStorage + blobStore *store.BlobStore + fileStore *store.FileStore + bucket string + logger *zap.Logger +} + +// NewDownloadService creates a new DownloadService. +func NewDownloadService(storage storage.ObjectStorage, blobStore *store.BlobStore, fileStore *store.FileStore, bucket string, logger *zap.Logger) *DownloadService { + return &DownloadService{ + storage: storage, + blobStore: blobStore, + fileStore: fileStore, + bucket: bucket, + logger: logger, + } +} + +// Download returns a stream reader for the given file, optionally limited to a byte range. +// Returns (reader, file, blob, start, end, error). +func (s *DownloadService) Download(ctx context.Context, fileID int64, rangeHeader string) (io.ReadCloser, *model.File, *model.FileBlob, int64, int64, error) { + file, err := s.fileStore.GetByID(ctx, fileID) + if err != nil { + return nil, nil, nil, 0, 0, fmt.Errorf("get file: %w", err) + } + if file == nil { + return nil, nil, nil, 0, 0, fmt.Errorf("file not found") + } + + blob, err := s.blobStore.GetBySHA256(ctx, file.BlobSHA256) + if err != nil { + return nil, nil, nil, 0, 0, fmt.Errorf("get blob: %w", err) + } + if blob == nil { + return nil, nil, nil, 0, 0, fmt.Errorf("blob not found") + } + + var start, end int64 + if rangeHeader != "" { + start, end, err = server.ParseRange(rangeHeader, blob.FileSize) + if err != nil { + return nil, nil, nil, 0, 0, fmt.Errorf("parse range: %w", err) + } + } else { + start = 0 + end = blob.FileSize - 1 + } + + opts := storage.GetOptions{ + Start: &start, + End: &end, + } + + reader, _, err := s.storage.GetObject(ctx, s.bucket, blob.MinioKey, opts) + if err != nil { + return nil, nil, nil, 0, 0, fmt.Errorf("get object: %w", err) + } + + return reader, file, blob, start, end, nil +} + +// GetFileMetadata returns the file and its associated blob metadata. +func (s *DownloadService) GetFileMetadata(ctx context.Context, fileID int64) (*model.File, *model.FileBlob, error) { + file, err := s.fileStore.GetByID(ctx, fileID) + if err != nil { + return nil, nil, fmt.Errorf("get file: %w", err) + } + if file == nil { + return nil, nil, fmt.Errorf("file not found") + } + + blob, err := s.blobStore.GetBySHA256(ctx, file.BlobSHA256) + if err != nil { + return nil, nil, fmt.Errorf("get blob: %w", err) + } + if blob == nil { + return nil, nil, fmt.Errorf("blob not found") + } + + return file, blob, nil +} diff --git a/internal/service/download_service_test.go b/internal/service/download_service_test.go new file mode 100644 index 0000000..39562cd --- /dev/null +++ b/internal/service/download_service_test.go @@ -0,0 +1,260 @@ +package service + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/storage" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" + gormlogger "gorm.io/gorm/logger" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +type mockDownloadStorage struct { + getObjectFn func(ctx context.Context, bucket, key string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) +} + +func (m *mockDownloadStorage) PutObject(_ context.Context, _ string, _ string, _ io.Reader, _ int64, _ storage.PutObjectOptions) (storage.UploadInfo, error) { + return storage.UploadInfo{}, nil +} + +func (m *mockDownloadStorage) GetObject(ctx context.Context, bucket, key string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + if m.getObjectFn != nil { + return m.getObjectFn(ctx, bucket, key, opts) + } + return nil, storage.ObjectInfo{}, nil +} + +func (m *mockDownloadStorage) ComposeObject(_ context.Context, _ string, _ string, _ []string) (storage.UploadInfo, error) { + return storage.UploadInfo{}, nil +} + +func (m *mockDownloadStorage) AbortMultipartUpload(_ context.Context, _ string, _ string, _ string) error { + return nil +} + +func (m *mockDownloadStorage) RemoveIncompleteUpload(_ context.Context, _ string, _ string) error { + return nil +} + +func (m *mockDownloadStorage) RemoveObject(_ context.Context, _ string, _ string, _ storage.RemoveObjectOptions) error { + return nil +} + +func (m *mockDownloadStorage) ListObjects(_ context.Context, _ string, _ string, _ bool) ([]storage.ObjectInfo, error) { + return nil, nil +} + +func (m *mockDownloadStorage) RemoveObjects(_ context.Context, _ string, _ []string, _ storage.RemoveObjectsOptions) error { + return nil +} + +func (m *mockDownloadStorage) BucketExists(_ context.Context, _ string) (bool, error) { + return true, nil +} + +func (m *mockDownloadStorage) MakeBucket(_ context.Context, _ string, _ storage.MakeBucketOptions) error { + return nil +} + +func (m *mockDownloadStorage) StatObject(_ context.Context, _ string, _ string, _ storage.StatObjectOptions) (storage.ObjectInfo, error) { + return storage.ObjectInfo{}, nil +} + +func setupDownloadService(t *testing.T) (*DownloadService, *store.FileStore, *store.BlobStore, *mockDownloadStorage) { + t.Helper() + + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + Logger: gormlogger.Default.LogMode(gormlogger.Silent), + }) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.File{}, &model.FileBlob{}); err != nil { + t.Fatalf("auto migrate: %v", err) + } + + mockStorage := &mockDownloadStorage{} + blobStore := store.NewBlobStore(db) + fileStore := store.NewFileStore(db) + svc := NewDownloadService(mockStorage, blobStore, fileStore, "test-bucket", zap.NewNop()) + + return svc, fileStore, blobStore, mockStorage +} + +func createTestFileAndBlob(t *testing.T, fileStore *store.FileStore, blobStore *store.BlobStore) (*model.File, *model.FileBlob) { + t.Helper() + + blob := &model.FileBlob{ + SHA256: "abc123def456abc123def456abc123def456abc123def456abc123def456abcd", + MinioKey: "chunks/session1/part-0", + FileSize: 5000, + MimeType: "application/octet-stream", + RefCount: 1, + } + if err := blobStore.Create(context.Background(), blob); err != nil { + t.Fatalf("create blob: %v", err) + } + + file := &model.File{ + Name: "test.dat", + BlobSHA256: blob.SHA256, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := fileStore.Create(context.Background(), file); err != nil { + t.Fatalf("create file: %v", err) + } + + return file, blob +} + +func TestDownload_FullFile(t *testing.T) { + svc, fileStore, blobStore, mockStorage := setupDownloadService(t) + file, blob := createTestFileAndBlob(t, fileStore, blobStore) + + content := make([]byte, blob.FileSize) + for i := range content { + content[i] = byte(i % 256) + } + + mockStorage.getObjectFn = func(_ context.Context, _, _ string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + if opts.Start == nil || opts.End == nil { + t.Fatal("expected Start and End to be set") + } + if *opts.Start != 0 { + t.Fatalf("expected start=0, got %d", *opts.Start) + } + if *opts.End != blob.FileSize-1 { + t.Fatalf("expected end=%d, got %d", blob.FileSize-1, *opts.End) + } + return io.NopCloser(bytes.NewReader(content)), storage.ObjectInfo{Size: blob.FileSize}, nil + } + + reader, gotFile, gotBlob, start, end, err := svc.Download(context.Background(), file.ID, "") + if err != nil { + t.Fatalf("Download: %v", err) + } + defer reader.Close() + + if gotFile.ID != file.ID { + t.Fatalf("expected file ID %d, got %d", file.ID, gotFile.ID) + } + if gotBlob.SHA256 != blob.SHA256 { + t.Fatalf("expected blob SHA256 %s, got %s", blob.SHA256, gotBlob.SHA256) + } + if start != 0 { + t.Fatalf("expected start=0, got %d", start) + } + if end != blob.FileSize-1 { + t.Fatalf("expected end=%d, got %d", blob.FileSize-1, end) + } + + read, _ := io.ReadAll(reader) + if int64(len(read)) != blob.FileSize { + t.Fatalf("expected %d bytes, got %d", blob.FileSize, len(read)) + } +} + +func TestDownload_WithRange(t *testing.T) { + svc, fileStore, blobStore, mockStorage := setupDownloadService(t) + file, _ := createTestFileAndBlob(t, fileStore, blobStore) + + content := make([]byte, 1024) + for i := range content { + content[i] = byte(i % 256) + } + + mockStorage.getObjectFn = func(_ context.Context, _, _ string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + if opts.Start == nil || opts.End == nil { + t.Fatal("expected Start and End to be set") + } + if *opts.Start != 0 { + t.Fatalf("expected start=0, got %d", *opts.Start) + } + if *opts.End != 1023 { + t.Fatalf("expected end=1023, got %d", *opts.End) + } + return io.NopCloser(bytes.NewReader(content[:1024])), storage.ObjectInfo{Size: 1024}, nil + } + + reader, _, _, start, end, err := svc.Download(context.Background(), file.ID, "bytes=0-1023") + if err != nil { + t.Fatalf("Download: %v", err) + } + defer reader.Close() + + if start != 0 { + t.Fatalf("expected start=0, got %d", start) + } + if end != 1023 { + t.Fatalf("expected end=1023, got %d", end) + } + + read, _ := io.ReadAll(reader) + if len(read) != 1024 { + t.Fatalf("expected 1024 bytes, got %d", len(read)) + } +} + +func TestDownload_FileNotFound(t *testing.T) { + svc, _, _, _ := setupDownloadService(t) + + _, _, _, _, _, err := svc.Download(context.Background(), 99999, "") + if err == nil { + t.Fatal("expected error for missing file") + } + if err.Error() != "file not found" { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestDownload_BlobNotFound(t *testing.T) { + svc, fileStore, _, _ := setupDownloadService(t) + + file := &model.File{ + Name: "orphan.dat", + BlobSHA256: "nonexistent_hash_0000000000000000000000000000000000000000", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := fileStore.Create(context.Background(), file); err != nil { + t.Fatalf("create file: %v", err) + } + + _, _, _, _, _, err := svc.Download(context.Background(), file.ID, "") + if err == nil { + t.Fatal("expected error for missing blob") + } + if err.Error() != "blob not found" { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestGetFileMetadata(t *testing.T) { + svc, fileStore, blobStore, _ := setupDownloadService(t) + file, _ := createTestFileAndBlob(t, fileStore, blobStore) + + gotFile, gotBlob, err := svc.GetFileMetadata(context.Background(), file.ID) + if err != nil { + t.Fatalf("GetFileMetadata: %v", err) + } + + if gotFile.ID != file.ID { + t.Fatalf("expected file ID %d, got %d", file.ID, gotFile.ID) + } + if gotBlob.FileSize != 5000 { + t.Fatalf("expected file size 5000, got %d", gotBlob.FileSize) + } + if gotBlob.MimeType != "application/octet-stream" { + t.Fatalf("expected mime type application/octet-stream, got %s", gotBlob.MimeType) + } +} diff --git a/internal/service/file_service.go b/internal/service/file_service.go new file mode 100644 index 0000000..586ae92 --- /dev/null +++ b/internal/service/file_service.go @@ -0,0 +1,178 @@ +package service + +import ( + "context" + "fmt" + "io" + + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/server" + "gcy_hpc_server/internal/storage" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" + "gorm.io/gorm" +) + +// FileService handles file listing, metadata, download, and deletion operations. +type FileService struct { + storage storage.ObjectStorage + blobStore *store.BlobStore + fileStore *store.FileStore + bucket string + db *gorm.DB + logger *zap.Logger +} + +// NewFileService creates a new FileService. +func NewFileService(storage storage.ObjectStorage, blobStore *store.BlobStore, fileStore *store.FileStore, bucket string, db *gorm.DB, logger *zap.Logger) *FileService { + return &FileService{ + storage: storage, + blobStore: blobStore, + fileStore: fileStore, + bucket: bucket, + db: db, + logger: logger, + } +} + +// ListFiles returns a paginated list of files, optionally filtered by folder or search query. +func (s *FileService) ListFiles(ctx context.Context, folderID *int64, page, pageSize int, search string) ([]model.FileResponse, int64, error) { + var files []model.File + var total int64 + var err error + + if search != "" { + files, total, err = s.fileStore.Search(ctx, search, page, pageSize) + } else { + files, total, err = s.fileStore.List(ctx, folderID, page, pageSize) + } + if err != nil { + return nil, 0, fmt.Errorf("list files: %w", err) + } + + responses := make([]model.FileResponse, 0, len(files)) + for _, f := range files { + blob, err := s.blobStore.GetBySHA256(ctx, f.BlobSHA256) + if err != nil { + return nil, 0, fmt.Errorf("get blob for file %d: %w", f.ID, err) + } + if blob == nil { + return nil, 0, fmt.Errorf("blob not found for file %d", f.ID) + } + + responses = append(responses, model.FileResponse{ + ID: f.ID, + Name: f.Name, + FolderID: f.FolderID, + Size: blob.FileSize, + MimeType: blob.MimeType, + SHA256: f.BlobSHA256, + CreatedAt: f.CreatedAt, + UpdatedAt: f.UpdatedAt, + }) + } + + return responses, total, nil +} + +// GetFileMetadata returns the file and its associated blob metadata. +func (s *FileService) GetFileMetadata(ctx context.Context, fileID int64) (*model.File, *model.FileBlob, error) { + file, err := s.fileStore.GetByID(ctx, fileID) + if err != nil { + return nil, nil, fmt.Errorf("get file: %w", err) + } + if file == nil { + return nil, nil, fmt.Errorf("file not found: %d", fileID) + } + + blob, err := s.blobStore.GetBySHA256(ctx, file.BlobSHA256) + if err != nil { + return nil, nil, fmt.Errorf("get blob: %w", err) + } + if blob == nil { + return nil, nil, fmt.Errorf("blob not found for file %d", fileID) + } + + return file, blob, nil +} + +// DownloadFile returns a reader for the file content, along with file and blob metadata. +// If rangeHeader is non-empty, it parses the range and returns partial content. +func (s *FileService) DownloadFile(ctx context.Context, fileID int64, rangeHeader string) (io.ReadCloser, *model.File, *model.FileBlob, int64, int64, error) { + file, blob, err := s.GetFileMetadata(ctx, fileID) + if err != nil { + return nil, nil, nil, 0, 0, err + } + + var start, end int64 + if rangeHeader != "" { + start, end, err = server.ParseRange(rangeHeader, blob.FileSize) + if err != nil { + return nil, nil, nil, 0, 0, fmt.Errorf("parse range: %w", err) + } + } else { + start = 0 + end = blob.FileSize - 1 + } + + reader, _, err := s.storage.GetObject(ctx, s.bucket, blob.MinioKey, storage.GetOptions{ + Start: &start, + End: &end, + }) + if err != nil { + return nil, nil, nil, 0, 0, fmt.Errorf("get object: %w", err) + } + + return reader, file, blob, start, end, nil +} + +// DeleteFile soft-deletes a file. If no other active files reference the same blob, +// it decrements the blob ref count and removes the object from storage when ref count reaches 0. +func (s *FileService) DeleteFile(ctx context.Context, fileID int64) error { + return s.db.Transaction(func(tx *gorm.DB) error { + txFileStore := store.NewFileStore(tx) + txBlobStore := store.NewBlobStore(tx) + + blobSHA256, err := txFileStore.GetBlobSHA256ByID(ctx, fileID) + if err != nil { + return fmt.Errorf("get blob sha256: %w", err) + } + if blobSHA256 == "" { + return fmt.Errorf("file not found: %d", fileID) + } + + if err := tx.Delete(&model.File{}, fileID).Error; err != nil { + return fmt.Errorf("soft delete file: %w", err) + } + + activeCount, err := txFileStore.CountByBlobSHA256(ctx, blobSHA256) + if err != nil { + return fmt.Errorf("count active refs: %w", err) + } + + if activeCount == 0 { + newRefCount, err := txBlobStore.DecrementRef(ctx, blobSHA256) + if err != nil { + return fmt.Errorf("decrement ref: %w", err) + } + + if newRefCount == 0 { + blob, err := txBlobStore.GetBySHA256(ctx, blobSHA256) + if err != nil { + return fmt.Errorf("get blob for cleanup: %w", err) + } + if blob != nil { + if err := s.storage.RemoveObject(ctx, s.bucket, blob.MinioKey, storage.RemoveObjectOptions{}); err != nil { + return fmt.Errorf("remove object: %w", err) + } + if err := txBlobStore.Delete(ctx, blobSHA256); err != nil { + return fmt.Errorf("delete blob: %w", err) + } + } + } + } + + return nil + }) +} diff --git a/internal/service/file_service_test.go b/internal/service/file_service_test.go new file mode 100644 index 0000000..1f7875d --- /dev/null +++ b/internal/service/file_service_test.go @@ -0,0 +1,484 @@ +package service + +import ( + "bytes" + "context" + "errors" + "io" + "strings" + "testing" + + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/storage" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +type mockFileStorage struct { + getObjectFn func(ctx context.Context, bucket, key string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) + removeObjectFn func(ctx context.Context, bucket, key string, opts storage.RemoveObjectOptions) error +} + +func (m *mockFileStorage) PutObject(_ context.Context, _, _ string, _ io.Reader, _ int64, _ storage.PutObjectOptions) (storage.UploadInfo, error) { + return storage.UploadInfo{}, nil +} + +func (m *mockFileStorage) GetObject(ctx context.Context, bucket, key string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + if m.getObjectFn != nil { + return m.getObjectFn(ctx, bucket, key, opts) + } + return io.NopCloser(strings.NewReader("data")), storage.ObjectInfo{}, nil +} + +func (m *mockFileStorage) ComposeObject(_ context.Context, _ string, _ string, _ []string) (storage.UploadInfo, error) { + return storage.UploadInfo{}, nil +} + +func (m *mockFileStorage) AbortMultipartUpload(_ context.Context, _, _, _ string) error { + return nil +} + +func (m *mockFileStorage) RemoveIncompleteUpload(_ context.Context, _, _ string) error { + return nil +} + +func (m *mockFileStorage) RemoveObject(ctx context.Context, bucket, key string, opts storage.RemoveObjectOptions) error { + if m.removeObjectFn != nil { + return m.removeObjectFn(ctx, bucket, key, opts) + } + return nil +} + +func (m *mockFileStorage) ListObjects(_ context.Context, _ string, _ string, _ bool) ([]storage.ObjectInfo, error) { + return nil, nil +} + +func (m *mockFileStorage) RemoveObjects(_ context.Context, _ string, _ []string, _ storage.RemoveObjectsOptions) error { + return nil +} + +func (m *mockFileStorage) BucketExists(_ context.Context, _ string) (bool, error) { + return true, nil +} + +func (m *mockFileStorage) MakeBucket(_ context.Context, _ string, _ storage.MakeBucketOptions) error { + return nil +} + +func (m *mockFileStorage) StatObject(_ context.Context, _, _ string, _ storage.StatObjectOptions) (storage.ObjectInfo, error) { + return storage.ObjectInfo{}, nil +} + +func setupFileTestDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.File{}, &model.FileBlob{}); err != nil { + t.Fatalf("migrate: %v", err) + } + return db +} + +func setupFileService(t *testing.T) (*FileService, *mockFileStorage, *gorm.DB) { + t.Helper() + db := setupFileTestDB(t) + ms := &mockFileStorage{} + svc := NewFileService(ms, store.NewBlobStore(db), store.NewFileStore(db), "test-bucket", db, zap.NewNop()) + return svc, ms, db +} + +func createTestBlob(t *testing.T, db *gorm.DB, sha256, minioKey, mimeType string, fileSize int64, refCount int) *model.FileBlob { + t.Helper() + blob := &model.FileBlob{ + SHA256: sha256, + MinioKey: minioKey, + FileSize: fileSize, + MimeType: mimeType, + RefCount: refCount, + } + if err := db.Create(blob).Error; err != nil { + t.Fatalf("create blob: %v", err) + } + return blob +} + +func createTestFile(t *testing.T, db *gorm.DB, name, blobSHA256 string, folderID *int64) *model.File { + t.Helper() + file := &model.File{ + Name: name, + FolderID: folderID, + BlobSHA256: blobSHA256, + } + if err := db.Create(file).Error; err != nil { + t.Fatalf("create file: %v", err) + } + return file +} + +func TestListFiles_Empty(t *testing.T) { + svc, _, _ := setupFileService(t) + + files, total, err := svc.ListFiles(context.Background(), nil, 1, 10, "") + if err != nil { + t.Fatalf("ListFiles: %v", err) + } + if total != 0 { + t.Errorf("expected total 0, got %d", total) + } + if len(files) != 0 { + t.Errorf("expected empty files, got %d", len(files)) + } +} + +func TestListFiles_WithFiles(t *testing.T) { + svc, _, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256abc", "blobs/abc", "text/plain", 1024, 2) + createTestFile(t, db, "file1.txt", blob.SHA256, nil) + createTestFile(t, db, "file2.txt", blob.SHA256, nil) + + files, total, err := svc.ListFiles(context.Background(), nil, 1, 10, "") + if err != nil { + t.Fatalf("ListFiles: %v", err) + } + if total != 2 { + t.Errorf("expected total 2, got %d", total) + } + if len(files) != 2 { + t.Fatalf("expected 2 files, got %d", len(files)) + } + for _, f := range files { + if f.Size != 1024 { + t.Errorf("expected size 1024, got %d", f.Size) + } + if f.MimeType != "text/plain" { + t.Errorf("expected mime text/plain, got %s", f.MimeType) + } + if f.SHA256 != "sha256abc" { + t.Errorf("expected sha256 sha256abc, got %s", f.SHA256) + } + } +} + +func TestListFiles_Search(t *testing.T) { + svc, _, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256search", "blobs/search", "image/png", 2048, 1) + createTestFile(t, db, "photo.png", blob.SHA256, nil) + createTestFile(t, db, "document.pdf", "sha256other", nil) + createTestBlob(t, db, "sha256other", "blobs/other", "application/pdf", 512, 1) + + files, total, err := svc.ListFiles(context.Background(), nil, 1, 10, "photo") + if err != nil { + t.Fatalf("ListFiles: %v", err) + } + if total != 1 { + t.Errorf("expected total 1, got %d", total) + } + if len(files) != 1 { + t.Fatalf("expected 1 file, got %d", len(files)) + } + if files[0].Name != "photo.png" { + t.Errorf("expected photo.png, got %s", files[0].Name) + } +} + +func TestGetFileMetadata_Found(t *testing.T) { + svc, _, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256meta", "blobs/meta", "application/json", 42, 1) + file := createTestFile(t, db, "data.json", blob.SHA256, nil) + + gotFile, gotBlob, err := svc.GetFileMetadata(context.Background(), file.ID) + if err != nil { + t.Fatalf("GetFileMetadata: %v", err) + } + if gotFile.ID != file.ID { + t.Errorf("expected file id %d, got %d", file.ID, gotFile.ID) + } + if gotBlob.SHA256 != blob.SHA256 { + t.Errorf("expected blob sha256 %s, got %s", blob.SHA256, gotBlob.SHA256) + } +} + +func TestGetFileMetadata_NotFound(t *testing.T) { + svc, _, _ := setupFileService(t) + + _, _, err := svc.GetFileMetadata(context.Background(), 9999) + if err == nil { + t.Fatal("expected error for missing file") + } +} + +func TestDownloadFile_Full(t *testing.T) { + svc, ms, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256dl", "blobs/dl", "text/plain", 100, 1) + file := createTestFile(t, db, "download.txt", blob.SHA256, nil) + + content := []byte("hello world") + ms.getObjectFn = func(_ context.Context, _ string, _ string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + if opts.Start == nil || opts.End == nil { + t.Error("expected start and end to be set") + } else if *opts.Start != 0 || *opts.End != 99 { + t.Errorf("expected range 0-99, got %d-%d", *opts.Start, *opts.End) + } + return io.NopCloser(bytes.NewReader(content)), storage.ObjectInfo{}, nil + } + + reader, gotFile, gotBlob, start, end, err := svc.DownloadFile(context.Background(), file.ID, "") + if err != nil { + t.Fatalf("DownloadFile: %v", err) + } + defer reader.Close() + + if gotFile.ID != file.ID { + t.Errorf("expected file id %d, got %d", file.ID, gotFile.ID) + } + if gotBlob.SHA256 != blob.SHA256 { + t.Errorf("expected blob sha256 %s, got %s", blob.SHA256, gotBlob.SHA256) + } + if start != 0 { + t.Errorf("expected start 0, got %d", start) + } + if end != 99 { + t.Errorf("expected end 99, got %d", end) + } + + data, _ := io.ReadAll(reader) + if string(data) != "hello world" { + t.Errorf("expected 'hello world', got %q", string(data)) + } +} + +func TestDownloadFile_WithRange(t *testing.T) { + svc, ms, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256range", "blobs/range", "text/plain", 1000, 1) + file := createTestFile(t, db, "range.txt", blob.SHA256, nil) + + ms.getObjectFn = func(_ context.Context, _ string, _ string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + if opts.Start != nil && *opts.Start != 100 { + t.Errorf("expected start 100, got %d", *opts.Start) + } + if opts.End != nil && *opts.End != 199 { + t.Errorf("expected end 199, got %d", *opts.End) + } + return io.NopCloser(strings.NewReader("partial")), storage.ObjectInfo{}, nil + } + + reader, _, _, start, end, err := svc.DownloadFile(context.Background(), file.ID, "bytes=100-199") + if err != nil { + t.Fatalf("DownloadFile: %v", err) + } + defer reader.Close() + + if start != 100 { + t.Errorf("expected start 100, got %d", start) + } + if end != 199 { + t.Errorf("expected end 199, got %d", end) + } +} + +func TestDeleteFile_LastRef(t *testing.T) { + svc, ms, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256del", "blobs/del", "text/plain", 50, 1) + file := createTestFile(t, db, "delete-me.txt", blob.SHA256, nil) + + removed := false + ms.removeObjectFn = func(_ context.Context, bucket, key string, _ storage.RemoveObjectOptions) error { + if bucket != "test-bucket" { + t.Errorf("expected bucket 'test-bucket', got %q", bucket) + } + if key != "blobs/del" { + t.Errorf("expected key 'blobs/del', got %q", key) + } + removed = true + return nil + } + + if err := svc.DeleteFile(context.Background(), file.ID); err != nil { + t.Fatalf("DeleteFile: %v", err) + } + + if !removed { + t.Error("expected RemoveObject to be called") + } + + var count int64 + db.Model(&model.FileBlob{}).Where("sha256 = ?", "sha256del").Count(&count) + if count != 0 { + t.Errorf("expected blob to be hard deleted, found %d records", count) + } + + var fileCount int64 + db.Unscoped().Model(&model.File{}).Where("id = ?", file.ID).Count(&fileCount) + if fileCount != 1 { + t.Errorf("expected file to still exist (soft deleted), found %d", fileCount) + } + + var deletedFile model.File + db.Unscoped().First(&deletedFile, file.ID) + if deletedFile.DeletedAt.Time.IsZero() { + t.Error("expected deleted_at to be set") + } +} + +func TestDeleteFile_OtherRefsExist(t *testing.T) { + svc, ms, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256multi", "blobs/multi", "text/plain", 50, 3) + file1 := createTestFile(t, db, "ref1.txt", blob.SHA256, nil) + createTestFile(t, db, "ref2.txt", blob.SHA256, nil) + createTestFile(t, db, "ref3.txt", blob.SHA256, nil) + + removed := false + ms.removeObjectFn = func(_ context.Context, _, _ string, _ storage.RemoveObjectOptions) error { + removed = true + return nil + } + + if err := svc.DeleteFile(context.Background(), file1.ID); err != nil { + t.Fatalf("DeleteFile: %v", err) + } + + if removed { + t.Error("expected RemoveObject NOT to be called since other refs exist") + } + + var updatedBlob model.FileBlob + db.Where("sha256 = ?", "sha256multi").First(&updatedBlob) + if updatedBlob.RefCount != 3 { + t.Errorf("expected ref_count to remain 3, got %d", updatedBlob.RefCount) + } +} + +func TestDeleteFile_SoftDeleteNotAffectRefcount(t *testing.T) { + svc, _, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256soft", "blobs/soft", "text/plain", 50, 2) + file1 := createTestFile(t, db, "soft1.txt", blob.SHA256, nil) + file2 := createTestFile(t, db, "soft2.txt", blob.SHA256, nil) + + if err := svc.DeleteFile(context.Background(), file1.ID); err != nil { + t.Fatalf("DeleteFile: %v", err) + } + + var updatedBlob model.FileBlob + db.Where("sha256 = ?", "sha256soft").First(&updatedBlob) + if updatedBlob.RefCount != 2 { + t.Errorf("expected ref_count to remain 2 (soft delete should not decrement), got %d", updatedBlob.RefCount) + } + + activeCount, err := store.NewFileStore(db).CountByBlobSHA256(context.Background(), "sha256soft") + if err != nil { + t.Fatalf("CountByBlobSHA256: %v", err) + } + if activeCount != 1 { + t.Errorf("expected 1 active ref after soft delete, got %d", activeCount) + } + + var allFiles []model.File + db.Unscoped().Where("blob_sha256 = ?", "sha256soft").Find(&allFiles) + if len(allFiles) != 2 { + t.Errorf("expected 2 total files (one soft deleted), got %d", len(allFiles)) + } + + if err := svc.DeleteFile(context.Background(), file2.ID); err != nil { + t.Fatalf("DeleteFile second: %v", err) + } + + activeCount2, err := store.NewFileStore(db).CountByBlobSHA256(context.Background(), "sha256soft") + if err != nil { + t.Fatalf("CountByBlobSHA256 after second delete: %v", err) + } + if activeCount2 != 0 { + t.Errorf("expected 0 active refs after both deleted, got %d", activeCount2) + } + + var finalBlob model.FileBlob + db.Where("sha256 = ?", "sha256soft").First(&finalBlob) + if finalBlob.RefCount != 1 { + t.Errorf("expected ref_count=1 (decremented once from 2), got %d", finalBlob.RefCount) + } +} + +func TestDeleteFile_NotFound(t *testing.T) { + svc, _, _ := setupFileService(t) + + err := svc.DeleteFile(context.Background(), 9999) + if err == nil { + t.Fatal("expected error for missing file") + } +} + +func TestDownloadFile_StorageError(t *testing.T) { + svc, ms, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256err", "blobs/err", "text/plain", 100, 1) + file := createTestFile(t, db, "error.txt", blob.SHA256, nil) + + ms.getObjectFn = func(_ context.Context, _ string, _ string, _ storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + return nil, storage.ObjectInfo{}, errors.New("storage unavailable") + } + + _, _, _, _, _, err := svc.DownloadFile(context.Background(), file.ID, "") + if err == nil { + t.Fatal("expected error from storage") + } + if !strings.Contains(err.Error(), "storage unavailable") { + t.Errorf("expected storage error, got: %v", err) + } +} + +func TestListFiles_WithFolderFilter(t *testing.T) { + svc, _, db := setupFileService(t) + + blob := createTestBlob(t, db, "sha256folder", "blobs/folder", "text/plain", 100, 2) + folderID := int64(1) + createTestFile(t, db, "in_folder.txt", blob.SHA256, &folderID) + createTestFile(t, db, "root.txt", blob.SHA256, nil) + + files, total, err := svc.ListFiles(context.Background(), &folderID, 1, 10, "") + if err != nil { + t.Fatalf("ListFiles: %v", err) + } + if total != 1 { + t.Errorf("expected total 1, got %d", total) + } + if len(files) != 1 { + t.Fatalf("expected 1 file, got %d", len(files)) + } + if files[0].Name != "in_folder.txt" { + t.Errorf("expected in_folder.txt, got %s", files[0].Name) + } + + rootFiles, rootTotal, err := svc.ListFiles(context.Background(), nil, 1, 10, "") + if err != nil { + t.Fatalf("ListFiles root: %v", err) + } + if rootTotal != 1 { + t.Errorf("expected root total 1, got %d", rootTotal) + } + if len(rootFiles) != 1 || rootFiles[0].Name != "root.txt" { + t.Errorf("expected root.txt in root listing") + } +} + +func TestGetFileMetadata_BlobMissing(t *testing.T) { + svc, _, db := setupFileService(t) + + file := createTestFile(t, db, "orphan.txt", "nonexistent_sha256", nil) + + _, _, err := svc.GetFileMetadata(context.Background(), file.ID) + if err == nil { + t.Fatal("expected error when blob is missing") + } +} diff --git a/internal/service/folder_service.go b/internal/service/folder_service.go new file mode 100644 index 0000000..ec9cf9e --- /dev/null +++ b/internal/service/folder_service.go @@ -0,0 +1,142 @@ +package service + +import ( + "context" + "fmt" + + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" +) + +// FolderService provides CRUD operations for folders with path validation +// and directory tree management. +type FolderService struct { + folderStore *store.FolderStore + fileStore *store.FileStore + logger *zap.Logger +} + +// NewFolderService creates a new FolderService. +func NewFolderService(folderStore *store.FolderStore, fileStore *store.FileStore, logger *zap.Logger) *FolderService { + return &FolderService{ + folderStore: folderStore, + fileStore: fileStore, + logger: logger, + } +} + +// CreateFolder validates the name, computes a materialized path, checks for +// duplicates, and persists the folder. +func (s *FolderService) CreateFolder(ctx context.Context, name string, parentID *int64) (*model.FolderResponse, error) { + if err := model.ValidateFolderName(name); err != nil { + return nil, fmt.Errorf("invalid folder name: %w", err) + } + + var path string + if parentID == nil { + path = "/" + name + "/" + } else { + parent, err := s.folderStore.GetByID(ctx, *parentID) + if err != nil { + return nil, fmt.Errorf("get parent folder: %w", err) + } + if parent == nil { + return nil, fmt.Errorf("parent folder %d not found", *parentID) + } + path = parent.Path + name + "/" + } + + existing, err := s.folderStore.GetByPath(ctx, path) + if err != nil { + return nil, fmt.Errorf("check duplicate path: %w", err) + } + if existing != nil { + return nil, fmt.Errorf("folder with path %q already exists", path) + } + + folder := &model.Folder{ + Name: name, + ParentID: parentID, + Path: path, + } + if err := s.folderStore.Create(ctx, folder); err != nil { + return nil, fmt.Errorf("create folder: %w", err) + } + + return s.toFolderResponse(ctx, folder) +} + +// GetFolder retrieves a folder by ID with file and subfolder counts. +func (s *FolderService) GetFolder(ctx context.Context, id int64) (*model.FolderResponse, error) { + folder, err := s.folderStore.GetByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get folder: %w", err) + } + if folder == nil { + return nil, fmt.Errorf("folder %d not found", id) + } + + return s.toFolderResponse(ctx, folder) +} + +// ListFolders returns all direct children of the given parent folder (or root +// if parentID is nil). +func (s *FolderService) ListFolders(ctx context.Context, parentID *int64) ([]model.FolderResponse, error) { + folders, err := s.folderStore.ListByParentID(ctx, parentID) + if err != nil { + return nil, fmt.Errorf("list folders: %w", err) + } + + result := make([]model.FolderResponse, 0, len(folders)) + for i := range folders { + resp, err := s.toFolderResponse(ctx, &folders[i]) + if err != nil { + return nil, err + } + result = append(result, *resp) + } + return result, nil +} + +// DeleteFolder soft-deletes a folder only if it has no children (sub-folders +// or files). +func (s *FolderService) DeleteFolder(ctx context.Context, id int64) error { + hasChildren, err := s.folderStore.HasChildren(ctx, id) + if err != nil { + return fmt.Errorf("check children: %w", err) + } + if hasChildren { + return fmt.Errorf("folder is not empty") + } + + if err := s.folderStore.Delete(ctx, id); err != nil { + return fmt.Errorf("delete folder: %w", err) + } + return nil +} + +// toFolderResponse converts a Folder model into a FolderResponse DTO with +// computed file and subfolder counts. +func (s *FolderService) toFolderResponse(ctx context.Context, f *model.Folder) (*model.FolderResponse, error) { + subFolders, err := s.folderStore.ListByParentID(ctx, &f.ID) + if err != nil { + return nil, fmt.Errorf("count subfolders: %w", err) + } + + _, fileCount, err := s.fileStore.List(ctx, &f.ID, 1, 1) + if err != nil { + return nil, fmt.Errorf("count files: %w", err) + } + + return &model.FolderResponse{ + ID: f.ID, + Name: f.Name, + ParentID: f.ParentID, + Path: f.Path, + FileCount: fileCount, + SubFolderCount: int64(len(subFolders)), + CreatedAt: f.CreatedAt, + }, nil +} diff --git a/internal/service/folder_service_test.go b/internal/service/folder_service_test.go new file mode 100644 index 0000000..43cc4b8 --- /dev/null +++ b/internal/service/folder_service_test.go @@ -0,0 +1,230 @@ +package service + +import ( + "context" + "strings" + "testing" + + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" + gormlogger "gorm.io/gorm/logger" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func setupFolderService(t *testing.T) *FolderService { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + Logger: gormlogger.Default.LogMode(gormlogger.Silent), + }) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.Folder{}, &model.File{}); err != nil { + t.Fatalf("auto migrate: %v", err) + } + return NewFolderService( + store.NewFolderStore(db), + store.NewFileStore(db), + zap.NewNop(), + ) +} + +func TestCreateFolder_ValidName(t *testing.T) { + svc := setupFolderService(t) + resp, err := svc.CreateFolder(context.Background(), "datasets", nil) + if err != nil { + t.Fatalf("CreateFolder() error = %v", err) + } + if resp.Name != "datasets" { + t.Errorf("Name = %q, want %q", resp.Name, "datasets") + } + if resp.Path != "/datasets/" { + t.Errorf("Path = %q, want %q", resp.Path, "/datasets/") + } + if resp.ParentID != nil { + t.Errorf("ParentID should be nil for root folder, got %d", *resp.ParentID) + } +} + +func TestCreateFolder_SubFolder(t *testing.T) { + svc := setupFolderService(t) + parent, err := svc.CreateFolder(context.Background(), "datasets", nil) + if err != nil { + t.Fatalf("create parent: %v", err) + } + + child, err := svc.CreateFolder(context.Background(), "images", &parent.ID) + if err != nil { + t.Fatalf("CreateFolder() error = %v", err) + } + if child.Path != "/datasets/images/" { + t.Errorf("Path = %q, want %q", child.Path, "/datasets/images/") + } + if child.ParentID == nil || *child.ParentID != parent.ID { + t.Errorf("ParentID = %v, want %d", child.ParentID, parent.ID) + } +} + +func TestCreateFolder_RejectPathTraversal(t *testing.T) { + svc := setupFolderService(t) + for _, name := range []string{"..", "../etc", "/absolute", "a/b"} { + _, err := svc.CreateFolder(context.Background(), name, nil) + if err == nil { + t.Errorf("expected error for folder name %q, got nil", name) + } + } +} + +func TestCreateFolder_DuplicatePath(t *testing.T) { + svc := setupFolderService(t) + _, err := svc.CreateFolder(context.Background(), "datasets", nil) + if err != nil { + t.Fatalf("first create: %v", err) + } + _, err = svc.CreateFolder(context.Background(), "datasets", nil) + if err == nil { + t.Fatal("expected error for duplicate folder name") + } + if !strings.Contains(err.Error(), "already exists") { + t.Errorf("error should mention 'already exists', got: %v", err) + } +} + +func TestCreateFolder_ParentNotFound(t *testing.T) { + svc := setupFolderService(t) + badID := int64(99999) + _, err := svc.CreateFolder(context.Background(), "orphan", &badID) + if err == nil { + t.Fatal("expected error for non-existent parent") + } + if !strings.Contains(err.Error(), "not found") { + t.Errorf("error should mention 'not found', got: %v", err) + } +} + +func TestGetFolder(t *testing.T) { + svc := setupFolderService(t) + created, err := svc.CreateFolder(context.Background(), "datasets", nil) + if err != nil { + t.Fatalf("CreateFolder() error = %v", err) + } + + resp, err := svc.GetFolder(context.Background(), created.ID) + if err != nil { + t.Fatalf("GetFolder() error = %v", err) + } + if resp.ID != created.ID { + t.Errorf("ID = %d, want %d", resp.ID, created.ID) + } + if resp.Name != "datasets" { + t.Errorf("Name = %q, want %q", resp.Name, "datasets") + } + if resp.FileCount != 0 { + t.Errorf("FileCount = %d, want 0", resp.FileCount) + } + if resp.SubFolderCount != 0 { + t.Errorf("SubFolderCount = %d, want 0", resp.SubFolderCount) + } +} + +func TestGetFolder_NotFound(t *testing.T) { + svc := setupFolderService(t) + _, err := svc.GetFolder(context.Background(), 99999) + if err == nil { + t.Fatal("expected error for non-existent folder") + } + if !strings.Contains(err.Error(), "not found") { + t.Errorf("error should mention 'not found', got: %v", err) + } +} + +func TestListFolders(t *testing.T) { + svc := setupFolderService(t) + parent, err := svc.CreateFolder(context.Background(), "root", nil) + if err != nil { + t.Fatalf("create root: %v", err) + } + _, err = svc.CreateFolder(context.Background(), "child1", &parent.ID) + if err != nil { + t.Fatalf("create child1: %v", err) + } + _, err = svc.CreateFolder(context.Background(), "child2", &parent.ID) + if err != nil { + t.Fatalf("create child2: %v", err) + } + + list, err := svc.ListFolders(context.Background(), &parent.ID) + if err != nil { + t.Fatalf("ListFolders() error = %v", err) + } + if len(list) != 2 { + t.Fatalf("len(list) = %d, want 2", len(list)) + } + + names := make(map[string]bool) + for _, f := range list { + names[f.Name] = true + } + if !names["child1"] || !names["child2"] { + t.Errorf("expected child1 and child2, got %v", names) + } +} + +func TestListFolders_Root(t *testing.T) { + svc := setupFolderService(t) + _, err := svc.CreateFolder(context.Background(), "alpha", nil) + if err != nil { + t.Fatalf("create alpha: %v", err) + } + _, err = svc.CreateFolder(context.Background(), "beta", nil) + if err != nil { + t.Fatalf("create beta: %v", err) + } + + list, err := svc.ListFolders(context.Background(), nil) + if err != nil { + t.Fatalf("ListFolders() error = %v", err) + } + if len(list) != 2 { + t.Errorf("len(list) = %d, want 2", len(list)) + } +} + +func TestDeleteFolder_Success(t *testing.T) { + svc := setupFolderService(t) + created, err := svc.CreateFolder(context.Background(), "temp", nil) + if err != nil { + t.Fatalf("CreateFolder() error = %v", err) + } + if err := svc.DeleteFolder(context.Background(), created.ID); err != nil { + t.Fatalf("DeleteFolder() error = %v", err) + } + _, err = svc.GetFolder(context.Background(), created.ID) + if err == nil { + t.Error("expected error after deletion") + } +} + +func TestDeleteFolder_NonEmpty(t *testing.T) { + svc := setupFolderService(t) + parent, err := svc.CreateFolder(context.Background(), "haschild", nil) + if err != nil { + t.Fatalf("create parent: %v", err) + } + _, err = svc.CreateFolder(context.Background(), "child", &parent.ID) + if err != nil { + t.Fatalf("create child: %v", err) + } + + err = svc.DeleteFolder(context.Background(), parent.ID) + if err == nil { + t.Fatal("expected error when deleting non-empty folder") + } + if !strings.Contains(err.Error(), "not empty") { + t.Errorf("error should mention 'not empty', got: %v", err) + } +} diff --git a/internal/service/upload_service.go b/internal/service/upload_service.go new file mode 100644 index 0000000..6ebfbc5 --- /dev/null +++ b/internal/service/upload_service.go @@ -0,0 +1,441 @@ +package service + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "io" + "time" + + "gcy_hpc_server/internal/config" + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/storage" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type UploadService struct { + storage storage.ObjectStorage + blobStore *store.BlobStore + fileStore *store.FileStore + uploadStore *store.UploadStore + cfg config.MinioConfig + db *gorm.DB + logger *zap.Logger +} + +func NewUploadService( + st storage.ObjectStorage, + blobStore *store.BlobStore, + fileStore *store.FileStore, + uploadStore *store.UploadStore, + cfg config.MinioConfig, + db *gorm.DB, + logger *zap.Logger, +) *UploadService { + return &UploadService{ + storage: st, + blobStore: blobStore, + fileStore: fileStore, + uploadStore: uploadStore, + cfg: cfg, + db: db, + logger: logger, + } +} + +func (s *UploadService) InitUpload(ctx context.Context, req model.InitUploadRequest) (interface{}, error) { + if err := model.ValidateFileName(req.FileName); err != nil { + return nil, fmt.Errorf("invalid file name: %w", err) + } + + if req.FileSize < 0 { + return nil, fmt.Errorf("file size cannot be negative") + } + + if req.FileSize > s.cfg.MaxFileSize { + return nil, fmt.Errorf("file size %d exceeds maximum %d", req.FileSize, s.cfg.MaxFileSize) + } + + chunkSize := s.cfg.ChunkSize + if req.ChunkSize != nil { + chunkSize = *req.ChunkSize + } + if chunkSize < s.cfg.MinChunkSize { + return nil, fmt.Errorf("chunk size %d is below minimum %d", chunkSize, s.cfg.MinChunkSize) + } + + totalChunks := int((req.FileSize + chunkSize - 1) / chunkSize) + if req.FileSize == 0 { + totalChunks = 0 + } + if totalChunks > 10000 { + return nil, fmt.Errorf("total chunks %d exceeds limit of 10000", totalChunks) + } + + blob, err := s.blobStore.GetBySHA256(ctx, req.SHA256) + if err != nil { + return nil, fmt.Errorf("check dedup: %w", err) + } + if blob != nil { + if err := s.blobStore.IncrementRef(ctx, req.SHA256); err != nil { + return nil, fmt.Errorf("increment ref: %w", err) + } + file := &model.File{ + Name: req.FileName, + FolderID: req.FolderID, + BlobSHA256: req.SHA256, + } + if err := s.fileStore.Create(ctx, file); err != nil { + return nil, fmt.Errorf("create file: %w", err) + } + return model.FileResponse{ + ID: file.ID, + Name: file.Name, + FolderID: file.FolderID, + Size: blob.FileSize, + MimeType: blob.MimeType, + SHA256: blob.SHA256, + CreatedAt: file.CreatedAt, + UpdatedAt: file.UpdatedAt, + }, nil + } + + mimeType := req.MimeType + if mimeType == "" { + mimeType = "application/octet-stream" + } + + session := &model.UploadSession{ + FileName: req.FileName, + FileSize: req.FileSize, + ChunkSize: chunkSize, + TotalChunks: totalChunks, + SHA256: req.SHA256, + FolderID: req.FolderID, + Status: "pending", + MinioPrefix: fmt.Sprintf("uploads/%d/", time.Now().UnixNano()), + MimeType: mimeType, + ExpiresAt: time.Now().Add(time.Duration(s.cfg.SessionTTL) * time.Hour), + } + + if err := s.uploadStore.CreateSession(ctx, session); err != nil { + return nil, fmt.Errorf("create session: %w", err) + } + + return model.UploadSessionResponse{ + ID: session.ID, + FileName: session.FileName, + FileSize: session.FileSize, + ChunkSize: session.ChunkSize, + TotalChunks: session.TotalChunks, + SHA256: session.SHA256, + Status: session.Status, + UploadedChunks: []int{}, + ExpiresAt: session.ExpiresAt, + CreatedAt: session.CreatedAt, + }, nil +} + +func (s *UploadService) UploadChunk(ctx context.Context, sessionID int64, chunkIndex int, reader io.Reader, size int64) error { + session, err := s.uploadStore.GetSession(ctx, sessionID) + if err != nil { + return fmt.Errorf("get session: %w", err) + } + if session == nil { + return fmt.Errorf("session not found") + } + + switch session.Status { + case "pending", "uploading", "failed": + default: + return fmt.Errorf("cannot upload to session with status %q", session.Status) + } + + if chunkIndex < 0 || chunkIndex >= session.TotalChunks { + return fmt.Errorf("chunk index %d out of range [0, %d)", chunkIndex, session.TotalChunks) + } + + key := fmt.Sprintf("%schunk_%05d", session.MinioPrefix, chunkIndex) + + hasher := sha256.New() + teeReader := io.TeeReader(reader, hasher) + + _, err = s.storage.PutObject(ctx, s.cfg.Bucket, key, teeReader, size, storage.PutObjectOptions{ + DisableMultipart: true, + }) + if err != nil { + return fmt.Errorf("put object: %w", err) + } + + chunkSHA256 := hex.EncodeToString(hasher.Sum(nil)) + + chunk := &model.UploadChunk{ + SessionID: sessionID, + ChunkIndex: chunkIndex, + MinioKey: key, + SHA256: chunkSHA256, + Size: size, + Status: "uploaded", + } + + if err := s.uploadStore.UpsertChunk(ctx, chunk); err != nil { + return fmt.Errorf("upsert chunk: %w", err) + } + + if session.Status == "pending" { + if err := s.uploadStore.UpdateSessionStatus(ctx, sessionID, "uploading"); err != nil { + return fmt.Errorf("update status to uploading: %w", err) + } + } + + return nil +} + +func (s *UploadService) GetUploadStatus(ctx context.Context, sessionID int64) (*model.UploadSessionResponse, error) { + session, chunks, err := s.uploadStore.GetSessionWithChunks(ctx, sessionID) + if err != nil { + return nil, fmt.Errorf("get session: %w", err) + } + if session == nil { + return nil, fmt.Errorf("session not found") + } + + uploadedChunks := make([]int, 0, len(chunks)) + for _, c := range chunks { + if c.Status == "uploaded" { + uploadedChunks = append(uploadedChunks, c.ChunkIndex) + } + } + + return &model.UploadSessionResponse{ + ID: session.ID, + FileName: session.FileName, + FileSize: session.FileSize, + ChunkSize: session.ChunkSize, + TotalChunks: session.TotalChunks, + SHA256: session.SHA256, + Status: session.Status, + UploadedChunks: uploadedChunks, + ExpiresAt: session.ExpiresAt, + CreatedAt: session.CreatedAt, + }, nil +} + +func (s *UploadService) CompleteUpload(ctx context.Context, sessionID int64) (*model.FileResponse, error) { + var fileResp *model.FileResponse + var totalChunks int + var minioPrefix string + + err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var session model.UploadSession + query := tx.WithContext(ctx) + if !isSQLite(tx) { + query = query.Clauses(clause.Locking{Strength: "UPDATE"}) + } + if err := query.First(&session, sessionID).Error; err != nil { + return fmt.Errorf("get session: %w", err) + } + + switch session.Status { + case "uploading", "failed": + default: + return fmt.Errorf("cannot complete session with status %q", session.Status) + } + + if session.TotalChunks > 0 { + var chunkCount int64 + if err := tx.WithContext(ctx).Model(&model.UploadChunk{}). + Where("session_id = ? AND status = ?", sessionID, "uploaded"). + Count(&chunkCount).Error; err != nil { + return fmt.Errorf("count chunks: %w", err) + } + if int(chunkCount) != session.TotalChunks { + return fmt.Errorf("not all chunks uploaded: %d/%d", chunkCount, session.TotalChunks) + } + } + + totalChunks = session.TotalChunks + minioPrefix = session.MinioPrefix + + if err := updateStatusTx(tx, ctx, sessionID, "merging"); err != nil { + return fmt.Errorf("update status to merging: %w", err) + } + + blob, err := s.blobStore.GetBySHA256ForUpdate(ctx, tx, session.SHA256) + if err != nil { + return fmt.Errorf("get blob for update: %w", err) + } + + if blob != nil { + result := tx.WithContext(ctx).Model(&model.FileBlob{}). + Where("sha256 = ?", session.SHA256). + UpdateColumn("ref_count", gorm.Expr("ref_count + 1")) + if result.Error != nil { + return fmt.Errorf("increment ref: %w", result.Error) + } + if result.RowsAffected == 0 { + return fmt.Errorf("blob not found for ref increment") + } + } else { + if session.TotalChunks > 0 { + chunkKeys := make([]string, session.TotalChunks) + for i := 0; i < session.TotalChunks; i++ { + chunkKeys[i] = fmt.Sprintf("%schunk_%05d", session.MinioPrefix, i) + } + + dstKey := "files/" + session.SHA256 + if _, err := s.storage.ComposeObject(ctx, s.cfg.Bucket, dstKey, chunkKeys); err != nil { + return errComposeFailed{err: err} + } + } + + blobRecord := &model.FileBlob{ + SHA256: session.SHA256, + MinioKey: "files/" + session.SHA256, + FileSize: session.FileSize, + MimeType: session.MimeType, + RefCount: 1, + } + if session.TotalChunks == 0 { + blobRecord.MinioKey = "files/" + session.SHA256 + blobRecord.FileSize = 0 + } + if err := tx.WithContext(ctx).Create(blobRecord).Error; err != nil { + return fmt.Errorf("create blob: %w", err) + } + } + + file := &model.File{ + Name: session.FileName, + FolderID: session.FolderID, + BlobSHA256: session.SHA256, + } + if err := tx.WithContext(ctx).Create(file).Error; err != nil { + return fmt.Errorf("create file: %w", err) + } + + if err := updateStatusTx(tx, ctx, sessionID, "completed"); err != nil { + return fmt.Errorf("update status to completed: %w", err) + } + + fileResp = &model.FileResponse{ + ID: file.ID, + Name: file.Name, + FolderID: file.FolderID, + Size: session.FileSize, + MimeType: session.MimeType, + SHA256: session.SHA256, + CreatedAt: file.CreatedAt, + UpdatedAt: file.UpdatedAt, + } + + return nil + }) + + if err != nil { + var cfe errComposeFailed + if errors.As(err, &cfe) { + if statusErr := s.uploadStore.UpdateSessionStatus(ctx, sessionID, "failed"); statusErr != nil { + s.logger.Warn("failed to mark session as failed", zap.Int64("session_id", sessionID), zap.Error(statusErr)) + } + return nil, fmt.Errorf("compose object: %w", cfe.err) + } + return nil, err + } + + if totalChunks > 0 { + keys := make([]string, totalChunks) + for i := 0; i < totalChunks; i++ { + keys[i] = fmt.Sprintf("%schunk_%05d", minioPrefix, i) + } + go func() { + bgCtx := context.Background() + if delErr := s.storage.RemoveObjects(bgCtx, s.cfg.Bucket, keys, storage.RemoveObjectsOptions{}); delErr != nil { + s.logger.Warn("delete temp chunks", zap.Error(delErr)) + } + }() + } + + return fileResp, nil +} + +func (s *UploadService) CancelUpload(ctx context.Context, sessionID int64) error { + session, err := s.uploadStore.GetSession(ctx, sessionID) + if err != nil { + return fmt.Errorf("get session: %w", err) + } + if session == nil { + return fmt.Errorf("session not found") + } + + if err := s.uploadStore.UpdateSessionStatus(ctx, sessionID, "cancelled"); err != nil { + return fmt.Errorf("update status to cancelled: %w", err) + } + + if session.TotalChunks > 0 { + keys, listErr := s.listChunkKeys(ctx, sessionID) + if listErr != nil { + s.logger.Warn("list chunk keys for cancel", zap.Error(listErr)) + } else if len(keys) > 0 { + if delErr := s.storage.RemoveObjects(ctx, s.cfg.Bucket, keys, storage.RemoveObjectsOptions{}); delErr != nil { + s.logger.Warn("remove chunk objects for cancel", zap.Error(delErr)) + } + } + } + + if err := s.uploadStore.DeleteSession(ctx, sessionID); err != nil { + return fmt.Errorf("delete session: %w", err) + } + + return nil +} + +func (s *UploadService) listChunkKeys(ctx context.Context, sessionID int64) ([]string, error) { + session, err := s.uploadStore.GetSession(ctx, sessionID) + if err != nil || session == nil { + return nil, fmt.Errorf("get session: %w", err) + } + + keys := make([]string, session.TotalChunks) + for i := 0; i < session.TotalChunks; i++ { + keys[i] = fmt.Sprintf("%schunk_%05d", session.MinioPrefix, i) + } + return keys, nil +} + +func updateStatusTx(tx *gorm.DB, ctx context.Context, id int64, status string) error { + result := tx.WithContext(ctx).Model(&model.UploadSession{}).Where("id = ?", id).Update("status", status) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil +} + +func isSQLite(db *gorm.DB) bool { + return db.Dialector.Name() == "sqlite" +} + +func objectKeys(objects []storage.ObjectInfo) []string { + keys := make([]string, len(objects)) + for i, o := range objects { + keys[i] = o.Key + } + return keys +} + +type errComposeFailed struct { + err error +} + +func (e errComposeFailed) Error() string { + return fmt.Sprintf("compose failed: %v", e.err) +} diff --git a/internal/service/upload_service_test.go b/internal/service/upload_service_test.go new file mode 100644 index 0000000..08ff8cf --- /dev/null +++ b/internal/service/upload_service_test.go @@ -0,0 +1,678 @@ +package service + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "strings" + "testing" + "time" + + "gcy_hpc_server/internal/config" + "gcy_hpc_server/internal/model" + "gcy_hpc_server/internal/storage" + "gcy_hpc_server/internal/store" + + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +type uploadMockStorage struct { + putObjectFn func(ctx context.Context, bucket, key string, reader io.Reader, size int64, opts storage.PutObjectOptions) (storage.UploadInfo, error) + composeObjectFn func(ctx context.Context, bucket, dst string, sources []string) (storage.UploadInfo, error) + listObjectsFn func(ctx context.Context, bucket, prefix string, recursive bool) ([]storage.ObjectInfo, error) + removeObjectsFn func(ctx context.Context, bucket string, keys []string, opts storage.RemoveObjectsOptions) error + removeObjectFn func(ctx context.Context, bucket, key string, opts storage.RemoveObjectOptions) error + getObjectFn func(ctx context.Context, bucket, key string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) + bucketExistsFn func(ctx context.Context, bucket string) (bool, error) + makeBucketFn func(ctx context.Context, bucket string, opts storage.MakeBucketOptions) error + statObjectFn func(ctx context.Context, bucket, key string, opts storage.StatObjectOptions) (storage.ObjectInfo, error) + abortMultipartFn func(ctx context.Context, bucket, object, uploadID string) error + removeIncompleteFn func(ctx context.Context, bucket, object string) error +} + +func (m *uploadMockStorage) PutObject(ctx context.Context, bucket, key string, reader io.Reader, size int64, opts storage.PutObjectOptions) (storage.UploadInfo, error) { + if m.putObjectFn != nil { + return m.putObjectFn(ctx, bucket, key, reader, size, opts) + } + io.Copy(io.Discard, reader) + return storage.UploadInfo{ETag: "etag", Size: size}, nil +} + +func (m *uploadMockStorage) GetObject(ctx context.Context, bucket, key string, opts storage.GetOptions) (io.ReadCloser, storage.ObjectInfo, error) { + if m.getObjectFn != nil { + return m.getObjectFn(ctx, bucket, key, opts) + } + return nil, storage.ObjectInfo{}, nil +} + +func (m *uploadMockStorage) ComposeObject(ctx context.Context, bucket, dst string, sources []string) (storage.UploadInfo, error) { + if m.composeObjectFn != nil { + return m.composeObjectFn(ctx, bucket, dst, sources) + } + return storage.UploadInfo{ETag: "composed", Size: 0}, nil +} + +func (m *uploadMockStorage) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error { + if m.abortMultipartFn != nil { + return m.abortMultipartFn(ctx, bucket, object, uploadID) + } + return nil +} + +func (m *uploadMockStorage) RemoveIncompleteUpload(ctx context.Context, bucket, object string) error { + if m.removeIncompleteFn != nil { + return m.removeIncompleteFn(ctx, bucket, object) + } + return nil +} + +func (m *uploadMockStorage) RemoveObject(ctx context.Context, bucket, key string, opts storage.RemoveObjectOptions) error { + if m.removeObjectFn != nil { + return m.removeObjectFn(ctx, bucket, key, opts) + } + return nil +} + +func (m *uploadMockStorage) ListObjects(ctx context.Context, bucket, prefix string, recursive bool) ([]storage.ObjectInfo, error) { + if m.listObjectsFn != nil { + return m.listObjectsFn(ctx, bucket, prefix, recursive) + } + return nil, nil +} + +func (m *uploadMockStorage) RemoveObjects(ctx context.Context, bucket string, keys []string, opts storage.RemoveObjectsOptions) error { + if m.removeObjectsFn != nil { + return m.removeObjectsFn(ctx, bucket, keys, opts) + } + return nil +} + +func (m *uploadMockStorage) BucketExists(ctx context.Context, bucket string) (bool, error) { + if m.bucketExistsFn != nil { + return m.bucketExistsFn(ctx, bucket) + } + return true, nil +} + +func (m *uploadMockStorage) MakeBucket(ctx context.Context, bucket string, opts storage.MakeBucketOptions) error { + if m.makeBucketFn != nil { + return m.makeBucketFn(ctx, bucket, opts) + } + return nil +} + +func (m *uploadMockStorage) StatObject(ctx context.Context, bucket, key string, opts storage.StatObjectOptions) (storage.ObjectInfo, error) { + if m.statObjectFn != nil { + return m.statObjectFn(ctx, bucket, key, opts) + } + return storage.ObjectInfo{}, nil +} + +func setupUploadTestDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + if err := db.AutoMigrate(&model.FileBlob{}, &model.File{}, &model.UploadSession{}, &model.UploadChunk{}); err != nil { + t.Fatalf("migrate: %v", err) + } + return db +} + +func uploadTestConfig() config.MinioConfig { + return config.MinioConfig{ + Bucket: "test-bucket", + ChunkSize: 16 << 20, + MaxFileSize: 50 << 30, + MinChunkSize: 5 << 20, + SessionTTL: 48, + } +} + +func newUploadTestService(t *testing.T, st storage.ObjectStorage, db *gorm.DB) *UploadService { + t.Helper() + return NewUploadService( + st, + store.NewBlobStore(db), + store.NewFileStore(db), + store.NewUploadStore(db), + uploadTestConfig(), + db, + zap.NewNop(), + ) +} + +func sha256Of(data []byte) string { + h := sha256.Sum256(data) + return hex.EncodeToString(h[:]) +} + +func TestInitUpload_CreatesSession(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + resp, err := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "test.txt", + FileSize: 32 << 20, + SHA256: "abc123", + }) + if err != nil { + t.Fatalf("InitUpload: %v", err) + } + + sessResp, ok := resp.(model.UploadSessionResponse) + if !ok { + t.Fatalf("expected UploadSessionResponse, got %T", resp) + } + if sessResp.Status != "pending" { + t.Errorf("status = %q, want pending", sessResp.Status) + } + if sessResp.TotalChunks != 2 { + t.Errorf("TotalChunks = %d, want 2", sessResp.TotalChunks) + } + if sessResp.FileName != "test.txt" { + t.Errorf("FileName = %q, want test.txt", sessResp.FileName) + } +} + +func TestInitUpload_DedupBlobExists(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + blobSHA := sha256Of([]byte("hello")) + blob := &model.FileBlob{ + SHA256: blobSHA, + MinioKey: "files/" + blobSHA, + FileSize: 5, + MimeType: "text/plain", + RefCount: 1, + } + if err := db.Create(blob).Error; err != nil { + t.Fatalf("create blob: %v", err) + } + + resp, err := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "mydoc.txt", + FileSize: 5, + SHA256: blobSHA, + }) + if err != nil { + t.Fatalf("InitUpload: %v", err) + } + + fileResp, ok := resp.(model.FileResponse) + if !ok { + t.Fatalf("expected FileResponse, got %T", resp) + } + if fileResp.Name != "mydoc.txt" { + t.Errorf("Name = %q, want mydoc.txt", fileResp.Name) + } + if fileResp.SHA256 != blobSHA { + t.Errorf("SHA256 mismatch") + } + + var after model.FileBlob + db.First(&after, blob.ID) + if after.RefCount != 2 { + t.Errorf("RefCount = %d, want 2", after.RefCount) + } +} + +func TestInitUpload_ChunkSizeTooSmall(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + tiny := int64(1024) + _, err := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "test.txt", + FileSize: 10 << 20, + SHA256: "abc", + ChunkSize: &tiny, + }) + if err == nil { + t.Fatal("expected error for chunk size too small") + } + if !strings.Contains(err.Error(), "below minimum") { + t.Errorf("error = %q, want 'below minimum'", err.Error()) + } +} + +func TestInitUpload_TooManyChunks(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + + cfg := uploadTestConfig() + cfg.ChunkSize = 1 + cfg.MinChunkSize = 1 + svc2 := NewUploadService(st, store.NewBlobStore(db), store.NewFileStore(db), store.NewUploadStore(db), cfg, db, zap.NewNop()) + + _, err := svc2.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "big.txt", + FileSize: 10002, + SHA256: "abc", + }) + if err == nil { + t.Fatal("expected error for too many chunks") + } + if !strings.Contains(err.Error(), "exceeds limit") { + t.Errorf("error = %q, want 'exceeds limit'", err.Error()) + } +} + +func TestInitUpload_DangerousFilename(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + for _, name := range []string{"", "..", "foo/bar", "foo\\bar", " name"} { + _, err := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: name, + FileSize: 100, + SHA256: "abc", + }) + if err == nil { + t.Errorf("expected error for filename %q", name) + } + } +} + +func TestUploadChunk_UploadsAndStoresSHA256(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + resp, _ := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "test.bin", + FileSize: 10 << 20, + SHA256: "deadbeef", + }) + sessResp := resp.(model.UploadSessionResponse) + + data := []byte("chunk data here") + chunkSHA := sha256Of(data) + + err := svc.UploadChunk(context.Background(), sessResp.ID, 0, bytes.NewReader(data), int64(len(data))) + if err != nil { + t.Fatalf("UploadChunk: %v", err) + } + + var chunk model.UploadChunk + db.Where("session_id = ? AND chunk_index = ?", sessResp.ID, 0).First(&chunk) + if chunk.SHA256 != chunkSHA { + t.Errorf("chunk SHA256 = %q, want %q", chunk.SHA256, chunkSHA) + } + if chunk.Status != "uploaded" { + t.Errorf("chunk status = %q, want uploaded", chunk.Status) + } + + session, _ := svc.uploadStore.GetSession(context.Background(), sessResp.ID) + if session.Status != "uploading" { + t.Errorf("session status = %q, want uploading", session.Status) + } +} + +func TestUploadChunk_RejectsCompletedSession(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + resp, _ := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "test.bin", + FileSize: 10 << 20, + SHA256: "deadbeef", + }) + sessResp := resp.(model.UploadSessionResponse) + + svc.uploadStore.UpdateSessionStatus(context.Background(), sessResp.ID, "completed") + + err := svc.UploadChunk(context.Background(), sessResp.ID, 0, bytes.NewReader([]byte("x")), 1) + if err == nil { + t.Fatal("expected error for completed session") + } + if !strings.Contains(err.Error(), "cannot upload") { + t.Errorf("error = %q", err.Error()) + } +} + +func TestUploadChunk_RejectsExpiredSession(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + resp, _ := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "test.bin", + FileSize: 10 << 20, + SHA256: "deadbeef", + }) + sessResp := resp.(model.UploadSessionResponse) + + svc.uploadStore.UpdateSessionStatus(context.Background(), sessResp.ID, "expired") + + err := svc.UploadChunk(context.Background(), sessResp.ID, 0, bytes.NewReader([]byte("x")), 1) + if err == nil { + t.Fatal("expected error for expired session") + } +} + +func TestGetUploadStatus_ReturnsChunkIndices(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + resp, _ := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "test.bin", + FileSize: 32 << 20, + SHA256: "abc", + }) + sessResp := resp.(model.UploadSessionResponse) + + data := []byte("chunk0data") + svc.UploadChunk(context.Background(), sessResp.ID, 0, bytes.NewReader(data), int64(len(data))) + + status, err := svc.GetUploadStatus(context.Background(), sessResp.ID) + if err != nil { + t.Fatalf("GetUploadStatus: %v", err) + } + if len(status.UploadedChunks) != 1 || status.UploadedChunks[0] != 0 { + t.Errorf("UploadedChunks = %v, want [0]", status.UploadedChunks) + } + if status.TotalChunks != 2 { + t.Errorf("TotalChunks = %d, want 2", status.TotalChunks) + } +} + +func TestCompleteUpload_CreatesBlobAndFile(t *testing.T) { + db := setupUploadTestDB(t) + cfg := uploadTestConfig() + cfg.ChunkSize = 5 << 20 + cfg.MinChunkSize = 1 + st := &uploadMockStorage{} + svc := NewUploadService(st, store.NewBlobStore(db), store.NewFileStore(db), store.NewUploadStore(db), cfg, db, zap.NewNop()) + + fileSHA := "aaa111" + + sess := &model.UploadSession{ + FileName: "test.bin", + FileSize: 10 << 20, + ChunkSize: 5 << 20, + TotalChunks: 2, + SHA256: fileSHA, + Status: "uploading", + MinioPrefix: "uploads/testcomplete/", + MimeType: "application/octet-stream", + ExpiresAt: time.Now().Add(48 * time.Hour), + } + db.Create(sess) + + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 0, MinioKey: "uploads/testcomplete/chunk_00000", SHA256: "c0", Size: 5 << 20, Status: "uploaded"}) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 1, MinioKey: "uploads/testcomplete/chunk_00001", SHA256: "c1", Size: 5 << 20, Status: "uploaded"}) + + fileResp, err := svc.CompleteUpload(context.Background(), sess.ID) + if err != nil { + t.Fatalf("CompleteUpload: %v", err) + } + if fileResp.Name != "test.bin" { + t.Errorf("Name = %q, want test.bin", fileResp.Name) + } + if fileResp.SHA256 != fileSHA { + t.Errorf("SHA256 = %q, want %q", fileResp.SHA256, fileSHA) + } + + var blob model.FileBlob + db.Where("sha256 = ?", fileSHA).First(&blob) + if blob.RefCount != 1 { + t.Errorf("blob RefCount = %d, want 1", blob.RefCount) + } + + session, _ := svc.uploadStore.GetSession(context.Background(), sess.ID) + if session == nil { + t.Fatal("session should exist") + } + if session.Status != "completed" { + t.Errorf("session status = %q, want completed", session.Status) + } +} + +func TestCompleteUpload_ReusesExistingBlob(t *testing.T) { + db := setupUploadTestDB(t) + cfg := uploadTestConfig() + cfg.ChunkSize = 5 << 20 + cfg.MinChunkSize = 1 + st := &uploadMockStorage{} + + composeCalled := false + st.composeObjectFn = func(ctx context.Context, bucket, dst string, sources []string) (storage.UploadInfo, error) { + composeCalled = true + return storage.UploadInfo{}, nil + } + + svc := NewUploadService(st, store.NewBlobStore(db), store.NewFileStore(db), store.NewUploadStore(db), cfg, db, zap.NewNop()) + + fileSHA := "reuse123" + db.Create(&model.FileBlob{ + SHA256: fileSHA, + MinioKey: "files/" + fileSHA, + FileSize: 10 << 20, + MimeType: "application/octet-stream", + RefCount: 1, + }) + + sess := &model.UploadSession{ + FileName: "reuse.bin", + FileSize: 10 << 20, + ChunkSize: 5 << 20, + TotalChunks: 2, + SHA256: fileSHA, + Status: "uploading", + MinioPrefix: "uploads/reuse/", + MimeType: "application/octet-stream", + ExpiresAt: time.Now().Add(48 * time.Hour), + } + db.Create(sess) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 0, MinioKey: "uploads/reuse/chunk_00000", SHA256: "c0", Size: 5 << 20, Status: "uploaded"}) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 1, MinioKey: "uploads/reuse/chunk_00001", SHA256: "c1", Size: 5 << 20, Status: "uploaded"}) + + fileResp, err := svc.CompleteUpload(context.Background(), sess.ID) + if err != nil { + t.Fatalf("CompleteUpload: %v", err) + } + if fileResp.SHA256 != fileSHA { + t.Errorf("SHA256 mismatch") + } + if composeCalled { + t.Error("ComposeObject should not be called when blob exists") + } + + var blob model.FileBlob + db.Where("sha256 = ?", fileSHA).First(&blob) + if blob.RefCount != 2 { + t.Errorf("RefCount = %d, want 2", blob.RefCount) + } +} + +func TestCompleteUpload_NotAllChunks(t *testing.T) { + db := setupUploadTestDB(t) + cfg := uploadTestConfig() + cfg.ChunkSize = 5 << 20 + cfg.MinChunkSize = 1 + st := &uploadMockStorage{} + svc := NewUploadService(st, store.NewBlobStore(db), store.NewFileStore(db), store.NewUploadStore(db), cfg, db, zap.NewNop()) + + sess := &model.UploadSession{ + FileName: "partial.bin", + FileSize: 10 << 20, + ChunkSize: 5 << 20, + TotalChunks: 2, + SHA256: "partial123", + Status: "uploading", + MinioPrefix: "uploads/partial/", + MimeType: "application/octet-stream", + ExpiresAt: time.Now().Add(48 * time.Hour), + } + db.Create(sess) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 0, MinioKey: "uploads/partial/chunk_00000", SHA256: "c0", Size: 5 << 20, Status: "uploaded"}) + + _, err := svc.CompleteUpload(context.Background(), sess.ID) + if err == nil { + t.Fatal("expected error for incomplete chunks") + } + if !strings.Contains(err.Error(), "not all chunks uploaded") { + t.Errorf("error = %q", err.Error()) + } +} + +func TestCompleteUpload_ComposeObjectFails(t *testing.T) { + db := setupUploadTestDB(t) + cfg := uploadTestConfig() + cfg.ChunkSize = 5 << 20 + cfg.MinChunkSize = 1 + st := &uploadMockStorage{} + + st.composeObjectFn = func(ctx context.Context, bucket, dst string, sources []string) (storage.UploadInfo, error) { + return storage.UploadInfo{}, fmt.Errorf("compose failed") + } + + svc := NewUploadService(st, store.NewBlobStore(db), store.NewFileStore(db), store.NewUploadStore(db), cfg, db, zap.NewNop()) + + fileSHA := "fail123" + sess := &model.UploadSession{ + FileName: "fail.bin", + FileSize: 10 << 20, + ChunkSize: 5 << 20, + TotalChunks: 2, + SHA256: fileSHA, + Status: "uploading", + MinioPrefix: "uploads/fail/", + MimeType: "application/octet-stream", + ExpiresAt: time.Now().Add(48 * time.Hour), + } + db.Create(sess) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 0, MinioKey: "uploads/fail/chunk_00000", SHA256: "c0", Size: 5 << 20, Status: "uploaded"}) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 1, MinioKey: "uploads/fail/chunk_00001", SHA256: "c1", Size: 5 << 20, Status: "uploaded"}) + + _, err := svc.CompleteUpload(context.Background(), sess.ID) + if err == nil { + t.Fatal("expected error for compose failure") + } + + session, _ := svc.uploadStore.GetSession(context.Background(), sess.ID) + if session.Status != "failed" { + t.Errorf("session status = %q, want failed", session.Status) + } +} + +func TestCompleteUpload_RetriesFailedSession(t *testing.T) { + db := setupUploadTestDB(t) + cfg := uploadTestConfig() + cfg.ChunkSize = 5 << 20 + cfg.MinChunkSize = 1 + st := &uploadMockStorage{} + svc := NewUploadService(st, store.NewBlobStore(db), store.NewFileStore(db), store.NewUploadStore(db), cfg, db, zap.NewNop()) + + fileSHA := "retry123" + sess := &model.UploadSession{ + FileName: "retry.bin", + FileSize: 10 << 20, + ChunkSize: 5 << 20, + TotalChunks: 2, + SHA256: fileSHA, + Status: "failed", + MinioPrefix: "uploads/retry/", + MimeType: "application/octet-stream", + ExpiresAt: time.Now().Add(48 * time.Hour), + } + db.Create(sess) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 0, MinioKey: "uploads/retry/chunk_00000", SHA256: "c0", Size: 5 << 20, Status: "uploaded"}) + db.Create(&model.UploadChunk{SessionID: sess.ID, ChunkIndex: 1, MinioKey: "uploads/retry/chunk_00001", SHA256: "c1", Size: 5 << 20, Status: "uploaded"}) + + fileResp, err := svc.CompleteUpload(context.Background(), sess.ID) + if err != nil { + t.Fatalf("CompleteUpload on retry: %v", err) + } + if fileResp.SHA256 != fileSHA { + t.Errorf("SHA256 mismatch") + } + + session, _ := svc.uploadStore.GetSession(context.Background(), sess.ID) + if session.Status != "completed" { + t.Errorf("session status = %q, want completed", session.Status) + } +} + +func TestCancelUpload_CleansUp(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + resp, _ := svc.InitUpload(context.Background(), model.InitUploadRequest{ + FileName: "cancel.bin", + FileSize: 10 << 20, + SHA256: "cancel123", + }) + sessResp := resp.(model.UploadSessionResponse) + + err := svc.CancelUpload(context.Background(), sessResp.ID) + if err != nil { + t.Fatalf("CancelUpload: %v", err) + } + + session, _ := svc.uploadStore.GetSession(context.Background(), sessResp.ID) + if session != nil { + t.Error("session should be deleted") + } +} + +func TestZeroByteFile_CompletesImmediately(t *testing.T) { + db := setupUploadTestDB(t) + st := &uploadMockStorage{} + svc := newUploadTestService(t, st, db) + + fileSHA := "empty123" + sess := &model.UploadSession{ + FileName: "empty.bin", + FileSize: 0, + ChunkSize: 16 << 20, + TotalChunks: 0, + SHA256: fileSHA, + Status: "uploading", + MinioPrefix: "uploads/empty/", + MimeType: "application/octet-stream", + ExpiresAt: time.Now().Add(48 * time.Hour), + } + db.Create(sess) + + fileResp, err := svc.CompleteUpload(context.Background(), sess.ID) + if err != nil { + t.Fatalf("CompleteUpload zero byte: %v", err) + } + if fileResp.SHA256 != fileSHA { + t.Errorf("SHA256 mismatch") + } + if fileResp.Size != 0 { + t.Errorf("Size = %d, want 0", fileResp.Size) + } + + var blob model.FileBlob + db.Where("sha256 = ?", fileSHA).First(&blob) + if blob.RefCount != 1 { + t.Errorf("RefCount = %d, want 1", blob.RefCount) + } + + uploadStore := store.NewUploadStore(db) + session, _ := uploadStore.GetSession(context.Background(), sess.ID) + if session == nil { + t.Fatal("session should exist") + } + if session.Status != "completed" { + t.Errorf("session status = %q, want completed", session.Status) + } +}