|
- """passlib.totp -- TOTP / RFC6238 / Google Authenticator utilities."""
- #=============================================================================
- # imports
- #=============================================================================
- from __future__ import absolute_import, division, print_function
- from passlib.utils.compat import PY3
- # core
- import base64
- import calendar
- import json
- import logging; log = logging.getLogger(__name__)
- import math
- import struct
- import sys
- import time as _time
- import re
- if PY3:
- from urllib.parse import urlparse, parse_qsl, quote, unquote
- else:
- from urllib import quote, unquote
- from urlparse import urlparse, parse_qsl
- from warnings import warn
- # site
- try:
- # TOTP encrypted keys only supported if cryptography (https://cryptography.io) is installed
- from cryptography.hazmat.backends import default_backend as _cg_default_backend
- import cryptography.hazmat.primitives.ciphers.algorithms
- import cryptography.hazmat.primitives.ciphers.modes
- from cryptography.hazmat.primitives import ciphers as _cg_ciphers
- del cryptography
- except ImportError:
- log.debug("can't import 'cryptography' package, totp encryption disabled")
- _cg_ciphers = _cg_default_backend = None
- # pkg
- from passlib import exc
- from passlib.exc import TokenError, MalformedTokenError, InvalidTokenError, UsedTokenError
- from passlib.utils import (to_unicode, to_bytes, consteq,
- getrandbytes, rng, SequenceMixin, xor_bytes, getrandstr)
- from passlib.utils.binary import BASE64_CHARS, b32encode, b32decode
- from passlib.utils.compat import (u, unicode, native_string_types, bascii_to_str, int_types, num_types,
- irange, byte_elem_value, UnicodeIO, suppress_cause)
- from passlib.utils.decor import hybrid_method, memoized_property
- from passlib.crypto.digest import lookup_hash, compile_hmac, pbkdf2_hmac
- from passlib.hash import pbkdf2_sha256
- # local
- __all__ = [
- # frontend classes
- "AppWallet",
- "TOTP",
-
- # errors (defined in passlib.exc, but exposed here for convenience)
- "TokenError",
- "MalformedTokenError",
- "InvalidTokenError",
- "UsedTokenError",
-
- # internal helper classes
- "TotpToken",
- "TotpMatch",
- ]
-
- #=============================================================================
- # HACK: python < 2.7.4's urlparse() won't parse query strings unless the url scheme
- # is one of the schemes in the urlparse.uses_query list. 2.7 abandoned
- # this, and parses query if present, regardless of the scheme.
- # as a workaround for older versions, we add "otpauth" to the known list.
- # this was fixed by https://bugs.python.org/issue9374, in 2.7.4 release.
- #=============================================================================
- if sys.version_info < (2,7,4):
- from urlparse import uses_query
- if "otpauth" not in uses_query:
- uses_query.append("otpauth")
- log.debug("registered 'otpauth' scheme with urlparse.uses_query")
- del uses_query
-
- #=============================================================================
- # internal helpers
- #=============================================================================
-
- #-----------------------------------------------------------------------------
- # token parsing / rendering helpers
- #-----------------------------------------------------------------------------
-
- #: regex used to clean whitespace from tokens & keys
- _clean_re = re.compile(u(r"\s|[-=]"), re.U)
-
- _chunk_sizes = [4,6,5]
-
- def _get_group_size(klen):
- """
- helper for group_string() --
- calculates optimal size of group for given string size.
- """
- # look for exact divisor
- for size in _chunk_sizes:
- if not klen % size:
- return size
- # fallback to divisor with largest remainder
- # (so chunks are as close to even as possible)
- best = _chunk_sizes[0]
- rem = 0
- for size in _chunk_sizes:
- if klen % size > rem:
- best = size
- rem = klen % size
- return best
-
- def group_string(value, sep="-"):
- """
- reformat string into (roughly) evenly-sized groups, separated by **sep**.
- useful for making tokens & keys easier to read by humans.
- """
- klen = len(value)
- size = _get_group_size(klen)
- return sep.join(value[o:o+size] for o in irange(0, klen, size))
-
- #-----------------------------------------------------------------------------
- # encoding helpers
- #-----------------------------------------------------------------------------
-
- def _decode_bytes(key, format):
- """
- internal TOTP() helper --
- decodes key according to specified format.
- """
- if format == "raw":
- if not isinstance(key, bytes):
- raise exc.ExpectedTypeError(key, "bytes", "key")
- return key
- # for encoded data, key must be either unicode or ascii-encoded bytes,
- # and must contain a hex or base32 string.
- key = to_unicode(key, param="key")
- key = _clean_re.sub("", key).encode("utf-8") # strip whitespace & hypens
- if format == "hex" or format == "base16":
- return base64.b16decode(key.upper())
- elif format == "base32":
- return b32decode(key)
- # XXX: add base64 support?
- else:
- raise ValueError("unknown byte-encoding format: %r" % (format,))
-
- #=============================================================================
- # OTP management
- #=============================================================================
-
- #: flag for detecting if encrypted totp support is present
- AES_SUPPORT = bool(_cg_ciphers)
-
- #: regex for validating secret tags
- _tag_re = re.compile("(?i)^[a-z0-9][a-z0-9_.-]*$")
-
- class AppWallet(object):
- """
- This class stores application-wide secrets that can be used
- to encrypt & decrypt TOTP keys for storage.
- It's mostly an internal detail, applications usually just need
- to pass ``secrets`` or ``secrets_path`` to :meth:`TOTP.using`.
-
- .. seealso::
-
- :ref:`totp-storing-instances` for more details on this workflow.
-
- Arguments
- =========
- :param secrets:
- Dict of application secrets to use when encrypting/decrypting
- stored TOTP keys. This should include a secret to use when encrypting
- new keys, but may contain additional older secrets to decrypt
- existing stored keys.
-
- The dict should map tags -> secrets, so that each secret is identified
- by a unique tag. This tag will be stored along with the encrypted
- key in order to determine which secret should be used for decryption.
- Tag should be string that starts with regex range ``[a-z0-9]``,
- and the remaining characters must be in ``[a-z0-9_.-]``.
-
- It is recommended to use something like a incremental counter
- ("1", "2", ...), an ISO date ("2016-01-01", "2016-05-16", ...),
- or a timestamp ("19803495", "19813495", ...) when assigning tags.
-
- This mapping be provided in three formats:
-
- * A python dict mapping tag -> secret
- * A JSON-formatted string containing the dict
- * A multiline string with the format ``"tag: value\\ntag: value\\n..."``
-
- (This last format is mainly useful when loading from a text file via **secrets_path**)
-
- .. seealso:: :func:`generate_secret` to create a secret with sufficient entropy
-
- :param secrets_path:
- Alternately, callers can specify a separate file where the
- application-wide secrets are stored, using either of the string
- formats described in **secrets**.
-
- :param default_tag:
- Specifies which tag in **secrets** should be used as the default
- for encrypting new keys. If omitted, the tags will be sorted,
- and the largest tag used as the default.
-
- if all tags are numeric, they will be sorted numerically;
- otherwise they will be sorted alphabetically.
- this permits tags to be assigned numerically,
- or e.g. using ``YYYY-MM-DD`` dates.
-
- :param encrypt_cost:
- Optional time-cost factor for key encryption.
- This value corresponds to log2() of the number of PBKDF2
- rounds used.
-
- .. warning::
-
- The application secret(s) should be stored in a secure location by
- your application, and each secret should contain a large amount
- of entropy (to prevent brute-force attacks if the encrypted keys
- are leaked).
-
- :func:`generate_secret` is provided as a convenience helper
- to generate a new application secret of suitable size.
-
- Best practice is to load these values from a file via **secrets_path**,
- and then have your application give up permission to read this file
- once it's running.
-
- Public Methods
- ==============
- .. autoattribute:: has_secrets
- .. autoattribute:: default_tag
-
- Semi-Private Methods
- ====================
- The following methods are used internally by the :class:`TOTP`
- class in order to encrypt & decrypt keys using the provided application
- secrets. They will generally not be publically useful, and may have their
- API changed periodically.
-
- .. automethod:: get_secret
- .. automethod:: encrypt_key
- .. automethod:: decrypt_key
- """
- #========================================================================
- # instance attrs
- #========================================================================
-
- #: default salt size for encrypt_key() output
- salt_size = 12
-
- #: default cost (log2 of pbkdf2 rounds) for encrypt_key() output
- #: NOTE: this is relatively low, since the majority of the security
- #: relies on a high entropy secret to pass to AES.
- encrypt_cost = 14
-
- #: map of secret tag -> secret bytes
- _secrets = None
-
- #: tag for default secret
- default_tag = None
-
- #========================================================================
- # init
- #========================================================================
- def __init__(self, secrets=None, default_tag=None, encrypt_cost=None,
- secrets_path=None):
-
- # TODO: allow a lot more things to be customized from here,
- # e.g. setting default TOTP constructor options.
-
- #
- # init cost
- #
- if encrypt_cost is not None:
- if isinstance(encrypt_cost, native_string_types):
- encrypt_cost = int(encrypt_cost)
- assert encrypt_cost >= 0
- self.encrypt_cost = encrypt_cost
-
- #
- # init secrets map
- #
-
- # load secrets from file (if needed)
- if secrets_path is not None:
- if secrets is not None:
- raise TypeError("'secrets' and 'secrets_path' are mutually exclusive")
- secrets = open(secrets_path, "rt").read()
-
- # parse & store secrets
- secrets = self._secrets = self._parse_secrets(secrets)
-
- #
- # init default tag/secret
- #
- if secrets:
- if default_tag is not None:
- # verify that tag is present in map
- self.get_secret(default_tag)
- elif all(tag.isdigit() for tag in secrets):
- default_tag = max(secrets, key=int)
- else:
- default_tag = max(secrets)
- self.default_tag = default_tag
-
- def _parse_secrets(self, source):
- """
- parse 'secrets' parameter
-
- :returns:
- Dict[tag:str, secret:bytes]
- """
- # parse string formats
- # to make this easy to pass in configuration from a separate file,
- # 'secrets' can be string using two formats -- json & "tag:value\n"
- check_type = True
- if isinstance(source, native_string_types):
- if source.lstrip().startswith(("[", "{")):
- # json list / dict
- source = json.loads(source)
- elif "\n" in source and ":" in source:
- # multiline string containing series of "tag: value\n" rows;
- # empty and "#\n" rows are ignored
- def iter_pairs(source):
- for line in source.splitlines():
- line = line.strip()
- if line and not line.startswith("#"):
- tag, secret = line.split(":", 1)
- yield tag.strip(), secret.strip()
- source = iter_pairs(source)
- check_type = False
- else:
- raise ValueError("unrecognized secrets string format")
-
- # ensure we have iterable of (tag, value) pairs
- if source is None:
- return {}
- elif isinstance(source, dict):
- source = source.items()
- # XXX: could support iterable of (tag,value) pairs, but not yet needed...
- # elif check_type and (isinstance(source, str) or not isinstance(source, Iterable)):
- elif check_type:
- raise TypeError("'secrets' must be mapping, or list of items")
-
- # parse into final dict, normalizing contents
- return dict(self._parse_secret_pair(tag, value)
- for tag, value in source)
-
- def _parse_secret_pair(self, tag, value):
- if isinstance(tag, native_string_types):
- pass
- elif isinstance(tag, int):
- tag = str(tag)
- else:
- raise TypeError("tag must be unicode/string: %r" % (tag,))
- if not _tag_re.match(tag):
- raise ValueError("tag contains invalid characters: %r" % (tag,))
- if not isinstance(value, bytes):
- value = to_bytes(value, param="secret %r" % (tag,))
- if not value:
- raise ValueError("tag contains empty secret: %r" % (tag,))
- return tag, value
-
- #========================================================================
- # accessing secrets
- #========================================================================
-
- @property
- def has_secrets(self):
- """whether at least one application secret is present"""
- return self.default_tag is not None
-
- def get_secret(self, tag):
- """
- resolve a secret tag to the secret (as bytes).
- throws a KeyError if not found.
- """
- secrets = self._secrets
- if not secrets:
- raise KeyError("no application secrets configured")
- try:
- return secrets[tag]
- except KeyError:
- raise suppress_cause(KeyError("unknown secret tag: %r" % (tag,)))
-
- #========================================================================
- # encrypted key helpers -- used internally by TOTP
- #========================================================================
-
- @staticmethod
- def _cipher_aes_key(value, secret, salt, cost, decrypt=False):
- """
- Internal helper for :meth:`encrypt_key` --
- handles lowlevel encryption/decryption.
-
- Algorithm details:
-
- This function uses PBKDF2-HMAC-SHA256 to generate a 32-byte AES key
- and a 16-byte IV from the application secret & random salt.
- It then uses AES-256-CTR to encrypt/decrypt the TOTP key.
-
- CTR mode was chosen over CBC because the main attack scenario here
- is that the attacker has stolen the database, and is trying to decrypt a TOTP key
- (the plaintext value here). To make it hard for them, we want every password
- to decrypt to a potentially valid key -- thus need to avoid any authentication
- or padding oracle attacks. While some random padding construction could be devised
- to make this work for CBC mode, a stream cipher mode is just plain simpler.
- OFB/CFB modes would also work here, but seeing as they have malleability
- and cyclic issues (though remote and barely relevant here),
- CTR was picked as the best overall choice.
- """
- # make sure backend AES support is available
- if _cg_ciphers is None:
- raise RuntimeError("TOTP encryption requires 'cryptography' package "
- "(https://cryptography.io)")
-
- # use pbkdf2 to derive both key (32 bytes) & iv (16 bytes)
- # NOTE: this requires 2 sha256 blocks to be calculated.
- keyiv = pbkdf2_hmac("sha256", secret, salt=salt, rounds=(1 << cost), keylen=48)
-
- # use AES-256-CTR to encrypt/decrypt input value
- cipher = _cg_ciphers.Cipher(_cg_ciphers.algorithms.AES(keyiv[:32]),
- _cg_ciphers.modes.CTR(keyiv[32:]),
- _cg_default_backend())
- ctx = cipher.decryptor() if decrypt else cipher.encryptor()
- return ctx.update(value) + ctx.finalize()
-
- def encrypt_key(self, key):
- """
- Helper used to encrypt TOTP keys for storage.
-
- :param key:
- TOTP key to encrypt, as raw bytes.
-
- :returns:
- dict containing encrypted TOTP key & configuration parameters.
- this format should be treated as opaque, and potentially subject
- to change, though it is designed to be easily serialized/deserialized
- (e.g. via JSON).
-
- .. note::
-
- This function requires installation of the external
- `cryptography <https://cryptography.io>`_ package.
-
- To give some algorithm details: This function uses AES-256-CTR to encrypt
- the provided data. It takes the application secret and randomly generated salt,
- and uses PBKDF2-HMAC-SHA256 to combine them and generate the AES key & IV.
- """
- if not key:
- raise ValueError("no key provided")
- salt = getrandbytes(rng, self.salt_size)
- cost = self.encrypt_cost
- tag = self.default_tag
- if not tag:
- raise TypeError("no application secrets configured, can't encrypt OTP key")
- ckey = self._cipher_aes_key(key, self.get_secret(tag), salt, cost)
- # XXX: switch to base64?
- return dict(v=1, c=cost, t=tag, s=b32encode(salt), k=b32encode(ckey))
-
- def decrypt_key(self, enckey):
- """
- Helper used to decrypt TOTP keys from storage format.
- Consults configured secrets to decrypt key.
-
- :param source:
- source object, as returned by :meth:`encrypt_key`.
-
- :returns:
- ``(key, needs_recrypt)`` --
-
- **key** will be the decrypted key, as bytes.
-
- **needs_recrypt** will be a boolean flag indicating
- whether encryption cost or default tag is too old,
- and henace that key needs re-encrypting before storing.
-
- .. note::
-
- This function requires installation of the external
- `cryptography <https://cryptography.io>`_ package.
- """
- if not isinstance(enckey, dict):
- raise TypeError("'enckey' must be dictionary")
- version = enckey.get("v", None)
- needs_recrypt = False
- if version == 1:
- _cipher_key = self._cipher_aes_key
- else:
- raise ValueError("missing / unrecognized 'enckey' version: %r" % (version,))
- tag = enckey['t']
- cost = enckey['c']
- key = _cipher_key(
- value=b32decode(enckey['k']),
- secret=self.get_secret(tag),
- salt=b32decode(enckey['s']),
- cost=cost,
- )
- if cost != self.encrypt_cost or tag != self.default_tag:
- needs_recrypt = True
- return key, needs_recrypt
-
- #=============================================================================
- # eoc
- #=============================================================================
-
- #=============================================================================
- # TOTP class
- #=============================================================================
-
- #: helper to convert HOTP counter to bytes
- _pack_uint64 = struct.Struct(">Q").pack
-
- #: helper to extract value from HOTP digest
- _unpack_uint32 = struct.Struct(">I").unpack
-
- #: dummy bytes used as temp key for .using() method
- _DUMMY_KEY = b"\x00" * 16
-
- class TOTP(object):
- """
- Helper for generating and verifying TOTP codes.
-
- Given a secret key and set of configuration options, this object
- offers methods for token generation, token validation, and serialization.
- It can also be used to track important persistent TOTP state,
- such as the last counter used.
-
- This class accepts the following options
- (only **key** and **format** may be specified as positional arguments).
-
- :arg str key:
- The secret key to use. By default, should be encoded as
- a base32 string (see **format** for other encodings).
-
- Exactly one of **key** or ``new=True`` must be specified.
-
- :arg str format:
- The encoding used by the **key** parameter. May be one of:
- ``"base32"`` (base32-encoded string),
- ``"hex"`` (hexadecimal string), or ``"raw"`` (raw bytes).
- Defaults to ``"base32"``.
-
- :param bool new:
- If ``True``, a new key will be generated using :class:`random.SystemRandom`.
-
- Exactly one ``new=True`` or **key** must be specified.
-
- :param str label:
- Label to associate with this token when generating a URI.
- Displayed to user by most OTP client applications (e.g. Google Authenticator),
- and typically has format such as ``"John Smith"`` or ``"jsmith@webservice.example.org"``.
- Defaults to ``None``.
- See :meth:`to_uri` for details.
-
- :param str issuer:
- String identifying the token issuer (e.g. the domain name of your service).
- Used internally by some OTP client applications (e.g. Google Authenticator) to distinguish entries
- which otherwise have the same label.
- Optional but strongly recommended if you're rendering to a URI.
- Defaults to ``None``.
- See :meth:`to_uri` for details.
-
- :param int size:
- Number of bytes when generating new keys. Defaults to size of hash algorithm (e.g. 20 for SHA1).
-
- .. warning::
-
- Overriding the default values for ``digits``, ``period``, or ``alg`` may
- cause problems with some OTP client programs (such as Google Authenticator),
- which may have these defaults hardcoded.
-
- :param int digits:
- The number of digits in the generated / accepted tokens. Defaults to ``6``.
- Must be in range [6 .. 10].
-
- .. rst-class:: inline-title
- .. caution::
- Due to a limitation of the HOTP algorithm, the 10th digit can only take on values 0 .. 2,
- and thus offers very little extra security.
-
- :param str alg:
- Name of hash algorithm to use. Defaults to ``"sha1"``.
- ``"sha256"`` and ``"sha512"`` are also accepted, per :rfc:`6238`.
-
- :param int period:
- The time-step period to use, in integer seconds. Defaults to ``30``.
-
- ..
- See the passlib documentation for a full list of attributes & methods.
- """
- #=============================================================================
- # class attrs
- #=============================================================================
-
- #: minimum number of bytes to allow in key, enforced by passlib.
- # XXX: see if spec says anything relevant to this.
- _min_key_size = 10
-
- #: minimum & current serialization version (may be set independently by subclasses)
- min_json_version = json_version = 1
-
- #: AppWallet that this class will use for encrypting/decrypting keys.
- #: (can be overwritten via the :meth:`TOTP.using()` constructor)
- wallet = None
-
- #: function to get system time in seconds, as needed by :meth:`generate` and :meth:`verify`.
- #: defaults to :func:`time.time`, but can be overridden on a per-instance basis.
- now = _time.time
-
- #=============================================================================
- # instance attrs
- #=============================================================================
-
- #---------------------------------------------------------------------------
- # configuration attrs
- #---------------------------------------------------------------------------
-
- #: [private] secret key as raw :class:`!bytes`
- #: see .key property for public access.
- _key = None
-
- #: [private] cached copy of encrypted secret,
- #: so .to_json() doesn't have to re-encrypt on each call.
- _encrypted_key = None
-
- #: [private] cached copy of keyed HMAC function,
- #: so ._generate() doesn't have to rebuild this each time
- #: ._find_match() invokes it.
- _keyed_hmac = None
-
- #: number of digits in the generated tokens.
- digits = 6
-
- #: name of hash algorithm in use (e.g. ``"sha1"``)
- alg = "sha1"
-
- #: default label for :meth:`to_uri`
- label = None
-
- #: default issuer for :meth:`to_uri`
- issuer = None
-
- #: number of seconds per counter step.
- #: *(TOTP uses an internal time-derived counter which
- #: increments by 1 every* :attr:`!period` *seconds)*.
- period = 30
-
- #---------------------------------------------------------------------------
- # state attrs
- #---------------------------------------------------------------------------
-
- #: Flag set by deserialization methods to indicate the object needs to be re-serialized.
- #: This can be for a number of reasons -- encoded using deprecated format,
- #: or encrypted using a deprecated key or too few rounds.
- changed = False
-
- #=============================================================================
- # prototype construction
- #=============================================================================
- @classmethod
- def using(cls, digits=None, alg=None, period=None,
- issuer=None, wallet=None, now=None, **kwds):
- """
- Dynamically create subtype of :class:`!TOTP` class
- which has the specified defaults set.
-
- :parameters: **digits, alg, period, issuer**:
-
- All these options are the same as in the :class:`TOTP` constructor,
- and the resulting class will use any values you specify here
- as the default for all TOTP instances it creates.
-
- :param wallet:
- Optional :class:`AppWallet` that will be used for encrypting/decrypting keys.
-
- :param secrets, secrets_path, encrypt_cost:
-
- If specified, these options will be passed to the :class:`AppWallet` constructor,
- allowing you to directly specify the secret keys that should be used
- to encrypt & decrypt stored keys.
-
- :returns:
- subclass of :class:`!TOTP`.
-
- This method is useful for creating a TOTP class configured
- to use your application's secrets for encrypting & decrypting
- keys, as well as create new keys using it's desired configuration defaults.
-
- As an example::
-
- >>> # your application can create a custom class when it initializes
- >>> from passlib.totp import TOTP, generate_secret
- >>> TotpFactory = TOTP.using(secrets={"1": generate_secret()})
-
- >>> # subsequent TOTP objects created from this factory
- >>> # will use the specified secrets to encrypt their keys...
- >>> totp = TotpFactory.new()
- >>> totp.to_dict()
- {'enckey': {'c': 14,
- 'k': 'H77SYXWORDPGVOQTFRR2HFUB3C45XXI7',
- 's': 'G5DOQPIHIBUM2OOHHADQ',
- 't': '1',
- 'v': 1},
- 'type': 'totp',
- 'v': 1}
-
- .. seealso:: :ref:`totp-creation` and :ref:`totp-storing-instances` tutorials for a usage example
- """
- # XXX: could add support for setting default match 'window' and 'reuse' policy
-
- # :param now:
- # Optional callable that should return current time for generator to use.
- # Default to :func:`time.time`. This optional is generally not needed,
- # and is mainly present for examples & unit-testing.
-
- subcls = type("TOTP", (cls,), {})
-
- def norm_param(attr, value):
- """
- helper which uses constructor to validate parameter value.
- it returns corresponding attribute, so we use normalized value.
- """
- # NOTE: this creates *subclass* instance,
- # so normalization takes into account any custom params
- # already stored.
- kwds = dict(key=_DUMMY_KEY, format="raw")
- kwds[attr] = value
- obj = subcls(**kwds)
- return getattr(obj, attr)
-
- if digits is not None:
- subcls.digits = norm_param("digits", digits)
-
- if alg is not None:
- subcls.alg = norm_param("alg", alg)
-
- if period is not None:
- subcls.period = norm_param("period", period)
-
- # XXX: add default size as configurable parameter?
-
- if issuer is not None:
- subcls.issuer = norm_param("issuer", issuer)
-
- if kwds:
- subcls.wallet = AppWallet(**kwds)
- if wallet:
- raise TypeError("'wallet' and 'secrets' keywords are mutually exclusive")
- elif wallet is not None:
- if not isinstance(wallet, AppWallet):
- raise exc.ExpectedTypeError(wallet, AppWallet, "wallet")
- subcls.wallet = wallet
-
- if now is not None:
- assert isinstance(now(), num_types) and now() >= 0, \
- "now() function must return non-negative int/float"
- subcls.now = staticmethod(now)
-
- return subcls
-
- #=============================================================================
- # init
- #=============================================================================
-
- @classmethod
- def new(cls, **kwds):
- """
- convenience alias for creating new TOTP key, same as ``TOTP(new=True)``
- """
- return cls(new=True, **kwds)
-
- def __init__(self, key=None, format="base32",
- # keyword only...
- new=False, digits=None, alg=None, size=None, period=None,
- label=None, issuer=None, changed=False,
- **kwds):
- super(TOTP, self).__init__(**kwds)
- if changed:
- self.changed = changed
-
- # validate & normalize alg
- info = lookup_hash(alg or self.alg)
- self.alg = info.name
- digest_size = info.digest_size
- if digest_size < 4:
- raise RuntimeError("%r hash digest too small" % alg)
-
- # parse or generate new key
- if new:
- # generate new key
- if key:
- raise TypeError("'key' and 'new=True' are mutually exclusive")
- if size is None:
- # default to digest size, per RFC 6238 Section 5.1
- size = digest_size
- elif size > digest_size:
- # not forbidden by spec, but would just be wasted bytes.
- # maybe just warn about this?
- raise ValueError("'size' should be less than digest size "
- "(%d)" % digest_size)
- self.key = getrandbytes(rng, size)
- elif not key:
- raise TypeError("must specify either an existing 'key', or 'new=True'")
- elif format == "encrypted":
- # NOTE: this handles decrypting & setting '.key'
- self.encrypted_key = key
- elif key:
- # use existing key, encoded using specified <format>
- self.key = _decode_bytes(key, format)
-
- # enforce min key size
- if len(self.key) < self._min_key_size:
- # only making this fatal for new=True,
- # so that existing (but ridiculously small) keys can still be used.
- msg = "for security purposes, secret key must be >= %d bytes" % self._min_key_size
- if new:
- raise ValueError(msg)
- else:
- warn(msg, exc.PasslibSecurityWarning, stacklevel=1)
-
- # validate digits
- if digits is None:
- digits = self.digits
- if not isinstance(digits, int_types):
- raise TypeError("digits must be an integer, not a %r" % type(digits))
- if digits < 6 or digits > 10:
- raise ValueError("digits must in range(6,11)")
- self.digits = digits
-
- # validate label
- if label:
- self._check_label(label)
- self.label = label
-
- # validate issuer
- if issuer:
- self._check_issuer(issuer)
- self.issuer = issuer
-
- # init period
- if period is not None:
- self._check_serial(period, "period", minval=1)
- self.period = period
-
- #=============================================================================
- # helpers to verify value types & ranges
- #=============================================================================
-
- @staticmethod
- def _check_serial(value, param, minval=0):
- """
- check that serial value (e.g. 'counter') is non-negative integer
- """
- if not isinstance(value, int_types):
- raise exc.ExpectedTypeError(value, "int", param)
- if value < minval:
- raise ValueError("%s must be >= %d" % (param, minval))
-
- @staticmethod
- def _check_label(label):
- """
- check that label doesn't contain chars forbidden by KeyURI spec
- """
- if label and ":" in label:
- raise ValueError("label may not contain ':'")
-
- @staticmethod
- def _check_issuer(issuer):
- """
- check that issuer doesn't contain chars forbidden by KeyURI spec
- """
- if issuer and ":" in issuer:
- raise ValueError("issuer may not contain ':'")
-
- #=============================================================================
- # key attributes
- #=============================================================================
-
- #------------------------------------------------------------------
- # raw key
- #------------------------------------------------------------------
- @property
- def key(self):
- """
- secret key as raw bytes
- """
- return self._key
-
- @key.setter
- def key(self, value):
- # set key
- if not isinstance(value, bytes):
- raise exc.ExpectedTypeError(value, bytes, "key")
- self._key = value
-
- # clear cached properties derived from key
- self._encrypted_key = self._keyed_hmac = None
-
- #------------------------------------------------------------------
- # encrypted key
- #------------------------------------------------------------------
- @property
- def encrypted_key(self):
- """
- secret key, encrypted using application secret.
- this match the output of :meth:`AppWallet.encrypt_key`,
- and should be treated as an opaque json serializable object.
- """
- enckey = self._encrypted_key
- if enckey is None:
- wallet = self.wallet
- if not wallet:
- raise TypeError("no application secrets present, can't encrypt TOTP key")
- enckey = self._encrypted_key = wallet.encrypt_key(self.key)
- return enckey
-
- @encrypted_key.setter
- def encrypted_key(self, value):
- wallet = self.wallet
- if not wallet:
- raise TypeError("no application secrets present, can't decrypt TOTP key")
- self.key, needs_recrypt = wallet.decrypt_key(value)
- if needs_recrypt:
- # mark as changed so it gets re-encrypted & written to db
- self.changed = True
- else:
- # cache encrypted key for re-use
- self._encrypted_key = value
-
- #------------------------------------------------------------------
- # pretty-printed / encoded key helpers
- #------------------------------------------------------------------
-
- @property
- def hex_key(self):
- """
- secret key encoded as hexadecimal string
- """
- return bascii_to_str(base64.b16encode(self.key)).lower()
-
- @property
- def base32_key(self):
- """
- secret key encoded as base32 string
- """
- return b32encode(self.key)
-
- def pretty_key(self, format="base32", sep="-"):
- """
- pretty-print the secret key.
-
- This is mainly useful for situations where the user cannot get the qrcode to work,
- and must enter the key manually into their TOTP client. It tries to format
- the key in a manner that is easier for humans to read.
-
- :param format:
- format to output secret key. ``"hex"`` and ``"base32"`` are both accepted.
-
- :param sep:
- separator to insert to break up key visually.
- can be any of ``"-"`` (the default), ``" "``, or ``False`` (no separator).
-
- :return:
- key as native string.
-
- Usage example::
-
- >>> t = TOTP('s3jdvb7qd2r7jpxx')
- >>> t.pretty_key()
- 'S3JD-VB7Q-D2R7-JPXX'
- """
- if format == "hex" or format == "base16":
- key = self.hex_key
- elif format == "base32":
- key = self.base32_key
- else:
- raise ValueError("unknown byte-encoding format: %r" % (format,))
- if sep:
- key = group_string(key, sep)
- return key
-
- #=============================================================================
- # time & token parsing
- #=============================================================================
-
- @classmethod
- def normalize_time(cls, time):
- """
- Normalize time value to unix epoch seconds.
-
- :arg time:
- Can be ``None``, :class:`!datetime`,
- or unix epoch timestamp as :class:`!float` or :class:`!int`.
- If ``None``, uses current system time.
- Naive datetimes are treated as UTC.
-
- :returns:
- unix epoch timestamp as :class:`int`.
- """
- if isinstance(time, int_types):
- return time
- elif isinstance(time, float):
- return int(time)
- elif time is None:
- return int(cls.now())
- elif hasattr(time, "utctimetuple"):
- # coerce datetime to UTC timestamp
- # NOTE: utctimetuple() assumes naive datetimes are in UTC
- # NOTE: we explicitly *don't* want microseconds.
- return calendar.timegm(time.utctimetuple())
- else:
- raise exc.ExpectedTypeError(time, "int, float, or datetime", "time")
-
- def _time_to_counter(self, time):
- """
- convert timestamp to HOTP counter using :attr:`period`.
- """
- return time // self.period
-
- def _counter_to_time(self, counter):
- """
- convert HOTP counter to timestamp using :attr:`period`.
- """
- return counter * self.period
-
- @hybrid_method
- def normalize_token(self_or_cls, token):
- """
- Normalize OTP token representation:
- strips whitespace, converts integers to a zero-padded string,
- validates token content & number of digits.
-
- This is a hybrid method -- it can be called at the class level,
- as ``TOTP.normalize_token()``, or the instance level as ``TOTP().normalize_token()``.
- It will normalize to the instance-specific number of :attr:`~TOTP.digits`,
- or use the class default.
-
- :arg token:
- token as ascii bytes, unicode, or an integer.
-
- :raises ValueError:
- if token has wrong number of digits, or contains non-numeric characters.
-
- :returns:
- token as :class:`!unicode` string, containing only digits 0-9.
- """
- digits = self_or_cls.digits
- if isinstance(token, int_types):
- token = u("%0*d") % (digits, token)
- else:
- token = to_unicode(token, param="token")
- token = _clean_re.sub(u(""), token)
- if not token.isdigit():
- raise MalformedTokenError("Token must contain only the digits 0-9")
- if len(token) != digits:
- raise MalformedTokenError("Token must have exactly %d digits" % digits)
- return token
-
- #=============================================================================
- # token generation
- #=============================================================================
-
- # # debug helper
- # def generate_range(self, size, time=None):
- # counter = self._time_to_counter(time) - (size + 1) // 2
- # end = counter + size
- # while counter <= end:
- # token = self._generate(counter)
- # yield TotpToken(self, token, counter)
- # counter += 1
-
- def generate(self, time=None):
- """
- Generate token for specified time
- (uses current time if none specified).
-
- :arg time:
- Can be ``None``, a :class:`!datetime`,
- or class:`!float` / :class:`!int` unix epoch timestamp.
- If ``None`` (the default), uses current system time.
- Naive datetimes are treated as UTC.
-
- :returns:
-
- A :class:`TotpToken` instance, which can be treated
- as a sequence of ``(token, expire_time)`` -- see that class
- for more details.
-
- Usage example::
-
- >>> # generate a new token, wrapped in a TotpToken instance...
- >>> otp = TOTP('s3jdvb7qd2r7jpxx')
- >>> otp.generate(1419622739)
- <TotpToken token='897212' expire_time=1419622740>
-
- >>> # when you just need the token...
- >>> otp.generate(1419622739).token
- '897212'
- """
- time = self.normalize_time(time)
- counter = self._time_to_counter(time)
- if counter < 0:
- raise ValueError("timestamp must be >= 0")
- token = self._generate(counter)
- return TotpToken(self, token, counter)
-
- def _generate(self, counter):
- """
- base implementation of HOTP token generation algorithm.
-
- :arg counter: HOTP counter, as non-negative integer
- :returns: token as unicode string
- """
- # generate digest
- assert isinstance(counter, int_types), "counter must be integer"
- assert counter >= 0, "counter must be non-negative"
- keyed_hmac = self._keyed_hmac
- if keyed_hmac is None:
- keyed_hmac = self._keyed_hmac = compile_hmac(self.alg, self.key)
- digest = keyed_hmac(_pack_uint64(counter))
- digest_size = keyed_hmac.digest_info.digest_size
- assert len(digest) == digest_size, "digest_size: sanity check failed"
-
- # derive 31-bit token value
- assert digest_size >= 20, "digest_size: sanity check 2 failed" # otherwise 0xF+4 will run off end of hash.
- offset = byte_elem_value(digest[-1]) & 0xF
- value = _unpack_uint32(digest[offset:offset+4])[0] & 0x7fffffff
-
- # render to decimal string, return last <digits> chars
- # NOTE: the 10'th digit is not as secure, as it can only take on values 0-2, not 0-9,
- # due to 31-bit mask on int ">I". But some servers / clients use it :|
- # if 31-bit mask removed (which breaks spec), would only get values 0-4.
- digits = self.digits
- assert 0 < digits < 11, "digits: sanity check failed"
- return (u("%0*d") % (digits, value))[-digits:]
-
- #=============================================================================
- # token verification
- #=============================================================================
-
- @classmethod
- def verify(cls, token, source, **kwds):
- r"""
- Convenience wrapper around :meth:`TOTP.from_source` and :meth:`TOTP.match`.
-
- This parses a TOTP key & configuration from the specified source,
- and tries and match the token.
- It's designed to parallel the :meth:`passlib.ifc.PasswordHash.verify` method.
-
- :param token:
- Token string to match.
-
- :param source:
- Serialized TOTP key.
- Can be anything accepted by :meth:`TOTP.from_source`.
-
- :param \\*\\*kwds:
- All additional keywords passed to :meth:`TOTP.match`.
-
- :return:
- A :class:`TotpMatch` instance, or raises a :exc:`TokenError`.
- """
- return cls.from_source(source).match(token, **kwds)
-
- def match(self, token, time=None, window=30, skew=0, last_counter=None):
- """
- Match TOTP token against specified timestamp.
- Searches within a window before & after the provided time,
- in order to account for transmission delay and small amounts of skew in the client's clock.
-
- :arg token:
- Token to validate.
- may be integer or string (whitespace and hyphens are ignored).
-
- :param time:
- Unix epoch timestamp, can be any of :class:`!float`, :class:`!int`, or :class:`!datetime`.
- if ``None`` (the default), uses current system time.
- *this should correspond to the time the token was received from the client*.
-
- :param int window:
- How far backward and forward in time to search for a match.
- Measured in seconds. Defaults to ``30``. Typically only useful if set
- to multiples of :attr:`period`.
-
- :param int skew:
- Adjust timestamp by specified value, to account for excessive
- client clock skew. Measured in seconds. Defaults to ``0``.
-
- Negative skew (the common case) indicates transmission delay,
- and/or that the client clock is running behind the server.
-
- Positive skew indicates the client clock is running ahead of the server
- (and by enough that it cancels out any negative skew added by
- the transmission delay).
-
- You should ensure the server clock uses a reliable time source such as NTP,
- so that only the client clock's inaccuracy needs to be accounted for.
-
- This is an advanced parameter that should usually be left at ``0``;
- The **window** parameter is usually enough to account
- for any observed transmission delay.
-
- :param last_counter:
- Optional value of last counter value that was successfully used.
- If specified, verify will never search earlier counters,
- no matter how large the window is.
-
- Useful when client has previously authenticated,
- and thus should never provide a token older than previously
- verified value.
-
- :raises ~passlib.exc.TokenError:
-
- If the token is malformed, fails to match, or has already been used.
-
- :returns TotpMatch:
-
- Returns a :class:`TotpMatch` instance on successful match.
- Can be treated as tuple of ``(counter, time)``.
- Raises error if token is malformed / can't be verified.
-
- Usage example::
-
- >>> totp = TOTP('s3jdvb7qd2r7jpxx')
-
- >>> # valid token for this time period
- >>> totp.match('897212', 1419622729)
- <TotpMatch counter=47320757 time=1419622729 cache_seconds=60>
-
- >>> # token from counter step 30 sec ago (within allowed window)
- >>> totp.match('000492', 1419622729)
- <TotpMatch counter=47320756 time=1419622729 cache_seconds=60>
-
- >>> # invalid token -- token from 60 sec ago (outside of window)
- >>> totp.match('760389', 1419622729)
- Traceback:
- ...
- InvalidTokenError: Token did not match
- """
- time = self.normalize_time(time)
- self._check_serial(window, "window")
-
- client_time = time + skew
- if last_counter is None:
- last_counter = -1
- start = max(last_counter, self._time_to_counter(client_time - window))
- end = self._time_to_counter(client_time + window) + 1
- # XXX: could pass 'expected = _time_to_counter(client_time + TRANSMISSION_DELAY)'
- # to the _find_match() method, would help if window set to very large value.
-
- counter = self._find_match(token, start, end)
- assert counter >= last_counter, "sanity check failed: counter went backward"
-
- if counter == last_counter:
- raise UsedTokenError(expire_time=(last_counter + 1) * self.period)
-
- # NOTE: By returning match tied to <time>, not <client_time>, we're
- # causing .skipped to reflect the observed skew, independent of
- # the 'skew' param. This is deliberately done so that caller
- # can use historical .skipped values to estimate future skew.
- return TotpMatch(self, counter, time, window)
-
- def _find_match(self, token, start, end, expected=None):
- """
- helper for verify() --
- returns counter value within specified range that matches token.
-
- :arg token:
- token value to match (will be normalized internally)
-
- :arg start:
- starting counter value to check
-
- :arg end:
- check up to (but not including) this counter value
-
- :arg expected:
- optional expected value where search should start,
- to help speed up searches.
-
- :raises ~passlib.exc.TokenError:
- If the token is malformed, or fails to verify.
-
- :returns:
- counter value that matched
- """
- token = self.normalize_token(token)
- if start < 0:
- start = 0
- if end <= start:
- raise InvalidTokenError()
- generate = self._generate
- if not (expected is None or expected < start) and consteq(token, generate(expected)):
- return expected
- # XXX: if (end - start) is very large (e.g. for resync purposes),
- # could start with expected value, and work outward from there,
- # alternately checking before & after it until match is found.
- # XXX: can't use irange(start, end) here since py2x/win32
- # throws error on values >= (1<<31), which 'end' can be.
- counter = start
- while counter < end:
- if consteq(token, generate(counter)):
- return counter
- counter += 1
- raise InvalidTokenError()
-
- #-------------------------------------------------------------------------
- # TODO: resync(self, tokens, time=None, min_tokens=10, window=100)
- # helper to re-synchronize using series of sequential tokens,
- # all of which must validate; per RFC recommendation.
- # NOTE: need to make sure this function is constant time
- # (i.e. scans ALL tokens, and doesn't short-circuit after first mismatch)
- #-------------------------------------------------------------------------
-
- #=============================================================================
- # generic parsing
- #=============================================================================
-
- @classmethod
- def from_source(cls, source):
- """
- Load / create a TOTP object from a serialized source.
- This acts as a wrapper for the various deserialization methods:
-
- * TOTP URIs are handed off to :meth:`from_uri`
- * Any other strings are handed off to :meth:`from_json`
- * Dicts are handed off to :meth:`from_dict`
-
- :param source:
- Serialized TOTP object.
-
- :raises ValueError:
- If the key has been encrypted, but the application secret isn't available;
- or if the string cannot be recognized, parsed, or decoded.
-
- See :meth:`TOTP.using()` for how to configure application secrets.
-
- :returns:
- a :class:`TOTP` instance.
- """
- if isinstance(source, TOTP):
- # return object unchanged if they share same wallet.
- # otherwise make a new one that's bound to expected wallet.
- if cls.wallet == source.wallet:
- return source
- source = source.to_dict(encrypt=False)
- if isinstance(source, dict):
- return cls.from_dict(source)
- # NOTE: letting to_unicode() raise TypeError in this case
- source = to_unicode(source, param="totp source")
- if source.startswith("otpauth://"):
- return cls.from_uri(source)
- else:
- return cls.from_json(source)
-
- #=============================================================================
- # uri parsing
- #=============================================================================
- @classmethod
- def from_uri(cls, uri):
- """
- create an OTP instance from a URI (such as returned by :meth:`to_uri`).
-
- :returns:
- :class:`TOTP` instance.
-
- :raises ValueError:
- if the uri cannot be parsed or contains errors.
-
- .. seealso:: :ref:`totp-configuring-clients` tutorial for a usage example
- """
- # check for valid uri
- uri = to_unicode(uri, param="uri").strip()
- result = urlparse(uri)
- if result.scheme != "otpauth":
- raise cls._uri_parse_error("wrong uri scheme")
-
- # validate netloc, and hand off to helper
- cls._check_otp_type(result.netloc)
- return cls._from_parsed_uri(result)
-
- @classmethod
- def _check_otp_type(cls, type):
- """
- validate otp URI type is supported.
- returns True or raises appropriate error.
- """
- if type == "totp":
- return True
- if type == "hotp":
- raise NotImplementedError("HOTP not supported")
- raise ValueError("unknown otp type: %r" % type)
-
- @classmethod
- def _from_parsed_uri(cls, result):
- """
- internal from_uri() helper --
- handles parsing a validated TOTP URI
-
- :param result:
- a urlparse() instance
-
- :returns:
- cls instance
- """
-
- # decode label from uri path
- label = result.path
- if label.startswith("/") and len(label) > 1:
- label = unquote(label[1:])
- else:
- raise cls._uri_parse_error("missing label")
-
- # extract old-style issuer prefix
- if ":" in label:
- try:
- issuer, label = label.split(":")
- except ValueError: # too many ":"
- raise cls._uri_parse_error("malformed label")
- else:
- issuer = None
- if label:
- # NOTE: KeyURI spec says there may be leading spaces
- label = label.strip() or None
-
- # parse query params
- params = dict(label=label)
- for k, v in parse_qsl(result.query):
- if k in params:
- raise cls._uri_parse_error("duplicate parameter (%r)" % k)
- params[k] = v
-
- # synchronize issuer prefix w/ issuer param
- if issuer:
- if "issuer" not in params:
- params['issuer'] = issuer
- elif params['issuer'] != issuer:
- raise cls._uri_parse_error("conflicting issuer identifiers")
-
- # convert query params to constructor kwds, and call constructor
- return cls(**cls._adapt_uri_params(**params))
-
- @classmethod
- def _adapt_uri_params(cls, label=None, secret=None, issuer=None,
- digits=None, algorithm=None, period=None,
- **extra):
- """
- from_uri() helper --
- converts uri params into constructor args.
- """
- assert label, "from_uri() failed to provide label"
- if not secret:
- raise cls._uri_parse_error("missing 'secret' parameter")
- kwds = dict(label=label, issuer=issuer, key=secret, format="base32")
- if digits:
- kwds['digits'] = cls._uri_parse_int(digits, "digits")
- if algorithm:
- kwds['alg'] = algorithm
- if period:
- kwds['period'] = cls._uri_parse_int(period, "period")
- if extra:
- # malicious uri, deviation from spec, or newer revision of spec?
- # in either case, we issue warning and ignore extra params.
- warn("%s: unexpected parameters encountered in otp uri: %r" %
- (cls, extra), exc.PasslibRuntimeWarning)
- return kwds
-
- @staticmethod
- def _uri_parse_error(reason):
- """uri parsing helper -- creates preformatted error message"""
- return ValueError("Invalid otpauth uri: %s" % (reason,))
-
- @classmethod
- def _uri_parse_int(cls, source, param):
- """uri parsing helper -- int() wrapper"""
- try:
- return int(source)
- except ValueError:
- raise cls._uri_parse_error("Malformed %r parameter" % param)
-
- #=============================================================================
- # uri rendering
- #=============================================================================
- def to_uri(self, label=None, issuer=None):
- """
- Serialize key and configuration into a URI, per
- Google Auth's `KeyUriFormat <http://code.google.com/p/google-authenticator/wiki/KeyUriFormat>`_.
-
- :param str label:
- Label to associate with this token when generating a URI.
- Displayed to user by most OTP client applications (e.g. Google Authenticator),
- and typically has format such as ``"John Smith"`` or ``"jsmith@webservice.example.org"``.
-
- Defaults to **label** constructor argument. Must be provided in one or the other location.
- May not contain ``:``.
-
- :param str issuer:
- String identifying the token issuer (e.g. the domain or canonical name of your service).
- Optional but strongly recommended if you're rendering to a URI.
- Used internally by some OTP client applications (e.g. Google Authenticator) to distinguish entries
- which otherwise have the same label.
-
- Defaults to **issuer** constructor argument, or ``None``.
- May not contain ``:``.
-
- :raises ValueError:
- * if a label was not provided either as an argument, or in the constructor.
- * if the label or issuer contains invalid characters.
-
- :returns:
- all the configuration information for this OTP token generator,
- encoded into a URI.
-
- These URIs are frequently converted to a QRCode for transferring
- to a TOTP client application such as Google Auth.
- Usage example::
-
- >>> from passlib.totp import TOTP
- >>> tp = TOTP('s3jdvb7qd2r7jpxx')
- >>> uri = tp.to_uri("user@example.org", "myservice.another-example.org")
- >>> uri
- 'otpauth://totp/user@example.org?secret=S3JDVB7QD2R7JPXX&issuer=myservice.another-example.org'
-
- .. versionchanged:: 1.7.2
-
- This method now prepends the issuer URI label. This is recommended by the KeyURI
- specification, for compatibility with older clients.
- """
- # encode label
- if label is None:
- label = self.label
- if not label:
- raise ValueError("a label must be specified as argument, or in the constructor")
- self._check_label(label)
- # NOTE: reference examples in spec seem to indicate the '@' in a label
- # shouldn't be escaped, though spec doesn't explicitly address this.
- # XXX: is '/' ok to leave unencoded?
- label = quote(label, '@')
-
- # encode query parameters
- params = self._to_uri_params()
- if issuer is None:
- issuer = self.issuer
- if issuer:
- self._check_issuer(issuer)
- # NOTE: per KeyURI spec, including issuer as part of label is deprecated,
- # in favor of adding it to query params. however, some QRCode clients
- # don't recognize the 'issuer' query parameter, so spec recommends (as of 2018-7)
- # to include both.
- label = "%s:%s" % (quote(issuer, '@'), label)
- params.append(("issuer", issuer))
- # NOTE: not using urllib.urlencode() because it encodes ' ' as '+';
- # but spec says to use '%20', and not sure how fragile
- # the various totp clients' parsers are.
- param_str = u("&").join(u("%s=%s") % (key, quote(value, '')) for key, value in params)
- assert param_str, "param_str should never be empty"
-
- # render uri
- return u("otpauth://totp/%s?%s") % (label, param_str)
-
- def _to_uri_params(self):
- """return list of (key, param) entries for URI"""
- args = [("secret", self.base32_key)]
- if self.alg != "sha1":
- args.append(("algorithm", self.alg.upper()))
- if self.digits != 6:
- args.append(("digits", str(self.digits)))
- if self.period != 30:
- args.append(("period", str(self.period)))
- return args
-
- #=============================================================================
- # json rendering / parsing
- #=============================================================================
-
- @classmethod
- def from_json(cls, source):
- """
- Load / create an OTP object from a serialized json string
- (as generated by :meth:`to_json`).
-
- :arg json:
- Serialized output from :meth:`to_json`, as unicode or ascii bytes.
-
- :raises ValueError:
- If the key has been encrypted, but the application secret isn't available;
- or if the string cannot be recognized, parsed, or decoded.
-
- See :meth:`TOTP.using()` for how to configure application secrets.
-
- :returns:
- a :class:`TOTP` instance.
-
- .. seealso:: :ref:`totp-storing-instances` tutorial for a usage example
- """
- source = to_unicode(source, param="json source")
- return cls.from_dict(json.loads(source))
-
- def to_json(self, encrypt=None):
- """
- Serialize configuration & internal state to a json string,
- mainly useful for persisting client-specific state in a database.
- All keywords passed to :meth:`to_dict`.
-
- :returns:
- json string containing serializes configuration & state.
- """
- state = self.to_dict(encrypt=encrypt)
- return json.dumps(state, sort_keys=True, separators=(",", ":"))
-
- #=============================================================================
- # dict rendering / parsing
- #=============================================================================
-
- @classmethod
- def from_dict(cls, source):
- """
- Load / create a TOTP object from a dictionary
- (as generated by :meth:`to_dict`)
-
- :param source:
- dict containing serialized TOTP key & configuration.
-
- :raises ValueError:
- If the key has been encrypted, but the application secret isn't available;
- or if the dict cannot be recognized, parsed, or decoded.
-
- See :meth:`TOTP.using()` for how to configure application secrets.
-
- :returns:
- A :class:`TOTP` instance.
-
- .. seealso:: :ref:`totp-storing-instances` tutorial for a usage example
- """
- if not isinstance(source, dict) or "type" not in source:
- raise cls._dict_parse_error("unrecognized format")
- return cls(**cls._adapt_dict_kwds(**source))
-
- @classmethod
- def _adapt_dict_kwds(cls, type, **kwds):
- """
- Internal helper for .from_json() --
- Adapts serialized json dict into constructor keywords.
- """
- # default json format is just serialization of constructor kwds.
- # XXX: just pass all this through to _from_json / constructor?
- # go ahead and mark as changed (needs re-saving) if the version is too old
- assert cls._check_otp_type(type)
- ver = kwds.pop("v", None)
- if not ver or ver < cls.min_json_version or ver > cls.json_version:
- raise cls._dict_parse_error("missing/unsupported version (%r)" % (ver,))
- elif ver != cls.json_version:
- # mark older version as needing re-serializing
- kwds['changed'] = True
- if 'enckey' in kwds:
- # handing encrypted key off to constructor, which handles the
- # decryption. this lets it get ahold of (and store) the original
- # encrypted key, so if to_json() is called again, the encrypted
- # key can be re-used.
- # XXX: wallet is known at this point, could decrypt key here.
- assert 'key' not in kwds # shouldn't be present w/ enckey
- kwds.update(key=kwds.pop("enckey"), format="encrypted")
- elif 'key' not in kwds:
- raise cls._dict_parse_error("missing 'enckey' / 'key'")
- # XXX: could should set changed=True if active wallet is available,
- # and source wasn't encrypted.
- kwds.pop("last_counter", None) # extract legacy counter parameter
- return kwds
-
- @staticmethod
- def _dict_parse_error(reason):
- """dict parsing helper -- creates preformatted error message"""
- return ValueError("Invalid totp data: %s" % (reason,))
-
- def to_dict(self, encrypt=None):
- """
- Serialize configuration & internal state to a dict,
- mainly useful for persisting client-specific state in a database.
-
- :param encrypt:
- Whether to output should be encrypted.
-
- * ``None`` (the default) -- uses encrypted key if application
- secrets are available, otherwise uses plaintext key.
- * ``True`` -- uses encrypted key, or raises TypeError
- if application secret wasn't provided to OTP constructor.
- * ``False`` -- uses raw key.
-
- :returns:
- dictionary, containing basic (json serializable) datatypes.
- """
- # NOTE: 'type' may seem redundant, but using it so code can try to
- # detect that this *is* a TOTP json string / dict.
- state = dict(v=self.json_version, type="totp")
- if self.alg != "sha1":
- state['alg'] = self.alg
- if self.digits != 6:
- state['digits'] = self.digits
- if self.period != 30:
- state['period'] = self.period
- # XXX: should we include label as part of json format?
- if self.label:
- state['label'] = self.label
- issuer = self.issuer
- if issuer and issuer != type(self).issuer:
- # (omit issuer if it matches class default)
- state['issuer'] = issuer
- if encrypt is None:
- wallet = self.wallet
- encrypt = wallet and wallet.has_secrets
- if encrypt:
- state['enckey'] = self.encrypted_key
- else:
- state['key'] = self.base32_key
- # NOTE: in the future, may add a "history" parameter
- # containing a list of (time, skipped) pairs, encoding
- # the last X successful verifications, to allow persisting
- # & estimating client clock skew over time.
- return state
-
- #=============================================================================
- # eoc
- #=============================================================================
-
- #=============================================================================
- # TOTP helpers
- #=============================================================================
- class TotpToken(SequenceMixin):
- """
- Object returned by :meth:`TOTP.generate`.
- It can be treated as a sequence of ``(token, expire_time)``,
- or accessed via the following attributes:
-
- .. autoattribute:: token
- .. autoattribute:: expire_time
- .. autoattribute:: counter
- .. autoattribute:: remaining
- .. autoattribute:: valid
- """
- #: TOTP object that generated this token
- totp = None
-
- #: Token as decimal-encoded ascii string.
- token = None
-
- #: HOTP counter value used to generate token (derived from time)
- counter = None
-
- def __init__(self, totp, token, counter):
- """
- .. warning::
- the constructor signature is an internal detail, and is subject to change.
- """
- self.totp = totp
- self.token = token
- self.counter = counter
-
- @memoized_property
- def start_time(self):
- """Timestamp marking beginning of period when token is valid"""
- return self.totp._counter_to_time(self.counter)
-
- @memoized_property
- def expire_time(self):
- """Timestamp marking end of period when token is valid"""
- return self.totp._counter_to_time(self.counter + 1)
-
- @property
- def remaining(self):
- """number of (float) seconds before token expires"""
- return max(0, self.expire_time - self.totp.now())
-
- @property
- def valid(self):
- """whether token is still valid"""
- return bool(self.remaining)
-
- def _as_tuple(self):
- return self.token, self.expire_time
-
- def __repr__(self):
- expired = "" if self.remaining else " expired"
- return "<TotpToken token='%s' expire_time=%d%s>" % \
- (self.token, self.expire_time, expired)
-
-
- class TotpMatch(SequenceMixin):
- """
- Object returned by :meth:`TOTP.match` and :meth:`TOTP.verify` on a successful match.
-
- It can be treated as a sequence of ``(counter, time)``,
- or accessed via the following attributes:
-
- .. autoattribute:: counter
- :annotation: = 0
-
- .. autoattribute:: time
- :annotation: = 0
-
- .. autoattribute:: expected_counter
- :annotation: = 0
-
- .. autoattribute:: skipped
- :annotation: = 0
-
- .. autoattribute:: expire_time
- :annotation: = 0
-
- .. autoattribute:: cache_seconds
- :annotation: = 60
-
- .. autoattribute:: cache_time
- :annotation: = 0
-
- This object will always have a ``True`` boolean value.
- """
-
- #: TOTP object that generated this token
- totp = None
-
- #: TOTP counter value which matched token.
- #: (Best practice is to subsequently ignore tokens matching this counter
- #: or earlier)
- counter = 0
-
- #: Timestamp when verification was performed.
- time = 0
-
- #: Search window used by verify() (affects cache_time)
- window = 30
-
- def __init__(self, totp, counter, time, window=30):
- """
- .. warning::
- the constructor signature is an internal detail, and is subject to change.
- """
- self.totp = totp
- self.counter = counter
- self.time = time
- self.window = window
-
- @memoized_property
- def expected_counter(self):
- """
- Counter value expected for timestamp.
- """
- return self.totp._time_to_counter(self.time)
-
- @memoized_property
- def skipped(self):
- """
- How many steps were skipped between expected and actual matched counter
- value (may be positive, zero, or negative).
- """
- return self.counter - self.expected_counter
-
- # @memoized_property
- # def start_time(self):
- # """Timestamp marking start of period when token is valid"""
- # return self.totp._counter_to_time(self.counter + 1)
-
- @memoized_property
- def expire_time(self):
- """Timestamp marking end of period when token is valid"""
- return self.totp._counter_to_time(self.counter + 1)
-
- @memoized_property
- def cache_seconds(self):
- """
- Number of seconds counter should be cached
- before it's guaranteed to have passed outside of verification window.
- """
- # XXX: real value is 'cache_time - now()',
- # but this is a cheaper upper bound.
- return self.totp.period + self.window
-
- @memoized_property
- def cache_time(self):
- """
- Timestamp marking when counter has passed outside of verification window.
- """
- return self.expire_time + self.window
-
- def _as_tuple(self):
- return self.counter, self.time
-
- def __repr__(self):
- args = (self.counter, self.time, self.cache_seconds)
- return "<TotpMatch counter=%d time=%d cache_seconds=%d>" % args
-
- #=============================================================================
- # convenience helpers
- #=============================================================================
-
- def generate_secret(entropy=256, charset=BASE64_CHARS[:-2]):
- """
- generate a random string suitable for use as an
- :class:`AppWallet` application secret.
-
- :param entropy:
- number of bits of entropy (controls size/complexity of password).
- """
- assert entropy > 0
- assert len(charset) > 1
- count = int(math.ceil(entropy * math.log(2, len(charset))))
- return getrandstr(rng, charset, count)
-
- #=============================================================================
- # eof
- #=============================================================================
|