1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| package nsqd
//优先队列、小顶堆 type inFlightPqueue []*Message
func newInFlightPqueue(capacity int) inFlightPqueue { return make(inFlightPqueue, 0, capacity) }
//元素交换 func (pq inFlightPqueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j }
//向最小堆插入新值 func (pq *inFlightPqueue) Push(x *Message) { n := len(*pq) c := cap(*pq)
//如果队列长度达到slice cap,进行扩容 if n+1 > c { //cap * 2 npq := make(inFlightPqueue, n, c*2) copy(npq, *pq) *pq = npq } //队列length + 1 *pq = (*pq)[0 : n+1]
//追加到堆尾 x.index = n (*pq)[n] = x
//上浮 pq.up(n) }
//推出堆顶元素,即最小元素 func (pq *inFlightPqueue) Pop() *Message { n := len(*pq) c := cap(*pq) //把堆顶元素和堆尾元素互换 pq.Swap(0, n-1) //新的堆顶元素下沉重构小顶堆 pq.down(0, n-1) //如果新的队列长度小于容量的一半,进行缩容(容量小于25的队列不缩容) if n < (c/2) && c > 25 { npq := make(inFlightPqueue, n, c/2) copy(npq, *pq) *pq = npq } //被置换到堆尾元素的最小元素作为返回值 x := (*pq)[n-1] x.index = -1 //队列长度-1 *pq = (*pq)[0 : n-1] return x }
// 移除指定元素 // 思路:把元素和堆尾元素交换,再重构除堆尾元素外的小顶堆 func (pq *inFlightPqueue) Remove(i int) *Message { n := len(*pq) if n-1 != i { pq.Swap(i, n-1) pq.down(i, n-1) pq.up(i) } x := (*pq)[n-1] x.index = -1 *pq = (*pq)[0 : n-1] return x }
//将入参和堆最小元素比较,小于则返回两者差值,大于则进行pop操作 func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) { if len(*pq) == 0 { return nil, 0 }
x := (*pq)[0] if x.pri > max { return nil, x.pri - max } pq.Pop()
return x, 0 }
//节点上浮 func (pq *inFlightPqueue) up(j int) { for { //获取父节点索引 i := (j - 1) / 2 // parent //节点关键字大小比较,子更小,则上浮,否则结束循环 if i == j || (*pq)[j].pri >= (*pq)[i].pri { break } pq.Swap(i, j) j = i } }
//节点下沉 func (pq *inFlightPqueue) down(i, n int) { for { //左子节点索引 j1 := 2*i + 1 if j1 >= n || j1 < 0 { // j1 < 0 after int overflow break } j := j1 // left child
//左子节点和右子节点关键字值做比较,选出最小子节点 if j2 := j1 + 1; j2 < n && (*pq)[j1].pri >= (*pq)[j2].pri { j = j2 // = 2*i + 2 // right child } //节点和最小子节点比较,子节点更小则进行位置交换,否则结束循环 if (*pq)[j].pri >= (*pq)[i].pri { break } pq.Swap(i, j) i = j } }
|