You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

284 lines
9.1 KiB

  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import io
  4. import os
  5. import re
  6. import shutil
  7. import sys
  8. import tempfile
  9. from typing import (Dict, Iterator, List, Match, Optional, # noqa
  10. Pattern, Union, TYPE_CHECKING, Text, IO, Tuple)
  11. import warnings
  12. from collections import OrderedDict
  13. from contextlib import contextmanager
  14. from .compat import StringIO, PY2, to_env
  15. from .parser import parse_stream
# Aliases used only inside the ``# type:`` comments of this module. The branch
# is guarded by typing.TYPE_CHECKING, so it is evaluated by static type
# checkers and not executed at runtime.
if TYPE_CHECKING:  # pragma: no cover
    # Path-like alias: os.PathLike on 3.6+, plain text otherwise.
    if sys.version_info >= (3, 6):
        _PathLike = os.PathLike
    else:
        _PathLike = Text
    # In-memory text stream alias: bare StringIO on Python 3, the subscripted
    # form StringIO[Text] on Python 2.
    if sys.version_info >= (3, 0):
        _StringIO = StringIO
    else:
        _StringIO = StringIO[Text]
# Matches a ``${NAME}`` reference: the literal "${", any run of characters
# other than "}", then the closing "}". Used by resolve_nested_variables.
__posix_variable = re.compile(r'\$\{[^\}]*\}')  # type: Pattern[Text]
  26. class DotEnv():
  27. def __init__(self, dotenv_path, verbose=False, encoding=None):
  28. # type: (Union[Text, _PathLike, _StringIO], bool, Union[None, Text]) -> None
  29. self.dotenv_path = dotenv_path # type: Union[Text,_PathLike, _StringIO]
  30. self._dict = None # type: Optional[Dict[Text, Text]]
  31. self.verbose = verbose # type: bool
  32. self.encoding = encoding # type: Union[None, Text]
  33. @contextmanager
  34. def _get_stream(self):
  35. # type: () -> Iterator[IO[Text]]
  36. if isinstance(self.dotenv_path, StringIO):
  37. yield self.dotenv_path
  38. elif os.path.isfile(self.dotenv_path):
  39. with io.open(self.dotenv_path, encoding=self.encoding) as stream:
  40. yield stream
  41. else:
  42. if self.verbose:
  43. warnings.warn("File doesn't exist {}".format(self.dotenv_path)) # type: ignore
  44. yield StringIO('')
  45. def dict(self):
  46. # type: () -> Dict[Text, Text]
  47. """Return dotenv as dict"""
  48. if self._dict:
  49. return self._dict
  50. values = OrderedDict(self.parse())
  51. self._dict = resolve_nested_variables(values)
  52. return self._dict
  53. def parse(self):
  54. # type: () -> Iterator[Tuple[Text, Text]]
  55. with self._get_stream() as stream:
  56. for mapping in parse_stream(stream):
  57. if mapping.key is not None and mapping.value is not None:
  58. yield mapping.key, mapping.value
  59. def set_as_environment_variables(self, override=False):
  60. # type: (bool) -> bool
  61. """
  62. Load the current dotenv as system environemt variable.
  63. """
  64. for k, v in self.dict().items():
  65. if k in os.environ and not override:
  66. continue
  67. os.environ[to_env(k)] = to_env(v)
  68. return True
  69. def get(self, key):
  70. # type: (Text) -> Optional[Text]
  71. """
  72. """
  73. data = self.dict()
  74. if key in data:
  75. return data[key]
  76. if self.verbose:
  77. warnings.warn("key %s not found in %s." % (key, self.dotenv_path)) # type: ignore
  78. return None
  79. def get_key(dotenv_path, key_to_get):
  80. # type: (Union[Text, _PathLike], Text) -> Optional[Text]
  81. """
  82. Gets the value of a given key from the given .env
  83. If the .env path given doesn't exist, fails
  84. """
  85. return DotEnv(dotenv_path, verbose=True).get(key_to_get)
  86. @contextmanager
  87. def rewrite(path):
  88. # type: (_PathLike) -> Iterator[Tuple[IO[Text], IO[Text]]]
  89. try:
  90. with tempfile.NamedTemporaryFile(mode="w+", delete=False) as dest:
  91. with io.open(path) as source:
  92. yield (source, dest) # type: ignore
  93. except BaseException:
  94. if os.path.isfile(dest.name):
  95. os.unlink(dest.name)
  96. raise
  97. else:
  98. shutil.move(dest.name, path)
  99. def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
  100. # type: (_PathLike, Text, Text, Text) -> Tuple[Optional[bool], Text, Text]
  101. """
  102. Adds or Updates a key/value to the given .env
  103. If the .env path given doesn't exist, fails instead of risking creating
  104. an orphan .env somewhere in the filesystem
  105. """
  106. value_to_set = value_to_set.strip("'").strip('"')
  107. if not os.path.exists(dotenv_path):
  108. warnings.warn("can't write to %s - it doesn't exist." % dotenv_path) # type: ignore
  109. return None, key_to_set, value_to_set
  110. if " " in value_to_set:
  111. quote_mode = "always"
  112. line_template = '{}="{}"\n' if quote_mode == "always" else '{}={}\n'
  113. line_out = line_template.format(key_to_set, value_to_set)
  114. with rewrite(dotenv_path) as (source, dest):
  115. replaced = False
  116. for mapping in parse_stream(source):
  117. if mapping.key == key_to_set:
  118. dest.write(line_out)
  119. replaced = True
  120. else:
  121. dest.write(mapping.original)
  122. if not replaced:
  123. dest.write(line_out)
  124. return True, key_to_set, value_to_set
  125. def unset_key(dotenv_path, key_to_unset, quote_mode="always"):
  126. # type: (_PathLike, Text, Text) -> Tuple[Optional[bool], Text]
  127. """
  128. Removes a given key from the given .env
  129. If the .env path given doesn't exist, fails
  130. If the given key doesn't exist in the .env, fails
  131. """
  132. if not os.path.exists(dotenv_path):
  133. warnings.warn("can't delete from %s - it doesn't exist." % dotenv_path) # type: ignore
  134. return None, key_to_unset
  135. removed = False
  136. with rewrite(dotenv_path) as (source, dest):
  137. for mapping in parse_stream(source):
  138. if mapping.key == key_to_unset:
  139. removed = True
  140. else:
  141. dest.write(mapping.original)
  142. if not removed:
  143. warnings.warn("key %s not removed from %s - key doesn't exist." % (key_to_unset, dotenv_path)) # type: ignore
  144. return None, key_to_unset
  145. return removed, key_to_unset
  146. def resolve_nested_variables(values):
  147. # type: (Dict[Text, Text]) -> Dict[Text, Text]
  148. def _replacement(name):
  149. # type: (Text) -> Text
  150. """
  151. get appropriate value for a variable name.
  152. first search in environ, if not found,
  153. then look into the dotenv variables
  154. """
  155. ret = os.getenv(name, new_values.get(name, ""))
  156. return ret
  157. def _re_sub_callback(match_object):
  158. # type: (Match[Text]) -> Text
  159. """
  160. From a match object gets the variable name and returns
  161. the correct replacement
  162. """
  163. return _replacement(match_object.group()[2:-1])
  164. new_values = {}
  165. for k, v in values.items():
  166. new_values[k] = __posix_variable.sub(_re_sub_callback, v)
  167. return new_values
  168. def _walk_to_root(path):
  169. # type: (Text) -> Iterator[Text]
  170. """
  171. Yield directories starting from the given directory up to the root
  172. """
  173. if not os.path.exists(path):
  174. raise IOError('Starting path not found')
  175. if os.path.isfile(path):
  176. path = os.path.dirname(path)
  177. last_dir = None
  178. current_dir = os.path.abspath(path)
  179. while last_dir != current_dir:
  180. yield current_dir
  181. parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
  182. last_dir, current_dir = current_dir, parent_dir
  183. def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
  184. # type: (Text, bool, bool) -> Text
  185. """
  186. Search in increasingly higher folders for the given file
  187. Returns path to the file if found, or an empty string otherwise
  188. """
  189. def _is_interactive():
  190. """ Decide whether this is running in a REPL or IPython notebook """
  191. main = __import__('__main__', None, None, fromlist=['__file__'])
  192. return not hasattr(main, '__file__')
  193. if usecwd or _is_interactive():
  194. # Should work without __file__, e.g. in REPL or IPython notebook.
  195. path = os.getcwd()
  196. else:
  197. # will work for .py files
  198. frame = sys._getframe()
  199. # find first frame that is outside of this file
  200. if PY2 and not __file__.endswith('.py'):
  201. # in Python2 __file__ extension could be .pyc or .pyo (this doesn't account
  202. # for edge case of Python compiled for non-standard extension)
  203. current_file = __file__.rsplit('.', 1)[0] + '.py'
  204. else:
  205. current_file = __file__
  206. while frame.f_code.co_filename == current_file:
  207. frame = frame.f_back
  208. frame_filename = frame.f_code.co_filename
  209. path = os.path.dirname(os.path.abspath(frame_filename))
  210. for dirname in _walk_to_root(path):
  211. check_path = os.path.join(dirname, filename)
  212. if os.path.isfile(check_path):
  213. return check_path
  214. if raise_error_if_not_found:
  215. raise IOError('File not found')
  216. return ''
  217. def load_dotenv(dotenv_path=None, stream=None, verbose=False, override=False, **kwargs):
  218. # type: (Union[Text, _PathLike, None], Optional[_StringIO], bool, bool, Union[None, Text]) -> bool
  219. f = dotenv_path or stream or find_dotenv()
  220. return DotEnv(f, verbose=verbose, **kwargs).set_as_environment_variables(override=override)
  221. def dotenv_values(dotenv_path=None, stream=None, verbose=False, **kwargs):
  222. # type: (Union[Text, _PathLike, None], Optional[_StringIO], bool, Union[None, Text]) -> Dict[Text, Text]
  223. f = dotenv_path or stream or find_dotenv()
  224. return DotEnv(f, verbose=verbose, **kwargs).dict()