Because a hash function distributes keys uniformly, hash(object) % N can be used to determine where each object is stored, spreading the data evenly across N nodes. If every object is accessed with roughly the same frequency, the load is naturally balanced as well. The problem is that the number of nodes in a distributed system usually changes over time, and under this traditional hashing scheme every such change forces a massive data migration, so it scales poorly.
```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"strconv"
)

func main() {
	NODE_NUMBER := 100
	DATA_MAX_NUMBER := 10000000
	cnts := make([]int, NODE_NUMBER)
	// Assign each object to a node with hash(object) % N and count the load per node.
	for i := 0; i < DATA_MAX_NUMBER; i++ {
		data := md5.Sum([]byte(strconv.Itoa(i)))
		dataStr := hex.EncodeToString(data[0:4])
		hash, err := strconv.ParseInt(dataStr, 16, 64)
		if err == nil {
			id := hash % int64(NODE_NUMBER)
			cnts[id]++
		}
	}
	// Compare the most and least loaded nodes against the ideal, perfectly even load.
	desiredLoad := DATA_MAX_NUMBER / NODE_NUMBER
	maxLoad := cnts[0]
	minLoad := cnts[0]
	for i := 1; i < NODE_NUMBER; i++ {
		if cnts[i] > maxLoad {
			maxLoad = cnts[i]
		}
		if cnts[i] < minLoad {
			minLoad = cnts[i]
		}
	}
	maxPercent := 100.0 * float64(maxLoad-desiredLoad) / float64(desiredLoad)
	minPercent := 100.0 * float64(minLoad-desiredLoad) / float64(desiredLoad)
	fmt.Printf("Desired loading: %d\n", desiredLoad)
	fmt.Printf("Most loading in one node: %d %+.4f%%\n", maxLoad, maxPercent)
	fmt.Printf("Least loading in one node: %d %+.4f%%\n", minLoad, minPercent)
}
```
```
Desired loading: 100000
Most loading in one node: 100695 +0.6950%
Least loading in one node: 99073 -0.9270%
```
As you can see, the load is quite well balanced overall. Now consider what happens when one node is added:
```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"strconv"
)

func main() {
	NODE_NUMBER := 100
	NEW_NODE_NUMBER := 101
	DATA_MAX_NUMBER := 10000000
	moveCnt := 0
	// Count the objects whose node assignment changes when N goes from 100 to 101.
	for i := 0; i < DATA_MAX_NUMBER; i++ {
		data := md5.Sum([]byte(strconv.Itoa(i)))
		dataStr := hex.EncodeToString(data[0:4])
		hash, err := strconv.ParseInt(dataStr, 16, 64)
		if err == nil {
			id := hash % int64(NODE_NUMBER)
			newid := hash % int64(NEW_NODE_NUMBER)
			if id != newid {
				moveCnt++
			}
		}
	}
	movePercent := 100.0 * float64(moveCnt) / float64(DATA_MAX_NUMBER)
	fmt.Printf("Moved items: %d %+.4f%%\n", moveCnt, movePercent)
}
```
```
Moved items: 9900989 +99.0099%
```
Adding a single node forces roughly 99% of the data to move, which is completely unacceptable. Consistent hashing was introduced precisely to solve this load-balancing problem.

Consistent hashing computes not only the hash of each stored object but also the hash of every node, hash(name/IP), and maps both onto the ring interval \([0, 2^{32}-1]\). To store an object, hash(obj) is assigned, via a binary search proceeding clockwise, to the first node at or after it on the ring. This improves monotonicity: when nodes are added or removed, existing objects either stay where they are or migrate to a new node, but never migrate onto an old node. In the Swift cloud storage system, the component responsible for mapping objects to physical locations is called the Ring, presumably a nod to this hash ring.
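As a minimal sketch of such a ring (illustrative only; the names hash32, Ring, and Locate are made up for this example and are not Swift's API), node positions can be derived by hashing each node's name, keeping the positions sorted, and binary-searching clockwise for the first node at or after hash(obj):

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
	"sort"
)

// hash32 maps a key onto the ring [0, 2^32-1] using the first 4 bytes of its MD5 digest.
func hash32(key string) uint32 {
	sum := md5.Sum([]byte(key))
	return binary.BigEndian.Uint32(sum[:4])
}

// Ring holds the sorted node positions and a position -> node-name map.
type Ring struct {
	points []uint32
	nodes  map[uint32]string
}

func NewRing(names []string) *Ring {
	r := &Ring{nodes: make(map[uint32]string)}
	for _, name := range names {
		h := hash32(name)
		r.points = append(r.points, h)
		r.nodes[h] = name
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// Locate returns the first node clockwise from hash(obj), wrapping past the largest position.
func (r *Ring) Locate(obj string) string {
	h := hash32(obj)
	i := sort.Search(len(r.points), func(k int) bool { return r.points[k] >= h })
	if i == len(r.points) {
		i = 0 // wrap around the ring
	}
	return r.nodes[r.points[i]]
}

func main() {
	ring := NewRing([]string{"node-0", "node-1", "node-2"})
	for _, obj := range []string{"obj-1", "obj-2", "obj-3"} {
		fmt.Printf("%s -> %s\n", obj, ring.Locate(obj))
	}
}
```

The simulation below models the ring more simply: instead of hashing node names, it spaces the node boundaries evenly over the value range (and re-spaces them when a node is added), then measures how much data migrates when going from 100 to 101 nodes.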
```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sort"
	"strconv"
)

func main() {
	NODE_NUMBER := 100
	NEW_NODE_NUMBER := 101
	DATA_MAX_NUMBER := 10000000
	moveCnt := 0
	startDataId := make([]int, 0)
	newStartDataId := make([]int, 0)

	// Place 100 node boundaries evenly over [0, DATA_MAX_NUMBER).
	remainder := DATA_MAX_NUMBER % NODE_NUMBER
	base := DATA_MAX_NUMBER / NODE_NUMBER
	cnt := 0
	for i := 0; i < NODE_NUMBER; i++ {
		startDataId = append(startDataId, cnt)
		if i < remainder {
			cnt = cnt + base + 1
		} else {
			cnt = cnt + base
		}
	}

	// Place 101 node boundaries evenly over the same range.
	remainder = DATA_MAX_NUMBER % NEW_NODE_NUMBER
	base = DATA_MAX_NUMBER / NEW_NODE_NUMBER
	cnt = 0
	for i := 0; i < NEW_NODE_NUMBER; i++ {
		newStartDataId = append(newStartDataId, cnt)
		if i < remainder {
			cnt = cnt + base + 1
		} else {
			cnt = cnt + base
		}
	}

	// For each object, find its node clockwise (first boundary >= hash) before and after.
	for i := 0; i < DATA_MAX_NUMBER; i++ {
		data := md5.Sum([]byte(strconv.Itoa(i)))
		dataStr := hex.EncodeToString(data[0:4])
		hash64, err := strconv.ParseInt(dataStr, 16, 64)
		if err == nil {
			target := int(hash64 % int64(DATA_MAX_NUMBER))
			id := sort.Search(len(startDataId), func(k int) bool { return startDataId[k] >= target })
			id = id % NODE_NUMBER // wrap around the ring
			newid := sort.Search(len(newStartDataId), func(k int) bool { return newStartDataId[k] >= target })
			newid = newid % NEW_NODE_NUMBER
			if id != newid {
				moveCnt++
			}
		}
	}
	movePercent := 100.0 * float64(moveCnt) / float64(DATA_MAX_NUMBER)
	fmt.Printf("Moved items: %d %+.4f%%\n", moveCnt, movePercent)
}
```
```
Moved items: 4897001 +48.9700%
```
Although consistent hashing reduces data migration, a large fraction of the data still has to move when the number of nodes is small, so virtual nodes (partitions) are introduced. With partitions the mapping becomes two-level: obj -> virtual node -> node. By fixing a sufficiently large number of virtual nodes up front, the mapping from objects to virtual nodes never changes; adding or removing nodes only requires updating the mapping from virtual nodes to physical nodes, as sketched below.
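Here is a sketch of the two-level lookup (illustrative names and an assumed partition count of 1000; this is not Swift's actual API): the obj -> partition step is a fixed hash modulo the virtual-node count, and only the partition -> node table changes when nodes come and go.

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
)

const VNODE_NUMBER = 1000 // fixed up front, well above the expected node count (assumed value)

// partitionOf maps an object to a fixed virtual node; this mapping never changes.
func partitionOf(obj string) int {
	sum := md5.Sum([]byte(obj))
	return int(binary.BigEndian.Uint32(sum[:4]) % VNODE_NUMBER)
}

func main() {
	// vnode2node is the only structure that changes when nodes are added or removed.
	nodeCount := 100
	vnode2node := make([]int, VNODE_NUMBER)
	for v := range vnode2node {
		vnode2node[v] = v % nodeCount // round-robin assignment of partitions to nodes
	}

	obj := "obj-42"
	p := partitionOf(obj)
	fmt.Printf("%s -> partition %d -> node %d\n", obj, p, vnode2node[p])
}
```

The simulation below applies the same idea: the obj -> virtual node mapping stays fixed, and only a handful of virtual nodes are reassigned to the newly added node.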
```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sort"
	"strconv"
)

func main() {
	NODE_NUMBER := 100
	NEW_NODE_NUMBER := 101
	VNODE_NUMBER := 1000
	DATA_MAX_NUMBER := 10000000
	moveCnt := 0
	vstartDataId := make([]int, 0)
	vnode2node := make([]int, 0)

	// Place 1000 virtual-node boundaries evenly over [0, DATA_MAX_NUMBER)
	// and assign virtual nodes to physical nodes round-robin.
	remainder := DATA_MAX_NUMBER % VNODE_NUMBER
	base := DATA_MAX_NUMBER / VNODE_NUMBER
	cnt := 0
	for i := 0; i < VNODE_NUMBER; i++ {
		vstartDataId = append(vstartDataId, cnt)
		if i < remainder {
			cnt = cnt + base + 1
		} else {
			cnt = cnt + base
		}
		vnode2node = append(vnode2node, i%NODE_NUMBER)
	}
	new_vnode2node := make([]int, len(vnode2node))
	copy(new_vnode2node, vnode2node)

	// Add node 100: reassign VNODE_NUMBER / NEW_NODE_NUMBER virtual nodes to it.
	newNodeID := 100
	reassignNumber := VNODE_NUMBER / NEW_NODE_NUMBER
	for vid := 0; vid < reassignNumber; vid++ {
		new_vnode2node[vid] = newNodeID
	}

	// The obj -> virtual node mapping is unchanged; only the vnode -> node table differs.
	for i := 0; i < DATA_MAX_NUMBER; i++ {
		data := md5.Sum([]byte(strconv.Itoa(i)))
		dataStr := hex.EncodeToString(data[0:4])
		hash64, err := strconv.ParseInt(dataStr, 16, 64)
		if err == nil {
			target := int(hash64 % int64(DATA_MAX_NUMBER))
			vid := sort.Search(len(vstartDataId), func(k int) bool { return vstartDataId[k] >= target })
			vid = vid % VNODE_NUMBER // wrap around the ring
			id := vnode2node[vid]
			newid := new_vnode2node[vid]
			if id != newid {
				moveCnt++
			}
		}
	}
	movePercent := 100.0 * float64(moveCnt) / float64(DATA_MAX_NUMBER)
	fmt.Printf("Moved items: %d %+.4f%%\n", moveCnt, movePercent)
}
```
```
Moved items: 90423 +0.9042%
```
This scheme requires choosing a sensible number of virtual nodes in advance. If the number of physical nodes ever exceeds the number of virtual nodes, some nodes receive no virtual nodes at all, and raising the virtual-node count at that point would itself trigger a large data migration. The virtual-node count is therefore usually preset to several times the maximum expected size of the system. To speed up lookups, the modulo operation can also be replaced with bit operations, which is where the so-called partition power comes in.
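For example (a sketch of the idea with an assumed partition power of 10, not Swift's exact code): with a partition power of p there are \(2^p\) partitions, and the partition index is just the top p bits of a 32-bit hash, so a right shift by 32 - p replaces the modulo.

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
)

const partitionPower = 10 // 2^10 = 1024 partitions (assumed value for illustration)

// partitionOf keeps the top partitionPower bits of a 32-bit hash,
// i.e. hash / 2^(32-partitionPower), avoiding a modulo entirely.
func partitionOf(obj string) uint32 {
	sum := md5.Sum([]byte(obj))
	h := binary.BigEndian.Uint32(sum[:4])
	return h >> (32 - partitionPower)
}

func main() {
	fmt.Println(partitionOf("obj-1"), partitionOf("obj-2"))
}
```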
ref

Ring实现原理剖析 (an analysis of the Swift Ring implementation)