From 435ab285c117394b8699665e853f086ce0c4894b Mon Sep 17 00:00:00 2001 From: dailz Date: Tue, 21 Apr 2026 17:19:10 +0800 Subject: [PATCH] fix(task): prevent RecoverStuckTasks from re-enqueueing in-flight tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RecoverStuckTasks scans for tasks with updated_at > 5min ago and re-enqueues them. This incorrectly matched tasks actively being processed by the worker (e.g. slow downloads), causing double-processing. Add inflight sync.Map to track taskIDs currently inside ProcessTask. RecoverStuckTasks skips tasks found in inflight. On server restart inflight is empty (in-memory), so genuinely stuck tasks are still correctly recovered. Also: increase taskCh buffer 16→10000, add periodic RecoverStuckTasks goroutine in TaskPoller (every 5min), and add status guard in ProcessTask as defense-in-depth against duplicate enqueues. --- internal/app/task_poller.go | 28 +++++++++-- internal/app/task_poller_test.go | 2 + internal/service/task_service.go | 80 ++++++++++++++++++++++++++++++-- 3 files changed, 103 insertions(+), 7 deletions(-) diff --git a/internal/app/task_poller.go b/internal/app/task_poller.go index 4c2c313..8c2aa98 100644 --- a/internal/app/task_poller.go +++ b/internal/app/task_poller.go @@ -8,12 +8,15 @@ import ( "go.uber.org/zap" ) -// TaskPollable defines the interface for refreshing stale task statuses. +// TaskPollable defines the interface for refreshing stale task statuses +// and recovering stuck tasks. type TaskPollable interface { RefreshStaleTasks(ctx context.Context) error + RecoverStuckTasks(ctx context.Context) } -// TaskPoller periodically polls Slurm for task status updates via TaskPollable. +// TaskPoller periodically polls Slurm for task status updates and recovers +// stuck tasks via TaskPollable. type TaskPoller struct { taskSvc TaskPollable interval time.Duration @@ -31,9 +34,11 @@ func NewTaskPoller(taskSvc TaskPollable, interval time.Duration, logger *zap.Log } } -// Start launches the background goroutine that periodically refreshes stale tasks. +// Start launches background goroutines that periodically refresh stale tasks +// and recover stuck tasks. func (p *TaskPoller) Start(ctx context.Context) { ctx, p.cancel = context.WithCancel(ctx) + p.wg.Add(1) go func() { defer p.wg.Done() @@ -50,9 +55,24 @@ func (p *TaskPoller) Start(ctx context.Context) { } } }() + + p.wg.Add(1) + go func() { + defer p.wg.Done() + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + p.taskSvc.RecoverStuckTasks(ctx) + } + } + }() } -// Stop cancels the background goroutine and waits for it to finish. +// Stop cancels the background goroutines and waits for them to finish. func (p *TaskPoller) Stop() { if p.cancel != nil { p.cancel() diff --git a/internal/app/task_poller_test.go b/internal/app/task_poller_test.go index 9786c19..22de17c 100644 --- a/internal/app/task_poller_test.go +++ b/internal/app/task_poller_test.go @@ -25,6 +25,8 @@ func (m *mockTaskPollable) RefreshStaleTasks(ctx context.Context) error { return nil } +func (m *mockTaskPollable) RecoverStuckTasks(ctx context.Context) {} + func (m *mockTaskPollable) getCallCount() int { m.mu.Lock() defer m.mu.Unlock() diff --git a/internal/service/task_service.go b/internal/service/task_service.go index d40e7e3..c922a08 100644 --- a/internal/service/task_service.go +++ b/internal/service/task_service.go @@ -29,12 +29,29 @@ type TaskService struct { logger *zap.Logger // async processing - taskCh chan int64 // buffered channel, cap=16 + taskCh chan int64 // buffered channel for task IDs awaiting processing cancelFn context.CancelFunc wg sync.WaitGroup mu sync.Mutex // protects taskCh from send-on-closed started bool // prevent double-start stopped bool + // inflight tracks task IDs currently being processed by the worker goroutine. + // + // Why it exists: taskCh is an in-memory Go channel — all pending taskIDs are + // lost when the server restarts. RecoverStuckTasks is responsible for + // recovering those lost tasks from the DB. However, GetStuckTasks uses a + // broad query (status NOT IN completed/failed AND updated_at < 5min ago) that + // also matches tasks being actively processed by the worker (e.g. a slow + // download). Without inflight, RecoverStuckTasks would reset those tasks to + // "submitted" and re-enqueue them, causing double-processing. + // + // How it works: + // - ProcessTask stores the taskID on entry, deletes on exit (via defer). + // - RecoverStuckTasks checks inflight before re-enqueueing; in-flight tasks + // are skipped. + // - On server restart inflight is empty (in-memory), so all genuinely stuck + // tasks are correctly recovered without false negatives. + inflight sync.Map // map[int64]struct{} } func NewTaskService( @@ -56,7 +73,7 @@ func NewTaskService( jobSvc: jobSvc, workDirBase: workDirBase, logger: logger, - taskCh: make(chan int64, 16), + taskCh: make(chan int64, 10000), } } @@ -169,6 +186,9 @@ func (s *TaskService) CreateTask(ctx context.Context, req *model.CreateTaskReque // ProcessTask runs the full synchronous processing pipeline for a task. func (s *TaskService) ProcessTask(ctx context.Context, taskID int64) error { + s.inflight.Store(taskID, struct{}{}) + defer s.inflight.Delete(taskID) + // 1. Fetch task task, err := s.taskStore.GetByID(ctx, taskID) if err != nil { @@ -178,6 +198,24 @@ func (s *TaskService) ProcessTask(ctx context.Context, taskID int64) error { return fmt.Errorf("task %d not found", taskID) } + // Defense-in-depth against duplicate processing. When the same taskID enters + // taskCh multiple times (e.g. submitted normally + RecoverStuckTasks also + // enqueues it before the worker picks up the first copy), the worker processes + // them sequentially. The first invocation changes status from "submitted" to + // "preparing"; the second invocation reads the latest DB status, sees + // non-submitted, and safely skips. + // + // This does NOT block retries: processWithRetry sets status back to "submitted" + // before re-enqueueing, so the retried invocation passes this check and + // continues from the saved currentStep. + if task.Status != model.TaskStatusSubmitted { + s.logger.Debug("skipping task with non-submitted status", + zap.Int64("task_id", taskID), + zap.String("status", string(task.Status)), + ) + return nil + } + fail := func(step, msg string) error { _ = s.taskStore.UpdateStatus(ctx, taskID, model.TaskStatusFailed, msg) _ = s.taskStore.UpdateRetryState(ctx, taskID, model.TaskStatusFailed, step, task.RetryCount) @@ -664,7 +702,7 @@ func (s *TaskService) StopProcessor() { s.mu.Lock() drainCh := s.taskCh - s.taskCh = make(chan int64, 16) + s.taskCh = make(chan int64, 10000) s.mu.Unlock() for taskID := range drainCh { @@ -698,6 +736,36 @@ func (s *TaskService) processWithRetry(ctx context.Context, taskID int64) { } func (s *TaskService) RecoverStuckTasks(ctx context.Context) { + // RecoverStuckTasks recovers tasks that are "stuck" — they exist in the DB + // with a non-terminal status but are not being processed. + // + // Scenarios that create stuck tasks: + // + // 1. Server restart: taskCh is an in-memory Go channel, all pending IDs are + // lost on process exit. Tasks that were queued but never picked up by the + // worker remain in "submitted" status in DB with no one to process them. + // + // 2. Server crash mid-processing: the worker had advanced a task to + // "preparing"/"downloading" and then died. The task sits in that + // intermediate state with no SlurmJobID and no worker to continue. + // + // 3. Channel full: SubmitAsync dropped a task because taskCh was at + // capacity. The task stays "submitted" but was never enqueued. + // + // The bug this fix addresses: + // + // GetStuckTasks queries: status NOT IN (completed, failed) AND updated_at < + // 5min ago. This also matches tasks currently being processed by the worker + // whose step is slow (>5 min, e.g. downloading large files) and hasn't + // refreshed updated_at. Without the inflight check below, this function + // would reset such a task to "submitted" and re-enqueue it, causing the + // same task to be processed by two concurrent invocations of ProcessTask. + // + // Fix: the inflight sync.Map tracks taskIDs currently inside ProcessTask. + // Tasks found in inflight are skipped here. On server restart inflight is + // empty (it's in-memory), so all genuinely stuck tasks from scenarios 1-3 + // above are correctly recovered. + tasks, err := s.taskStore.GetStuckTasks(ctx, 5*time.Minute) if err != nil { s.logger.Error("failed to get stuck tasks", zap.Error(err)) @@ -711,6 +779,12 @@ func (s *TaskService) RecoverStuckTasks(ctx context.Context) { ) continue } + if _, ok := s.inflight.Load(tasks[i].ID); ok { + s.logger.Debug("skipping in-flight task", + zap.Int64("taskID", tasks[i].ID), + ) + continue + } _ = s.taskStore.UpdateStatus(ctx, tasks[i].ID, model.TaskStatusSubmitted, "") s.mu.Lock() if !s.stopped {