Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

172 řádky
5.1 KiB

  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. import base64
  6. import binascii
  7. import os
  8. import struct
  9. import time
  10. import six
  11. from cryptography import utils
  12. from cryptography.exceptions import InvalidSignature
  13. from cryptography.hazmat.backends import default_backend
  14. from cryptography.hazmat.primitives import hashes, padding
  15. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  16. from cryptography.hazmat.primitives.hmac import HMAC
  17. class InvalidToken(Exception):
  18. pass
  19. _MAX_CLOCK_SKEW = 60
  20. class Fernet(object):
  21. def __init__(self, key, backend=None):
  22. if backend is None:
  23. backend = default_backend()
  24. key = base64.urlsafe_b64decode(key)
  25. if len(key) != 32:
  26. raise ValueError(
  27. "Fernet key must be 32 url-safe base64-encoded bytes."
  28. )
  29. self._signing_key = key[:16]
  30. self._encryption_key = key[16:]
  31. self._backend = backend
  32. @classmethod
  33. def generate_key(cls):
  34. return base64.urlsafe_b64encode(os.urandom(32))
  35. def encrypt(self, data):
  36. current_time = int(time.time())
  37. iv = os.urandom(16)
  38. return self._encrypt_from_parts(data, current_time, iv)
  39. def _encrypt_from_parts(self, data, current_time, iv):
  40. utils._check_bytes("data", data)
  41. padder = padding.PKCS7(algorithms.AES.block_size).padder()
  42. padded_data = padder.update(data) + padder.finalize()
  43. encryptor = Cipher(
  44. algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
  45. ).encryptor()
  46. ciphertext = encryptor.update(padded_data) + encryptor.finalize()
  47. basic_parts = (
  48. b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
  49. )
  50. h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
  51. h.update(basic_parts)
  52. hmac = h.finalize()
  53. return base64.urlsafe_b64encode(basic_parts + hmac)
  54. def decrypt(self, token, ttl=None):
  55. timestamp, data = Fernet._get_unverified_token_data(token)
  56. return self._decrypt_data(data, timestamp, ttl)
  57. def extract_timestamp(self, token):
  58. timestamp, data = Fernet._get_unverified_token_data(token)
  59. # Verify the token was not tampered with.
  60. self._verify_signature(data)
  61. return timestamp
  62. @staticmethod
  63. def _get_unverified_token_data(token):
  64. utils._check_bytes("token", token)
  65. try:
  66. data = base64.urlsafe_b64decode(token)
  67. except (TypeError, binascii.Error):
  68. raise InvalidToken
  69. if not data or six.indexbytes(data, 0) != 0x80:
  70. raise InvalidToken
  71. try:
  72. timestamp, = struct.unpack(">Q", data[1:9])
  73. except struct.error:
  74. raise InvalidToken
  75. return timestamp, data
  76. def _verify_signature(self, data):
  77. h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
  78. h.update(data[:-32])
  79. try:
  80. h.verify(data[-32:])
  81. except InvalidSignature:
  82. raise InvalidToken
  83. def _decrypt_data(self, data, timestamp, ttl):
  84. current_time = int(time.time())
  85. if ttl is not None:
  86. if timestamp + ttl < current_time:
  87. raise InvalidToken
  88. if current_time + _MAX_CLOCK_SKEW < timestamp:
  89. raise InvalidToken
  90. self._verify_signature(data)
  91. iv = data[9:25]
  92. ciphertext = data[25:-32]
  93. decryptor = Cipher(
  94. algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
  95. ).decryptor()
  96. plaintext_padded = decryptor.update(ciphertext)
  97. try:
  98. plaintext_padded += decryptor.finalize()
  99. except ValueError:
  100. raise InvalidToken
  101. unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
  102. unpadded = unpadder.update(plaintext_padded)
  103. try:
  104. unpadded += unpadder.finalize()
  105. except ValueError:
  106. raise InvalidToken
  107. return unpadded
  108. class MultiFernet(object):
  109. def __init__(self, fernets):
  110. fernets = list(fernets)
  111. if not fernets:
  112. raise ValueError(
  113. "MultiFernet requires at least one Fernet instance"
  114. )
  115. self._fernets = fernets
  116. def encrypt(self, msg):
  117. return self._fernets[0].encrypt(msg)
  118. def rotate(self, msg):
  119. timestamp, data = Fernet._get_unverified_token_data(msg)
  120. for f in self._fernets:
  121. try:
  122. p = f._decrypt_data(data, timestamp, None)
  123. break
  124. except InvalidToken:
  125. pass
  126. else:
  127. raise InvalidToken
  128. iv = os.urandom(16)
  129. return self._fernets[0]._encrypt_from_parts(p, timestamp, iv)
  130. def decrypt(self, msg, ttl=None):
  131. for f in self._fernets:
  132. try:
  133. return f.decrypt(msg, ttl)
  134. except InvalidToken:
  135. pass
  136. raise InvalidToken