场景

业务中有 N 台 redis 缓存服务器 [0, N-1],那么对于任意一个关键字 key 的请求,我们都是做一个对 key 哈希值取模的请求分发。前台请求被分发到编号 (hash (key)% N) 这台服务器上。理想状态下,这样的缓存命中率是很高的。但当因为业务需求而增删服务节点数,例如添加一台服务器。那么原来被分配到 k 号服务器的现在会被分到 (k-1) 号服务器。这个时候几乎所有的缓存全部会被 miss 掉,造成极大的业务压力。

一致性哈希算法的目的就是,当后台增删节点时,旧的数据能够依旧计算到原来的服务器,而新数据能够按新的哈希算法分配到新服务器。

原理

一致性哈希算法把 N 个服务节点映射到一个环上。对每个服务节点的 key 值进行 hash 计算,获得一个长度为 N 的数组,再把数组首尾相连,获得一个环。我们对所有的请求节点 key 作 hash 计算,会落入该环的某一个弧段,我们可以设定每个弧段上的请求节点都以顺时针最近的 Node 作为服务节点。这样增加或减少服务节点,那影响的也只是一个弧段上的请求节点,而其他弧段上的请求节点则不受影响。

这实际上就是把映射环增大了。减少了数据映射关系的变动,不会像 hash (i)% N 那样带来全局的变动。

如果某一个弧段上的节点密度很大,导致单个服务节点过载,即热点数据问题,那我们可以在这个弧段上增加一个服务节点就可以分担压力。

改进

雪崩效应

上述的算法有一个弊端,就是如果某个服务节点因为过载而 down 掉,那么原本分配在它上面的负载都会被分配到顺时针下一台服务器上去,那么下一台服务器也会 down 掉,依次类推,所有的服务器最终都会挂掉。这就是雪崩效应。

为了解决雪崩效应,最好的办法就是,当一个服务节点挂掉的时候,分配在上面的流量被其他的服务节点共同分担。这样才能保证整个集群的可用性。因此,可以采用 虚拟节点 的方式来改进算法。

虚拟节点

这里把每个服务器分为两个副本节点,每个副本节点负责一个更精细的环,如图可以得知,如果 A 挂掉了,那么原本分配给 A 的两个小环的流量会被 D 和 C 来分摊。实际上我们可以通过增加副本节点的数量来提高算法的均衡性。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#! /usr/bin/env python
# -*- coding:utf8 -*-

import md5

class HashRing(object):
def __init__(self, nodes=None, replicas=3):
"""
创建一个 hash 环
:nodes
节点
:replicas
每个服务节点的副本数
"""

self.replicas = replicas
self.ring = {}
self._sorted_keys = []
if nodes:
for node in nodes:
self.add_node (node)

def add_node(self, node):
"""
把一个服务节点加入 hash 环
"""
for i in range(0, self.replicas):
key = self.gen_key ("% s:% s" % (node, i))
self.ring [key] = node
self._sorted_keys.append (key)

self._sorted_keys.sort ()

def remove_node(self, node):
"""
从 hash 环删除一个服务节点
"""
for i in range(0, self.replicas):
key = self.gen_key ("% s:% s" % (node, i))
del self.ring [key]
self._sorted_keys.remove (key)

def get_node_keys(self, node):
"""
获取一个服务节点的所有副本的虚拟 key
"""
keys = []
for i in range(0, self.replicas):
key = self.gen_key ("% s:% s" % (node, i))
keys.append (key)
return key

def get_node(self, string_key):
"""
对于给定的 key 获取对应的服务节点,返回节点
"""
return self.get_node_pos (string_key)[0]

def get_node_pos(self, string_key):
"""
对于给定的 key 获取对应的服务节点,返回节点和位置
"""
if not self.ring:
return None, None

key = self.gen_key (string_key)
nodes = self._sorted_keys
for i in range(0, len(nodes)):
node = nodes [i]
if key <= node:
return self.ring [node], i

return self.ring [nodes [0]], 0

def get_nodes(self, string_key):
"""
对于一个给定的 key ,返回一个
Given a string key it returns the nodes as a generator that can hold the key.
The generator is never ending and iterates through the ring starting at the correct position.
"""
if not self.ring:
yield None

node, pos = self.get_node_pos (string_key)
for key in self._sorted_keys:
yield self.ring [key]

def gen_key(self, key):
"""
以 md5 算法来进行 hash (key) 运算
"""
m = md5.new ()
m.update (key)
return long (m.hexdigest (), 16)