Add BlobStore (ref counting), FileStore (soft delete + pagination), FolderStore (materialized path), UploadStore (idempotent upsert), and update AutoMigrate. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
118 lines
4.0 KiB
Go
118 lines
4.0 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"gcy_hpc_server/internal/model"
|
|
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/clause"
|
|
)
|
|
|
|
// UploadStore manages upload sessions and chunks with idempotent upsert support.
|
|
type UploadStore struct {
|
|
db *gorm.DB
|
|
}
|
|
|
|
// NewUploadStore creates a new UploadStore.
|
|
func NewUploadStore(db *gorm.DB) *UploadStore {
|
|
return &UploadStore{db: db}
|
|
}
|
|
|
|
// CreateSession inserts a new upload session.
|
|
func (s *UploadStore) CreateSession(ctx context.Context, session *model.UploadSession) error {
|
|
return s.db.WithContext(ctx).Create(session).Error
|
|
}
|
|
|
|
// GetSession returns an upload session by ID. Returns (nil, nil) if not found.
|
|
func (s *UploadStore) GetSession(ctx context.Context, id int64) (*model.UploadSession, error) {
|
|
var session model.UploadSession
|
|
err := s.db.WithContext(ctx).First(&session, id).Error
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &session, nil
|
|
}
|
|
|
|
// GetSessionWithChunks returns an upload session along with its chunks ordered by chunk_index.
|
|
func (s *UploadStore) GetSessionWithChunks(ctx context.Context, id int64) (*model.UploadSession, []model.UploadChunk, error) {
|
|
session, err := s.GetSession(ctx, id)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if session == nil {
|
|
return nil, nil, nil
|
|
}
|
|
|
|
var chunks []model.UploadChunk
|
|
if err := s.db.WithContext(ctx).Where("session_id = ?", id).Order("chunk_index ASC").Find(&chunks).Error; err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return session, chunks, nil
|
|
}
|
|
|
|
// UpdateSessionStatus updates the status field of an upload session.
|
|
// Returns gorm.ErrRecordNotFound if no row was affected.
|
|
func (s *UploadStore) UpdateSessionStatus(ctx context.Context, id int64, status string) error {
|
|
result := s.db.WithContext(ctx).Model(&model.UploadSession{}).Where("id = ?", id).Update("status", status)
|
|
if result.Error != nil {
|
|
return result.Error
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
return gorm.ErrRecordNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListExpiredSessions returns sessions that are not in a terminal state and have expired.
|
|
func (s *UploadStore) ListExpiredSessions(ctx context.Context) ([]model.UploadSession, error) {
|
|
var sessions []model.UploadSession
|
|
err := s.db.WithContext(ctx).
|
|
Where("status NOT IN ?", []string{"completed", "cancelled", "expired"}).
|
|
Where("expires_at < ?", time.Now()).
|
|
Find(&sessions).Error
|
|
return sessions, err
|
|
}
|
|
|
|
// DeleteSession removes all chunks for a session, then the session itself.
|
|
func (s *UploadStore) DeleteSession(ctx context.Context, id int64) error {
|
|
if err := s.db.WithContext(ctx).Where("session_id = ?", id).Delete(&model.UploadChunk{}).Error; err != nil {
|
|
return fmt.Errorf("delete chunks: %w", err)
|
|
}
|
|
result := s.db.WithContext(ctx).Delete(&model.UploadSession{}, id)
|
|
return result.Error
|
|
}
|
|
|
|
// UpsertChunk inserts a chunk or updates it if the (session_id, chunk_index) pair already exists.
|
|
// Uses GORM clause.OnConflict for dialect-neutral upsert (works with both SQLite and MySQL).
|
|
func (s *UploadStore) UpsertChunk(ctx context.Context, chunk *model.UploadChunk) error {
|
|
return s.db.WithContext(ctx).Clauses(clause.OnConflict{
|
|
Columns: []clause.Column{{Name: "session_id"}, {Name: "chunk_index"}},
|
|
DoUpdates: clause.AssignmentColumns([]string{"minio_key", "sha256", "size", "status", "updated_at"}),
|
|
}).Create(chunk).Error
|
|
}
|
|
|
|
// GetUploadedChunkIndices returns the chunk indices that have been successfully uploaded.
|
|
func (s *UploadStore) GetUploadedChunkIndices(ctx context.Context, sessionID int64) ([]int, error) {
|
|
var indices []int
|
|
err := s.db.WithContext(ctx).Model(&model.UploadChunk{}).
|
|
Where("session_id = ? AND status = ?", sessionID, "uploaded").
|
|
Pluck("chunk_index", &indices).Error
|
|
return indices, err
|
|
}
|
|
|
|
// CountUploadedChunks returns the number of chunks with status "uploaded" for a session.
|
|
func (s *UploadStore) CountUploadedChunks(ctx context.Context, sessionID int64) (int, error) {
|
|
var count int64
|
|
err := s.db.WithContext(ctx).Model(&model.UploadChunk{}).
|
|
Where("session_id = ? AND status = ?", sessionID, "uploaded").
|
|
Count(&count).Error
|
|
return int(count), err
|
|
}
|