feat(storage): add ObjectStorage interface and MinIO client
Add ObjectStorage interface (11 methods) with MinioClient implementation using minio-go Core. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
286
internal/storage/minio.go
Normal file
286
internal/storage/minio.go
Normal file
@@ -0,0 +1,286 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"gcy_hpc_server/internal/config"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
|
||||
// ObjectInfo contains metadata about a stored object.
|
||||
type ObjectInfo struct {
|
||||
Key string
|
||||
Size int64
|
||||
LastModified time.Time
|
||||
ETag string
|
||||
ContentType string
|
||||
}
|
||||
|
||||
// UploadInfo contains metadata about an uploaded object.
|
||||
type UploadInfo struct {
|
||||
ETag string
|
||||
Size int64
|
||||
}
|
||||
|
||||
// GetOptions specifies parameters for GetObject, including optional Range.
|
||||
type GetOptions struct {
|
||||
Start *int64 // Range start byte offset (nil = no range)
|
||||
End *int64 // Range end byte offset (nil = no range)
|
||||
}
|
||||
|
||||
// MultipartUpload represents an incomplete multipart upload.
|
||||
type MultipartUpload struct {
|
||||
ObjectName string
|
||||
UploadID string
|
||||
Initiated time.Time
|
||||
}
|
||||
|
||||
// RemoveObjectsOptions specifies options for removing multiple objects.
|
||||
type RemoveObjectsOptions struct {
|
||||
ForceDelete bool
|
||||
}
|
||||
|
||||
// PutObjectOptions for PutObject.
|
||||
type PutObjectOptions struct {
|
||||
ContentType string
|
||||
DisableMultipart bool // true for small chunks (already pre-split)
|
||||
}
|
||||
|
||||
// RemoveObjectOptions for RemoveObject.
|
||||
type RemoveObjectOptions struct {
|
||||
ForceDelete bool
|
||||
}
|
||||
|
||||
// MakeBucketOptions for MakeBucket.
|
||||
type MakeBucketOptions struct {
|
||||
Region string
|
||||
}
|
||||
|
||||
// StatObjectOptions for StatObject.
|
||||
type StatObjectOptions struct{}
|
||||
|
||||
// ObjectStorage defines the interface for object storage operations.
|
||||
// Implementations should wrap MinIO SDK calls with custom transfer types.
|
||||
type ObjectStorage interface {
|
||||
// PutObject uploads an object from a reader.
|
||||
PutObject(ctx context.Context, bucket, key string, reader io.Reader, size int64, opts PutObjectOptions) (UploadInfo, error)
|
||||
|
||||
// GetObject retrieves an object. opts may contain Range parameters.
|
||||
GetObject(ctx context.Context, bucket, key string, opts GetOptions) (io.ReadCloser, ObjectInfo, error)
|
||||
|
||||
// ComposeObject merges multiple source objects into a single destination.
|
||||
ComposeObject(ctx context.Context, bucket, dst string, sources []string) (UploadInfo, error)
|
||||
|
||||
// AbortMultipartUpload aborts an incomplete multipart upload by upload ID.
|
||||
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error
|
||||
|
||||
// RemoveIncompleteUpload removes all incomplete uploads for an object.
|
||||
// This is the preferred cleanup method — it encapsulates list + abort logic.
|
||||
RemoveIncompleteUpload(ctx context.Context, bucket, object string) error
|
||||
|
||||
// RemoveObject deletes a single object.
|
||||
RemoveObject(ctx context.Context, bucket, key string, opts RemoveObjectOptions) error
|
||||
|
||||
// ListObjects lists objects with a given prefix.
|
||||
ListObjects(ctx context.Context, bucket, prefix string, recursive bool) ([]ObjectInfo, error)
|
||||
|
||||
// RemoveObjects deletes multiple objects.
|
||||
RemoveObjects(ctx context.Context, bucket string, keys []string, opts RemoveObjectsOptions) error
|
||||
|
||||
// BucketExists checks if a bucket exists.
|
||||
BucketExists(ctx context.Context, bucket string) (bool, error)
|
||||
|
||||
// MakeBucket creates a new bucket.
|
||||
MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error
|
||||
|
||||
// StatObject gets metadata about an object without downloading it.
|
||||
StatObject(ctx context.Context, bucket, key string, opts StatObjectOptions) (ObjectInfo, error)
|
||||
}
|
||||
|
||||
var _ ObjectStorage = (*MinioClient)(nil)
|
||||
|
||||
type MinioClient struct {
|
||||
core *minio.Core
|
||||
bucket string
|
||||
}
|
||||
|
||||
func NewMinioClient(cfg config.MinioConfig) (*MinioClient, error) {
|
||||
transport, err := minio.DefaultTransport(cfg.UseSSL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create default transport: %w", err)
|
||||
}
|
||||
transport.MaxIdleConnsPerHost = 100
|
||||
transport.IdleConnTimeout = 90 * time.Second
|
||||
|
||||
core, err := minio.NewCore(cfg.Endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(cfg.AccessKey, cfg.SecretKey, ""),
|
||||
Secure: cfg.UseSSL,
|
||||
Transport: transport,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create minio client: %w", err)
|
||||
}
|
||||
|
||||
mc := &MinioClient{core: core, bucket: cfg.Bucket}
|
||||
|
||||
ctx := context.Background()
|
||||
exists, err := core.BucketExists(ctx, cfg.Bucket)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("check bucket: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
if err := core.MakeBucket(ctx, cfg.Bucket, minio.MakeBucketOptions{}); err != nil {
|
||||
return nil, fmt.Errorf("create bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return mc, nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) PutObject(ctx context.Context, bucket, key string, reader io.Reader, size int64, opts PutObjectOptions) (UploadInfo, error) {
|
||||
info, err := m.core.PutObject(ctx, bucket, key, reader, size, "", "", minio.PutObjectOptions{
|
||||
ContentType: opts.ContentType,
|
||||
DisableMultipart: opts.DisableMultipart,
|
||||
})
|
||||
if err != nil {
|
||||
return UploadInfo{}, fmt.Errorf("put object %s/%s: %w", bucket, key, err)
|
||||
}
|
||||
return UploadInfo{ETag: info.ETag, Size: info.Size}, nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) GetObject(ctx context.Context, bucket, key string, opts GetOptions) (io.ReadCloser, ObjectInfo, error) {
|
||||
var gopts minio.GetObjectOptions
|
||||
if opts.Start != nil || opts.End != nil {
|
||||
start := int64(0)
|
||||
end := int64(0)
|
||||
if opts.Start != nil {
|
||||
start = *opts.Start
|
||||
}
|
||||
if opts.End != nil {
|
||||
end = *opts.End
|
||||
}
|
||||
if err := gopts.SetRange(start, end); err != nil {
|
||||
return nil, ObjectInfo{}, fmt.Errorf("set range: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
body, info, _, err := m.core.GetObject(ctx, bucket, key, gopts)
|
||||
if err != nil {
|
||||
return nil, ObjectInfo{}, fmt.Errorf("get object %s/%s: %w", bucket, key, err)
|
||||
}
|
||||
return body, toObjectInfo(info), nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) ComposeObject(ctx context.Context, bucket, dst string, sources []string) (UploadInfo, error) {
|
||||
srcs := make([]minio.CopySrcOptions, len(sources))
|
||||
for i, src := range sources {
|
||||
srcs[i] = minio.CopySrcOptions{Bucket: bucket, Object: src}
|
||||
}
|
||||
|
||||
do := minio.CopyDestOptions{
|
||||
Bucket: bucket,
|
||||
Object: dst,
|
||||
ReplaceMetadata: true,
|
||||
}
|
||||
|
||||
info, err := m.core.ComposeObject(ctx, do, srcs...)
|
||||
if err != nil {
|
||||
return UploadInfo{}, fmt.Errorf("compose object %s/%s: %w", bucket, dst, err)
|
||||
}
|
||||
return UploadInfo{ETag: info.ETag, Size: info.Size}, nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
|
||||
if err := m.core.AbortMultipartUpload(ctx, bucket, object, uploadID); err != nil {
|
||||
return fmt.Errorf("abort multipart upload %s/%s %s: %w", bucket, object, uploadID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) RemoveIncompleteUpload(ctx context.Context, bucket, object string) error {
|
||||
if err := m.core.RemoveIncompleteUpload(ctx, bucket, object); err != nil {
|
||||
return fmt.Errorf("remove incomplete upload %s/%s: %w", bucket, object, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) RemoveObject(ctx context.Context, bucket, key string, opts RemoveObjectOptions) error {
|
||||
if err := m.core.RemoveObject(ctx, bucket, key, minio.RemoveObjectOptions{
|
||||
ForceDelete: opts.ForceDelete,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("remove object %s/%s: %w", bucket, key, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) ListObjects(ctx context.Context, bucket, prefix string, recursive bool) ([]ObjectInfo, error) {
|
||||
ch := m.core.Client.ListObjects(ctx, bucket, minio.ListObjectsOptions{
|
||||
Prefix: prefix,
|
||||
Recursive: recursive,
|
||||
})
|
||||
|
||||
var result []ObjectInfo
|
||||
for obj := range ch {
|
||||
if obj.Err != nil {
|
||||
return result, fmt.Errorf("list objects %s/%s: %w", bucket, prefix, obj.Err)
|
||||
}
|
||||
result = append(result, toObjectInfo(obj))
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) RemoveObjects(ctx context.Context, bucket string, keys []string, opts RemoveObjectsOptions) error {
|
||||
objectsCh := make(chan minio.ObjectInfo, len(keys))
|
||||
for _, key := range keys {
|
||||
objectsCh <- minio.ObjectInfo{Key: key}
|
||||
}
|
||||
close(objectsCh)
|
||||
|
||||
errCh := m.core.RemoveObjects(ctx, bucket, objectsCh, minio.RemoveObjectsOptions{})
|
||||
|
||||
for err := range errCh {
|
||||
if err.Err != nil {
|
||||
return fmt.Errorf("remove object %s: %w", err.ObjectName, err.Err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) BucketExists(ctx context.Context, bucket string) (bool, error) {
|
||||
ok, err := m.core.BucketExists(ctx, bucket)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("bucket exists %s: %w", bucket, err)
|
||||
}
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
|
||||
if err := m.core.MakeBucket(ctx, bucket, minio.MakeBucketOptions{
|
||||
Region: opts.Region,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("make bucket %s: %w", bucket, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MinioClient) StatObject(ctx context.Context, bucket, key string, _ StatObjectOptions) (ObjectInfo, error) {
|
||||
info, err := m.core.StatObject(ctx, bucket, key, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
return ObjectInfo{}, fmt.Errorf("stat object %s/%s: %w", bucket, key, err)
|
||||
}
|
||||
return toObjectInfo(info), nil
|
||||
}
|
||||
|
||||
func toObjectInfo(info minio.ObjectInfo) ObjectInfo {
|
||||
return ObjectInfo{
|
||||
Key: info.Key,
|
||||
Size: info.Size,
|
||||
LastModified: info.LastModified,
|
||||
ETag: info.ETag,
|
||||
ContentType: info.ContentType,
|
||||
}
|
||||
}
|
||||
7
internal/storage/minio_test.go
Normal file
7
internal/storage/minio_test.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package storage
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestMinioClientImplementsObjectStorage(t *testing.T) {
|
||||
var _ ObjectStorage = (*MinioClient)(nil)
|
||||
}
|
||||
Reference in New Issue
Block a user