From 4d85ba72597c436161091447346729e4d8eefb50 Mon Sep 17 00:00:00 2001 From: ken Date: Sun, 6 Apr 2025 07:18:00 +0800 Subject: [PATCH] taskPool loop --- src/workflow/taskPool.go | 57 +++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/src/workflow/taskPool.go b/src/workflow/taskPool.go index 72e6f3e..f4a6c8b 100644 --- a/src/workflow/taskPool.go +++ b/src/workflow/taskPool.go @@ -30,7 +30,7 @@ func InitTaskPool(ctx context.Context) { for model, g := range llm.LLMGroups { pool := newTaskPool(ctx, g) TaskPools[model] = pool - go pool.loop() + pool.startloop() } }) } @@ -38,8 +38,8 @@ func InitTaskPool(ctx context.Context) { func newTaskPool(ctx context.Context, g *llm.LLMGroup) *TaskPool { var queueCap, workers int if g.IsLocal() { - queueCap = 100*g.Cap() - workers = 100 + queueCap = 20*g.Cap() + workers = 20 } else { workers = 500 queueCap = 10_000*g.Cap() @@ -72,28 +72,37 @@ func (t *TaskPool) remove(client common.WsClient) { } -func (t *TaskPool) loop() { +func (t *TaskPool) startloop() { done := t.ctx.Done() tick := time.NewTicker(PendingTimeOut) - var task *Task - for { - select { - case <-done: return - case <-tick.C: - // 清除pending太久的task - t.queue.RemoveTimeout(PendingTimeOut) - log.Info("[TaskPool] stat", "queueLen", t.queue.Len()) - default: + go func(){ + var task *Task + for { + task = t.queue.WaitForNew(t.ctx) + select { + case <-done: return + case <-task.ctx.Done(): + task.pool.remove(task.client) + case t.sem <- struct{}{}: + go task.do() + } + } + }() + go func(){ + for { + select { + case <-done: return + case <-tick.C: + t.stat() + default: + } + } + }() +} - task = t.queue.WaitForNew(t.ctx) - select { - case <-done: return - case <-task.ctx.Done(): - task.pool.remove(task.client) - case t.sem <- struct{}{}: - go task.do() - } - - } -} \ No newline at end of file +func (t *TaskPool) stat() { + // 清除pending太久的task + t.queue.RemoveTimeout(PendingTimeOut) + log.Info("[TaskPool] stat", "queueLen", t.queue.Len()) +}