Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 

68 rindas
1.9 KiB

  1. from .compat import as_text
  # Template for the Redis key of a per-queue set of worker keys; the "%s"
  # placeholder is filled with a queue name.
  WORKERS_BY_QUEUE_KEY = 'rq:workers:%s'
  # Redis key of the set that get_keys() reads when no queue is given and
  # that clean_worker_registry() also prunes.
  REDIS_WORKER_KEYS = 'rq:workers'
  def register(worker, pipeline=None):
      """Add the worker's key to the Redis sets used to discover workers.

      The key is added to the set named by ``worker.redis_workers_keys`` and
      to one per-queue set for every name returned by
      ``worker.queue_names()``.

      Args:
          worker: Object providing ``key``, ``redis_workers_keys``,
              ``queue_names()`` and ``connection``.
          pipeline: Optional object to issue the commands on instead of
              ``worker.connection``. This function never calls ``execute()``
              on it.
      """
      target = worker.connection if pipeline is None else pipeline
      target.sadd(worker.redis_workers_keys, worker.key)
      for queue_name in worker.queue_names():
          target.sadd(WORKERS_BY_QUEUE_KEY % queue_name, worker.key)
  def unregister(worker, pipeline=None):
      """Remove the worker's key from the Redis sets used to discover workers.

      The key is removed from the set named by ``worker.redis_workers_keys``
      and from the per-queue set of every name returned by
      ``worker.queue_names()``.

      Args:
          worker: Object providing ``key``, ``redis_workers_keys``,
              ``queue_names()`` and ``connection``.
          pipeline: Optional pipeline to issue the commands on. When omitted,
              a pipeline is created from ``worker.connection`` and executed
              before returning.
      """
      owns_pipeline = pipeline is None
      conn = worker.connection.pipeline() if owns_pipeline else pipeline

      conn.srem(worker.redis_workers_keys, worker.key)
      for queue_name in worker.queue_names():
          conn.srem(WORKERS_BY_QUEUE_KEY % queue_name, worker.key)

      # A caller-supplied pipeline is left for the caller to execute.
      if owns_pipeline:
          conn.execute()
  def get_keys(queue=None, connection=None):
      """Return the set of worker keys for a queue, or for all workers.

      Args:
          queue: When not None, keys are read over ``queue.connection`` from
              the per-queue set ``WORKERS_BY_QUEUE_KEY % queue.name``.
          connection: Used only when ``queue`` is None; keys are then read
              from the ``REDIS_WORKER_KEYS`` set.

      Returns:
          A set containing every member of the selected Redis set, each
          passed through ``as_text``.

      Raises:
          ValueError: If both ``queue`` and ``connection`` are None.
      """
      if queue is None and connection is None:
          raise ValueError('"queue" or "connection" argument is required')

      # Compare against None rather than relying on truthiness, matching the
      # guard above: a queue object that evaluates falsy would otherwise fall
      # through to ``connection``, which may be None.
      if queue is not None:
          redis = queue.connection
          redis_key = WORKERS_BY_QUEUE_KEY % queue.name
      else:
          redis = connection
          redis_key = REDIS_WORKER_KEYS

      return {as_text(key) for key in redis.smembers(redis_key)}
  def clean_worker_registry(queue):
      """Remove worker keys from the registry when the key no longer exists.

      Every key returned by ``get_keys(queue)`` is checked with ``exists``;
      keys reported as missing are removed from both the queue's worker set
      and the ``REDIS_WORKER_KEYS`` set.

      Args:
          queue: Object providing ``name`` and ``connection``.
      """
      worker_keys = list(get_keys(queue))

      with queue.connection.pipeline() as pipe:
          for worker_key in worker_keys:
              pipe.exists(worker_key)
          existence = pipe.execute()

          # Results are matched to keys by position, so the i-th result is
          # taken to describe the i-th key.
          stale = [
              worker_keys[index]
              for index, found in enumerate(existence)
              if not found
          ]
          if not stale:
              return

          pipe.srem(WORKERS_BY_QUEUE_KEY % queue.name, *stale)
          pipe.srem(REDIS_WORKER_KEYS, *stale)
          pipe.execute()