taskPool loop
This commit is contained in:
parent
b5ca131619
commit
4d85ba7259
@ -30,7 +30,7 @@ func InitTaskPool(ctx context.Context) {
|
|||||||
for model, g := range llm.LLMGroups {
|
for model, g := range llm.LLMGroups {
|
||||||
pool := newTaskPool(ctx, g)
|
pool := newTaskPool(ctx, g)
|
||||||
TaskPools[model] = pool
|
TaskPools[model] = pool
|
||||||
go pool.loop()
|
pool.startloop()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -38,8 +38,8 @@ func InitTaskPool(ctx context.Context) {
|
|||||||
func newTaskPool(ctx context.Context, g *llm.LLMGroup) *TaskPool {
|
func newTaskPool(ctx context.Context, g *llm.LLMGroup) *TaskPool {
|
||||||
var queueCap, workers int
|
var queueCap, workers int
|
||||||
if g.IsLocal() {
|
if g.IsLocal() {
|
||||||
queueCap = 100*g.Cap()
|
queueCap = 20*g.Cap()
|
||||||
workers = 100
|
workers = 20
|
||||||
} else {
|
} else {
|
||||||
workers = 500
|
workers = 500
|
||||||
queueCap = 10_000*g.Cap()
|
queueCap = 10_000*g.Cap()
|
||||||
@ -72,28 +72,37 @@ func (t *TaskPool) remove(client common.WsClient) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (t *TaskPool) loop() {
|
func (t *TaskPool) startloop() {
|
||||||
done := t.ctx.Done()
|
done := t.ctx.Done()
|
||||||
tick := time.NewTicker(PendingTimeOut)
|
tick := time.NewTicker(PendingTimeOut)
|
||||||
var task *Task
|
go func(){
|
||||||
for {
|
var task *Task
|
||||||
select {
|
for {
|
||||||
case <-done: return
|
task = t.queue.WaitForNew(t.ctx)
|
||||||
case <-tick.C:
|
select {
|
||||||
// 清除pending太久的task
|
case <-done: return
|
||||||
t.queue.RemoveTimeout(PendingTimeOut)
|
case <-task.ctx.Done():
|
||||||
log.Info("[TaskPool] stat", "queueLen", t.queue.Len())
|
task.pool.remove(task.client)
|
||||||
default:
|
case t.sem <- struct{}{}:
|
||||||
|
go task.do()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
go func(){
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done: return
|
||||||
|
case <-tick.C:
|
||||||
|
t.stat()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
task = t.queue.WaitForNew(t.ctx)
|
func (t *TaskPool) stat() {
|
||||||
select {
|
// 清除pending太久的task
|
||||||
case <-done: return
|
t.queue.RemoveTimeout(PendingTimeOut)
|
||||||
case <-task.ctx.Done():
|
log.Info("[TaskPool] stat", "queueLen", t.queue.Len())
|
||||||
task.pool.remove(task.client)
|
}
|
||||||
case t.sem <- struct{}{}:
|
|
||||||
go task.do()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user