您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

287 行
11 KiB

  1. import random
  2. import weakref
  3. from redis.client import Redis
  4. from redis.connection import ConnectionPool, Connection
  5. from redis.exceptions import (ConnectionError, ResponseError, ReadOnlyError,
  6. TimeoutError)
  7. from redis._compat import iteritems, nativestr, xrange
  8. class MasterNotFoundError(ConnectionError):
  9. pass
  10. class SlaveNotFoundError(ConnectionError):
  11. pass
  12. class SentinelManagedConnection(Connection):
  13. def __init__(self, **kwargs):
  14. self.connection_pool = kwargs.pop('connection_pool')
  15. super(SentinelManagedConnection, self).__init__(**kwargs)
  16. def __repr__(self):
  17. pool = self.connection_pool
  18. s = '%s<service=%s%%s>' % (type(self).__name__, pool.service_name)
  19. if self.host:
  20. host_info = ',host=%s,port=%s' % (self.host, self.port)
  21. s = s % host_info
  22. return s
  23. def connect_to(self, address):
  24. self.host, self.port = address
  25. super(SentinelManagedConnection, self).connect()
  26. if self.connection_pool.check_connection:
  27. self.send_command('PING')
  28. if nativestr(self.read_response()) != 'PONG':
  29. raise ConnectionError('PING failed')
  30. def connect(self):
  31. if self._sock:
  32. return # already connected
  33. if self.connection_pool.is_master:
  34. self.connect_to(self.connection_pool.get_master_address())
  35. else:
  36. for slave in self.connection_pool.rotate_slaves():
  37. try:
  38. return self.connect_to(slave)
  39. except ConnectionError:
  40. continue
  41. raise SlaveNotFoundError # Never be here
  42. def read_response(self):
  43. try:
  44. return super(SentinelManagedConnection, self).read_response()
  45. except ReadOnlyError:
  46. if self.connection_pool.is_master:
  47. # When talking to a master, a ReadOnlyError when likely
  48. # indicates that the previous master that we're still connected
  49. # to has been demoted to a slave and there's a new master.
  50. # calling disconnect will force the connection to re-query
  51. # sentinel during the next connect() attempt.
  52. self.disconnect()
  53. raise ConnectionError('The previous master is now a slave')
  54. raise
  55. class SentinelConnectionPool(ConnectionPool):
  56. """
  57. Sentinel backed connection pool.
  58. If ``check_connection`` flag is set to True, SentinelManagedConnection
  59. sends a PING command right after establishing the connection.
  60. """
  61. def __init__(self, service_name, sentinel_manager, **kwargs):
  62. kwargs['connection_class'] = kwargs.get(
  63. 'connection_class', SentinelManagedConnection)
  64. self.is_master = kwargs.pop('is_master', True)
  65. self.check_connection = kwargs.pop('check_connection', False)
  66. super(SentinelConnectionPool, self).__init__(**kwargs)
  67. self.connection_kwargs['connection_pool'] = weakref.proxy(self)
  68. self.service_name = service_name
  69. self.sentinel_manager = sentinel_manager
  70. def __repr__(self):
  71. return "%s<service=%s(%s)" % (
  72. type(self).__name__,
  73. self.service_name,
  74. self.is_master and 'master' or 'slave',
  75. )
  76. def reset(self):
  77. super(SentinelConnectionPool, self).reset()
  78. self.master_address = None
  79. self.slave_rr_counter = None
  80. def get_master_address(self):
  81. master_address = self.sentinel_manager.discover_master(
  82. self.service_name)
  83. if self.is_master:
  84. if self.master_address is None:
  85. self.master_address = master_address
  86. elif master_address != self.master_address:
  87. # Master address changed, disconnect all clients in this pool
  88. self.disconnect()
  89. return master_address
  90. def rotate_slaves(self):
  91. "Round-robin slave balancer"
  92. slaves = self.sentinel_manager.discover_slaves(self.service_name)
  93. if slaves:
  94. if self.slave_rr_counter is None:
  95. self.slave_rr_counter = random.randint(0, len(slaves) - 1)
  96. for _ in xrange(len(slaves)):
  97. self.slave_rr_counter = (
  98. self.slave_rr_counter + 1) % len(slaves)
  99. slave = slaves[self.slave_rr_counter]
  100. yield slave
  101. # Fallback to the master connection
  102. try:
  103. yield self.get_master_address()
  104. except MasterNotFoundError:
  105. pass
  106. raise SlaveNotFoundError('No slave found for %r' % (self.service_name))
  107. class Sentinel(object):
  108. """
  109. Redis Sentinel cluster client
  110. >>> from redis.sentinel import Sentinel
  111. >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
  112. >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
  113. >>> master.set('foo', 'bar')
  114. >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
  115. >>> slave.get('foo')
  116. 'bar'
  117. ``sentinels`` is a list of sentinel nodes. Each node is represented by
  118. a pair (hostname, port).
  119. ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
  120. When querying a sentinel, if it doesn't meet this threshold, responses
  121. from that sentinel won't be considered valid.
  122. ``sentinel_kwargs`` is a dictionary of connection arguments used when
  123. connecting to sentinel instances. Any argument that can be passed to
  124. a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
  125. not specified, any socket_timeout and socket_keepalive options specified
  126. in ``connection_kwargs`` will be used.
  127. ``connection_kwargs`` are keyword arguments that will be used when
  128. establishing a connection to a Redis server.
  129. """
  130. def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None,
  131. **connection_kwargs):
  132. # if sentinel_kwargs isn't defined, use the socket_* options from
  133. # connection_kwargs
  134. if sentinel_kwargs is None:
  135. sentinel_kwargs = {
  136. k: v
  137. for k, v in iteritems(connection_kwargs)
  138. if k.startswith('socket_')
  139. }
  140. self.sentinel_kwargs = sentinel_kwargs
  141. self.sentinels = [Redis(hostname, port, **self.sentinel_kwargs)
  142. for hostname, port in sentinels]
  143. self.min_other_sentinels = min_other_sentinels
  144. self.connection_kwargs = connection_kwargs
  145. def __repr__(self):
  146. sentinel_addresses = []
  147. for sentinel in self.sentinels:
  148. sentinel_addresses.append('%s:%s' % (
  149. sentinel.connection_pool.connection_kwargs['host'],
  150. sentinel.connection_pool.connection_kwargs['port'],
  151. ))
  152. return '%s<sentinels=[%s]>' % (
  153. type(self).__name__,
  154. ','.join(sentinel_addresses))
  155. def check_master_state(self, state, service_name):
  156. if not state['is_master'] or state['is_sdown'] or state['is_odown']:
  157. return False
  158. # Check if our sentinel doesn't see other nodes
  159. if state['num-other-sentinels'] < self.min_other_sentinels:
  160. return False
  161. return True
  162. def discover_master(self, service_name):
  163. """
  164. Asks sentinel servers for the Redis master's address corresponding
  165. to the service labeled ``service_name``.
  166. Returns a pair (address, port) or raises MasterNotFoundError if no
  167. master is found.
  168. """
  169. for sentinel_no, sentinel in enumerate(self.sentinels):
  170. try:
  171. masters = sentinel.sentinel_masters()
  172. except (ConnectionError, TimeoutError):
  173. continue
  174. state = masters.get(service_name)
  175. if state and self.check_master_state(state, service_name):
  176. # Put this sentinel at the top of the list
  177. self.sentinels[0], self.sentinels[sentinel_no] = (
  178. sentinel, self.sentinels[0])
  179. return state['ip'], state['port']
  180. raise MasterNotFoundError("No master found for %r" % (service_name,))
  181. def filter_slaves(self, slaves):
  182. "Remove slaves that are in an ODOWN or SDOWN state"
  183. slaves_alive = []
  184. for slave in slaves:
  185. if slave['is_odown'] or slave['is_sdown']:
  186. continue
  187. slaves_alive.append((slave['ip'], slave['port']))
  188. return slaves_alive
  189. def discover_slaves(self, service_name):
  190. "Returns a list of alive slaves for service ``service_name``"
  191. for sentinel in self.sentinels:
  192. try:
  193. slaves = sentinel.sentinel_slaves(service_name)
  194. except (ConnectionError, ResponseError, TimeoutError):
  195. continue
  196. slaves = self.filter_slaves(slaves)
  197. if slaves:
  198. return slaves
  199. return []
  200. def master_for(self, service_name, redis_class=Redis,
  201. connection_pool_class=SentinelConnectionPool, **kwargs):
  202. """
  203. Returns a redis client instance for the ``service_name`` master.
  204. A SentinelConnectionPool class is used to retrive the master's
  205. address before establishing a new connection.
  206. NOTE: If the master's address has changed, any cached connections to
  207. the old master are closed.
  208. By default clients will be a redis.Redis instance. Specify a
  209. different class to the ``redis_class`` argument if you desire
  210. something different.
  211. The ``connection_pool_class`` specifies the connection pool to use.
  212. The SentinelConnectionPool will be used by default.
  213. All other keyword arguments are merged with any connection_kwargs
  214. passed to this class and passed to the connection pool as keyword
  215. arguments to be used to initialize Redis connections.
  216. """
  217. kwargs['is_master'] = True
  218. connection_kwargs = dict(self.connection_kwargs)
  219. connection_kwargs.update(kwargs)
  220. return redis_class(connection_pool=connection_pool_class(
  221. service_name, self, **connection_kwargs))
  222. def slave_for(self, service_name, redis_class=Redis,
  223. connection_pool_class=SentinelConnectionPool, **kwargs):
  224. """
  225. Returns redis client instance for the ``service_name`` slave(s).
  226. A SentinelConnectionPool class is used to retrive the slave's
  227. address before establishing a new connection.
  228. By default clients will be a redis.Redis instance. Specify a
  229. different class to the ``redis_class`` argument if you desire
  230. something different.
  231. The ``connection_pool_class`` specifies the connection pool to use.
  232. The SentinelConnectionPool will be used by default.
  233. All other keyword arguments are merged with any connection_kwargs
  234. passed to this class and passed to the connection pool as keyword
  235. arguments to be used to initialize Redis connections.
  236. """
  237. kwargs['is_master'] = False
  238. connection_kwargs = dict(self.connection_kwargs)
  239. connection_kwargs.update(kwargs)
  240. return redis_class(connection_pool=connection_pool_class(
  241. service_name, self, **connection_kwargs))