feat(store): add blob, file, folder, and upload stores
Add BlobStore (ref counting), FileStore (soft delete + pagination), FolderStore (materialized path), UploadStore (idempotent upsert), and update AutoMigrate. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
117
internal/store/upload_store.go
Normal file
117
internal/store/upload_store.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gcy_hpc_server/internal/model"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
// UploadStore manages upload sessions and chunks with idempotent upsert support.
|
||||
type UploadStore struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewUploadStore creates a new UploadStore.
|
||||
func NewUploadStore(db *gorm.DB) *UploadStore {
|
||||
return &UploadStore{db: db}
|
||||
}
|
||||
|
||||
// CreateSession inserts a new upload session.
|
||||
func (s *UploadStore) CreateSession(ctx context.Context, session *model.UploadSession) error {
|
||||
return s.db.WithContext(ctx).Create(session).Error
|
||||
}
|
||||
|
||||
// GetSession returns an upload session by ID. Returns (nil, nil) if not found.
|
||||
func (s *UploadStore) GetSession(ctx context.Context, id int64) (*model.UploadSession, error) {
|
||||
var session model.UploadSession
|
||||
err := s.db.WithContext(ctx).First(&session, id).Error
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &session, nil
|
||||
}
|
||||
|
||||
// GetSessionWithChunks returns an upload session along with its chunks ordered by chunk_index.
|
||||
func (s *UploadStore) GetSessionWithChunks(ctx context.Context, id int64) (*model.UploadSession, []model.UploadChunk, error) {
|
||||
session, err := s.GetSession(ctx, id)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if session == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
var chunks []model.UploadChunk
|
||||
if err := s.db.WithContext(ctx).Where("session_id = ?", id).Order("chunk_index ASC").Find(&chunks).Error; err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return session, chunks, nil
|
||||
}
|
||||
|
||||
// UpdateSessionStatus updates the status field of an upload session.
|
||||
// Returns gorm.ErrRecordNotFound if no row was affected.
|
||||
func (s *UploadStore) UpdateSessionStatus(ctx context.Context, id int64, status string) error {
|
||||
result := s.db.WithContext(ctx).Model(&model.UploadSession{}).Where("id = ?", id).Update("status", status)
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return gorm.ErrRecordNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListExpiredSessions returns sessions that are not in a terminal state and have expired.
|
||||
func (s *UploadStore) ListExpiredSessions(ctx context.Context) ([]model.UploadSession, error) {
|
||||
var sessions []model.UploadSession
|
||||
err := s.db.WithContext(ctx).
|
||||
Where("status NOT IN ?", []string{"completed", "cancelled", "expired"}).
|
||||
Where("expires_at < ?", time.Now()).
|
||||
Find(&sessions).Error
|
||||
return sessions, err
|
||||
}
|
||||
|
||||
// DeleteSession removes all chunks for a session, then the session itself.
|
||||
func (s *UploadStore) DeleteSession(ctx context.Context, id int64) error {
|
||||
if err := s.db.WithContext(ctx).Where("session_id = ?", id).Delete(&model.UploadChunk{}).Error; err != nil {
|
||||
return fmt.Errorf("delete chunks: %w", err)
|
||||
}
|
||||
result := s.db.WithContext(ctx).Delete(&model.UploadSession{}, id)
|
||||
return result.Error
|
||||
}
|
||||
|
||||
// UpsertChunk inserts a chunk or updates it if the (session_id, chunk_index) pair already exists.
|
||||
// Uses GORM clause.OnConflict for dialect-neutral upsert (works with both SQLite and MySQL).
|
||||
func (s *UploadStore) UpsertChunk(ctx context.Context, chunk *model.UploadChunk) error {
|
||||
return s.db.WithContext(ctx).Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "session_id"}, {Name: "chunk_index"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"minio_key", "sha256", "size", "status", "updated_at"}),
|
||||
}).Create(chunk).Error
|
||||
}
|
||||
|
||||
// GetUploadedChunkIndices returns the chunk indices that have been successfully uploaded.
|
||||
func (s *UploadStore) GetUploadedChunkIndices(ctx context.Context, sessionID int64) ([]int, error) {
|
||||
var indices []int
|
||||
err := s.db.WithContext(ctx).Model(&model.UploadChunk{}).
|
||||
Where("session_id = ? AND status = ?", sessionID, "uploaded").
|
||||
Pluck("chunk_index", &indices).Error
|
||||
return indices, err
|
||||
}
|
||||
|
||||
// CountUploadedChunks returns the number of chunks with status "uploaded" for a session.
|
||||
func (s *UploadStore) CountUploadedChunks(ctx context.Context, sessionID int64) (int, error) {
|
||||
var count int64
|
||||
err := s.db.WithContext(ctx).Model(&model.UploadChunk{}).
|
||||
Where("session_id = ? AND status = ?", sessionID, "uploaded").
|
||||
Count(&count).Error
|
||||
return int(count), err
|
||||
}
|
||||
Reference in New Issue
Block a user