17370845950

如何在 Golang 中构建任务优先级调度器_Golang 并发优先队列实现方案
答案:Golang中通过container/heap实现优先队列,结合sync.Mutex和sync.Cond保证并发安全,使用goroutine执行任务,构建高效优先级调度器,适用于消息系统等场景。

在 Golang 中构建任务优先级调度器,核心是实现一个并发安全的优先队列,并结合 goroutine 和 channel 进行任务调度。优先级调度器适用于需要按紧急程度或重要性处理任务的场景,比如消息系统、作业调度平台、后台任务处理等。

优先队列的数据结构设计

优先队列通常基于最小堆或最大堆实现。Golang 标准库 container/heap 提供了堆操作接口,我们可以结合它来构建带优先级的任务队列。

定义任务结构体,包含执行函数和优先级字段:

type Task struct {
    Priority int
    ExecFunc func()
}

// 任务切片,实现 heap.Interface
type PriorityQueue []*Task

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
    // 最大堆:高优先级(数值小)排前面,若需低数值优先,则用 <
    return pq[i].Priority < pq[j].Priority
}

func (pq PriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
}

func (pq *PriorityQueue) Push(x interface{}) {
    *pq = append(*pq, x.(*Task))
}

func (pq *PriorityQueue) Pop() interface{} {
    old := *pq
    n := len(old)
    item := old[n-1]
    *pq = old[0 : n-1]
    return item
}

并发安全的调度器封装

多个 goroutine 可能同时添加任务或调度执行,因此需要用互斥锁保护优先队列。调度器启动固定数量的工作协程,从队列中取出最高优先级任务执行。

type Scheduler struct {
    tasks   *PriorityQueue
    lock    sync.Mutex
    cond    *sync.Cond
    running bool
}

func NewScheduler() *Scheduler {
    pq := &PriorityQueue{}
    heap.Init(pq)
    s := &Scheduler{
        tasks:   pq,
        running: false,
    }
    s.cond = sync.NewCond(&s.lock)
    return s
}

func (s *Scheduler) AddTask(priority int, exec func()) {
    s.lock.Lock()
    defer s.lock.Unlock()
    heap.Push(s.tasks, &Task{Priority: priority, ExecFunc: exec})
    s.cond.Signal() // 唤醒等待的 worker
}

调度器启动后,worker 循环等待新任务:

func (s *Scheduler) Start(workers int) {
    s.running = true
    for i := 0; i < workers; i++ {
        go s.worker()
    }
}

func (s *Scheduler) worker() {
    for {
        s.lock.Lock()
        for s.tasks.Len() == 0 && s.running {
            s.cond.Wait() // 阻塞等待新任务
        }
        
        if !s.running && s.tasks.Len() == 0 {
            s.lock.Unlock()
            return
        }

        task := heap.Pop(s.tasks).(*Task)
        s.lock.Unlock()

        task.ExecFunc() // 执行任务
    }
}

func (s *Scheduler) Stop() {
    s.lock.Lock()
    s.running = false
    s.cond.Broadcast() // 唤醒所有 worker 退出
    s.lock.Unlock()
}

使用示例与性能考量

以下是一个简单使用示例,提交不同优先级的任务并观察执行顺序:

func main() {
    scheduler := NewScheduler()
    defer scheduler.Stop()

    scheduler.Start(2)

    // 提交任务:优先级越小,越早执行
    scheduler.AddTask(3, func() { fmt.Println("Low priority task") })
    scheduler.AddTask(1, func() { fmt.Println("High priority task") })
    scheduler.AddTask(2, func() { fmt.Println("Medium priority task") })

    time.Sleep(time.Second)
}

输出预期:

High priority task
Medium priority task
Low priority task

性能优化建议:

  • 避免频繁加锁:可考虑使用无锁数据结构或分片队列提升高并发吞吐量
  • 任务过多时,定期清理过期任务或支持超时丢弃
  • 可扩展支持周期性任务、延迟执行等高级特性
基本上就这些。通过 container/heap + sync.Cond + goroutine 协作,可以高效实现一个轻量级优先级调度器,适合大多数并发任务调度需求。