
Consistent Hashing

Because a good hash function distributes values uniformly, we can use hash(object) % N to decide where each piece of data is stored, spreading the data evenly across N nodes. If every item is accessed roughly equally often, the load is naturally balanced as well. The problem is that the number of nodes in a distributed system usually changes over time, so this naive modulo hashing forces a large amount of data to migrate whenever N changes, which makes it scale poorly.

package main

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
    "strconv"
)

func main() {
    NODE_NUMBER := 100
    DATA_MAX_NUMBER := 10000000
    cnts := make([]int, NODE_NUMBER)

    // Place every item on a node with hash(object) % N and count the load per node.
    for i := 0; i < DATA_MAX_NUMBER; i++ {
        data := md5.Sum([]byte(strconv.Itoa(i)))
        dataStr := hex.EncodeToString(data[0:4])
        hash, err := strconv.ParseInt(dataStr, 16, 64)
        if err == nil {
            id := hash % int64(NODE_NUMBER)
            cnts[id]++
        }
    }

    // Compare the heaviest and lightest nodes against the ideal (perfectly even) load.
    desiredLoad := DATA_MAX_NUMBER / NODE_NUMBER
    maxLoad := cnts[0]
    minLoad := cnts[0]
    for i := 1; i < NODE_NUMBER; i++ {
        if cnts[i] > maxLoad {
            maxLoad = cnts[i]
        }
        if cnts[i] < minLoad {
            minLoad = cnts[i]
        }
    }
    maxPercent := 100.0 * float64(maxLoad-desiredLoad) / float64(desiredLoad)
    minPercent := 100.0 * float64(minLoad-desiredLoad) / float64(desiredLoad)
    fmt.Printf("Desired loading: %d\n", desiredLoad)
    fmt.Printf("Most loading in one node: %d %+.4f%%\n", maxLoad, maxPercent)
    fmt.Printf("Least loading in one node: %d %+.4f%%\n", minLoad, minPercent)
}
Desired loading: 100000
Most loading in one node: 100695 +0.6950%
Least loading in one node: 99073 -0.9270%

As we can see, the load is quite well balanced overall. Now let's add one node:

package main

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
    "strconv"
)

func main() {
    NODE_NUMBER := 100
    NEW_NODE_NUMBER := 101
    DATA_MAX_NUMBER := 10000000
    moveCnt := 0

    // Count how many items land on a different node when N grows from 100 to 101.
    for i := 0; i < DATA_MAX_NUMBER; i++ {
        data := md5.Sum([]byte(strconv.Itoa(i)))
        dataStr := hex.EncodeToString(data[0:4])
        hash, err := strconv.ParseInt(dataStr, 16, 64)
        if err == nil {
            id := hash % int64(NODE_NUMBER)
            newid := hash % int64(NEW_NODE_NUMBER)
            if id != newid {
                moveCnt++
            }
        }
    }
    movePercent := 100.0 * float64(moveCnt) / float64(DATA_MAX_NUMBER)
    fmt.Printf("Moved items: %d %+.4f%%\n", moveCnt, movePercent)
}

Moved items: 9900989 +99.0099%

Only one node was added, yet 99% of the data had to be moved, which is completely unacceptable! Consistent hashing is designed to address exactly this load-balancing problem. Instead of hashing only the stored objects, it also hashes each node (e.g. hash(name/IP)) and places both objects and nodes on the ring \([0,2^{32}-1]\). A storage request for an object is assigned, via binary search, to the first node clockwise from hash(obj). This improves monotonicity: when nodes are added or removed, existing data either stays put or migrates to a new node, but never migrates to another old node. In the Swift cloud storage system, the component that maps objects to physical locations is called the Ring, presumably a nod to this hash ring.
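As a minimal sketch of that idea, assuming illustrative names like Ring, AddNode, Locate and hashKey (none of them from any particular library), a ring can be built by hashing each node's name onto \([0,2^{32}-1]\), keeping the positions sorted, and finding a key's node with a clockwise binary search:

package main

import (
    "crypto/md5"
    "encoding/binary"
    "fmt"
    "sort"
)

// hashKey maps a string onto the ring [0, 2^32-1] using the first 4 bytes of its MD5 digest.
func hashKey(s string) uint32 {
    sum := md5.Sum([]byte(s))
    return binary.BigEndian.Uint32(sum[:4])
}

// Ring keeps the sorted hash positions of all nodes and a map from position back to node name.
type Ring struct {
    positions []uint32
    nodes     map[uint32]string
}

func NewRing() *Ring {
    return &Ring{nodes: make(map[uint32]string)}
}

// AddNode hashes the node's name onto the ring and keeps the positions sorted.
func (r *Ring) AddNode(name string) {
    h := hashKey(name)
    r.nodes[h] = name
    r.positions = append(r.positions, h)
    sort.Slice(r.positions, func(i, j int) bool { return r.positions[i] < r.positions[j] })
}

// Locate returns the first node clockwise from hash(key), wrapping around past the end of the ring.
func (r *Ring) Locate(key string) string {
    h := hashKey(key)
    i := sort.Search(len(r.positions), func(k int) bool { return r.positions[k] >= h })
    if i == len(r.positions) {
        i = 0
    }
    return r.nodes[r.positions[i]]
}

func main() {
    ring := NewRing()
    for i := 0; i < 3; i++ {
        ring.AddNode(fmt.Sprintf("node-%d", i))
    }
    fmt.Println(ring.Locate("object-42"))
}

The simulation below takes a shortcut: instead of hashing node names, it spaces the 100 node positions evenly over the ring, then recomputes the evenly spaced positions for 101 nodes and measures how much data moves.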

package main

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
    "sort"
    "strconv"
)

func main() {
    NODE_NUMBER := 100
    NEW_NODE_NUMBER := 101
    DATA_MAX_NUMBER := 10000000
    moveCnt := 0
    startDataId := make([]int, 0)
    newStartDataId := make([]int, 0)

    // Split the ring [0, DATA_MAX_NUMBER) into 100 evenly sized ranges;
    // startDataId[i] is the first position owned by node i.
    remainder := DATA_MAX_NUMBER % NODE_NUMBER
    base := DATA_MAX_NUMBER / NODE_NUMBER
    cnt := 0
    for i := 0; i < NODE_NUMBER; i++ {
        startDataId = append(startDataId, cnt)
        if i < remainder {
            cnt = cnt + base + 1
        } else {
            cnt = cnt + base
        }
    }

    // Recompute the ranges for 101 nodes.
    remainder = DATA_MAX_NUMBER % NEW_NODE_NUMBER
    base = DATA_MAX_NUMBER / NEW_NODE_NUMBER
    cnt = 0
    for i := 0; i < NEW_NODE_NUMBER; i++ {
        newStartDataId = append(newStartDataId, cnt)
        if i < remainder {
            cnt = cnt + base + 1
        } else {
            cnt = cnt + base
        }
    }

    for i := 0; i < DATA_MAX_NUMBER; i++ {
        data := md5.Sum([]byte(strconv.Itoa(i)))
        dataStr := hex.EncodeToString(data[0:4])
        hash64, err := strconv.ParseInt(dataStr, 16, 64)
        if err == nil {
            hash := int(hash64)
            target := hash % DATA_MAX_NUMBER
            // Binary-search for the first node clockwise from the item's position,
            // wrapping around to node 0 past the end of the ring.
            id := sort.Search(len(startDataId), func(k int) bool { return startDataId[k] >= target })
            id = id % NODE_NUMBER

            newid := sort.Search(len(newStartDataId), func(k int) bool { return newStartDataId[k] >= target })
            newid = newid % NEW_NODE_NUMBER
            if id != newid {
                moveCnt++
            }
        }
    }
    movePercent := 100.0 * float64(moveCnt) / float64(DATA_MAX_NUMBER)
    fmt.Printf("Moved items: %d %+.4f%%\n", moveCnt, movePercent)
}
Moved items: 4897001 +48.9700%

Consistent hashing reduces data migration, but with few nodes a large fraction of the data can still move, so virtual nodes (partitions) are introduced. With partitions, the mapping becomes two-level: obj -> virtual node -> node. By fixing a fairly large number of virtual nodes up front, the object-to-virtual-node mapping is frozen; adding or removing a node only requires updating the small mapping between virtual nodes and nodes.
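A minimal sketch of the two-level lookup itself, assuming an illustrative vnode count of 2^14 and a vnodeToNode table (both made up here for the example):

package main

import (
    "crypto/md5"
    "encoding/binary"
    "fmt"
)

const VNODE_NUMBER = 1 << 14 // fixed up front, well above the expected node count

// vnodeOf maps an object to a virtual node; this mapping never changes.
func vnodeOf(key string) int {
    sum := md5.Sum([]byte(key))
    return int(binary.BigEndian.Uint32(sum[:4])) % VNODE_NUMBER
}

func main() {
    nodes := 100
    // vnode -> node table: the only thing that changes when nodes are added or removed.
    vnodeToNode := make([]int, VNODE_NUMBER)
    for v := range vnodeToNode {
        vnodeToNode[v] = v % nodes
    }

    key := "object-42"
    v := vnodeOf(key)
    fmt.Printf("%s -> vnode %d -> node %d\n", key, v, vnodeToNode[v])
}

The simulation below applies the same idea to the earlier segment layout and measures how little data moves when node 100 is added and a handful of virtual nodes are reassigned to it.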

package main

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
    "sort"
    "strconv"
)

func main() {
    NODE_NUMBER := 100
    NEW_NODE_NUMBER := 101
    VNODE_NUMBER := 1000
    DATA_MAX_NUMBER := 10000000
    moveCnt := 0
    vstartDataId := make([]int, 0)
    vnode2node := make([]int, 0)

    // Split the ring into 1000 virtual-node ranges and assign the virtual
    // nodes to the 100 physical nodes round-robin.
    remainder := DATA_MAX_NUMBER % VNODE_NUMBER
    base := DATA_MAX_NUMBER / VNODE_NUMBER
    cnt := 0
    for i := 0; i < VNODE_NUMBER; i++ {
        vstartDataId = append(vstartDataId, cnt)
        if i < remainder {
            cnt = cnt + base + 1
        } else {
            cnt = cnt + base
        }
        vnode2node = append(vnode2node, i%NODE_NUMBER)
    }

    // Add node 100: only the vnode -> node table changes. Reassign roughly
    // the new node's fair share of virtual nodes to it.
    new_vnode2node := make([]int, len(vnode2node))
    copy(new_vnode2node, vnode2node)

    newNodeID := 100
    reassignNumber := VNODE_NUMBER / NEW_NODE_NUMBER
    for i := 0; i < reassignNumber; i++ {
        vid := i
        new_vnode2node[vid] = newNodeID
    }

    for i := 0; i < DATA_MAX_NUMBER; i++ {
        data := md5.Sum([]byte(strconv.Itoa(i)))
        dataStr := hex.EncodeToString(data[0:4])
        hash64, err := strconv.ParseInt(dataStr, 16, 64)
        if err == nil {
            hash := int(hash64)
            // The binary search is not strictly needed: the item-to-vnode mapping
            // is fixed, e.g. vid := hash % VNODE_NUMBER would also do.
            target := hash % DATA_MAX_NUMBER
            vid := sort.Search(len(vstartDataId), func(k int) bool { return vstartDataId[k] >= target })
            vid = vid % VNODE_NUMBER

            id := vnode2node[vid]
            newid := new_vnode2node[vid]
            if id != newid {
                moveCnt++
            }
        }
    }
    movePercent := 100.0 * float64(moveCnt) / float64(DATA_MAX_NUMBER)
    fmt.Printf("Moved items: %d %+.4f%%\n", moveCnt, movePercent)
}
Moved items: 90423 +0.9042%

The scheme above requires choosing a reasonable number of virtual nodes in advance. If the number of physical nodes ever exceeds the number of virtual nodes, some nodes will receive no virtual nodes at all, and raising the virtual node count at that point would trigger a large data migration. The virtual node count is therefore usually preset to several times the maximum expected size of the system.

To make lookups faster, the modulo can also be replaced by a bit operation; this is the idea behind the so-called partition power.
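A brief sketch of the idea, in the same spirit as Swift's Ring (the partition power of 14 and the object name are just example values): with 2^p partitions, the partition is taken from the top p bits of a 32-bit hash with a right shift instead of a modulo.

package main

import (
    "crypto/md5"
    "encoding/binary"
    "fmt"
)

func main() {
    // Example: 2^14 = 16384 partitions; the partition power is a tunable constant.
    partitionPower := uint(14)
    partShift := 32 - partitionPower

    sum := md5.Sum([]byte("object-42"))
    hash := binary.BigEndian.Uint32(sum[:4])

    // Keep the top partitionPower bits of the 32-bit hash: a shift instead of a modulo.
    partition := hash >> partShift
    fmt.Printf("hash=%#08x -> partition %d of %d\n", hash, partition, 1<<partitionPower)
}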

References

Ring实现原理剖析 (an analysis of the Swift Ring implementation)