You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

375 line
13 KiB

  1. import os
  2. import sys
  3. import shutil
  4. import tempfile
  5. import contextlib
  6. import shlex
  7. from ._compat import iteritems, PY2, string_types
  8. # If someone wants to vendor click, we want to ensure the
  9. # correct package is discovered. Ideally we could use a
  10. # relative import here but unfortunately Python does not
  11. # support that.
  12. clickpkg = sys.modules[__name__.rsplit('.', 1)[0]]
  13. if PY2:
  14. from cStringIO import StringIO
  15. else:
  16. import io
  17. from ._compat import _find_binary_reader
  18. class EchoingStdin(object):
  19. def __init__(self, input, output):
  20. self._input = input
  21. self._output = output
  22. def __getattr__(self, x):
  23. return getattr(self._input, x)
  24. def _echo(self, rv):
  25. self._output.write(rv)
  26. return rv
  27. def read(self, n=-1):
  28. return self._echo(self._input.read(n))
  29. def readline(self, n=-1):
  30. return self._echo(self._input.readline(n))
  31. def readlines(self):
  32. return [self._echo(x) for x in self._input.readlines()]
  33. def __iter__(self):
  34. return iter(self._echo(x) for x in self._input)
  35. def __repr__(self):
  36. return repr(self._input)
  37. def make_input_stream(input, charset):
  38. # Is already an input stream.
  39. if hasattr(input, 'read'):
  40. if PY2:
  41. return input
  42. rv = _find_binary_reader(input)
  43. if rv is not None:
  44. return rv
  45. raise TypeError('Could not find binary reader for input stream.')
  46. if input is None:
  47. input = b''
  48. elif not isinstance(input, bytes):
  49. input = input.encode(charset)
  50. if PY2:
  51. return StringIO(input)
  52. return io.BytesIO(input)
  53. class Result(object):
  54. """Holds the captured result of an invoked CLI script."""
  55. def __init__(self, runner, stdout_bytes, stderr_bytes, exit_code,
  56. exception, exc_info=None):
  57. #: The runner that created the result
  58. self.runner = runner
  59. #: The standard output as bytes.
  60. self.stdout_bytes = stdout_bytes
  61. #: The standard error as bytes, or False(y) if not available
  62. self.stderr_bytes = stderr_bytes
  63. #: The exit code as integer.
  64. self.exit_code = exit_code
  65. #: The exception that happened if one did.
  66. self.exception = exception
  67. #: The traceback
  68. self.exc_info = exc_info
  69. @property
  70. def output(self):
  71. """The (standard) output as unicode string."""
  72. return self.stdout
  73. @property
  74. def stdout(self):
  75. """The standard output as unicode string."""
  76. return self.stdout_bytes.decode(self.runner.charset, 'replace') \
  77. .replace('\r\n', '\n')
  78. @property
  79. def stderr(self):
  80. """The standard error as unicode string."""
  81. if not self.stderr_bytes:
  82. raise ValueError("stderr not separately captured")
  83. return self.stderr_bytes.decode(self.runner.charset, 'replace') \
  84. .replace('\r\n', '\n')
  85. def __repr__(self):
  86. return '<%s %s>' % (
  87. type(self).__name__,
  88. self.exception and repr(self.exception) or 'okay',
  89. )
  90. class CliRunner(object):
  91. """The CLI runner provides functionality to invoke a Click command line
  92. script for unittesting purposes in a isolated environment. This only
  93. works in single-threaded systems without any concurrency as it changes the
  94. global interpreter state.
  95. :param charset: the character set for the input and output data. This is
  96. UTF-8 by default and should not be changed currently as
  97. the reporting to Click only works in Python 2 properly.
  98. :param env: a dictionary with environment variables for overriding.
  99. :param echo_stdin: if this is set to `True`, then reading from stdin writes
  100. to stdout. This is useful for showing examples in
  101. some circumstances. Note that regular prompts
  102. will automatically echo the input.
  103. :param mix_stderr: if this is set to `False`, then stdout and stderr are
  104. preserved as independent streams. This is useful for
  105. Unix-philosophy apps that have predictable stdout and
  106. noisy stderr, such that each may be measured
  107. independently
  108. """
  109. def __init__(self, charset=None, env=None, echo_stdin=False,
  110. mix_stderr=True):
  111. if charset is None:
  112. charset = 'utf-8'
  113. self.charset = charset
  114. self.env = env or {}
  115. self.echo_stdin = echo_stdin
  116. self.mix_stderr = mix_stderr
  117. def get_default_prog_name(self, cli):
  118. """Given a command object it will return the default program name
  119. for it. The default is the `name` attribute or ``"root"`` if not
  120. set.
  121. """
  122. return cli.name or 'root'
  123. def make_env(self, overrides=None):
  124. """Returns the environment overrides for invoking a script."""
  125. rv = dict(self.env)
  126. if overrides:
  127. rv.update(overrides)
  128. return rv
  129. @contextlib.contextmanager
  130. def isolation(self, input=None, env=None, color=False):
  131. """A context manager that sets up the isolation for invoking of a
  132. command line tool. This sets up stdin with the given input data
  133. and `os.environ` with the overrides from the given dictionary.
  134. This also rebinds some internals in Click to be mocked (like the
  135. prompt functionality).
  136. This is automatically done in the :meth:`invoke` method.
  137. .. versionadded:: 4.0
  138. The ``color`` parameter was added.
  139. :param input: the input stream to put into sys.stdin.
  140. :param env: the environment overrides as dictionary.
  141. :param color: whether the output should contain color codes. The
  142. application can still override this explicitly.
  143. """
  144. input = make_input_stream(input, self.charset)
  145. old_stdin = sys.stdin
  146. old_stdout = sys.stdout
  147. old_stderr = sys.stderr
  148. old_forced_width = clickpkg.formatting.FORCED_WIDTH
  149. clickpkg.formatting.FORCED_WIDTH = 80
  150. env = self.make_env(env)
  151. if PY2:
  152. bytes_output = StringIO()
  153. if self.echo_stdin:
  154. input = EchoingStdin(input, bytes_output)
  155. sys.stdout = bytes_output
  156. if not self.mix_stderr:
  157. bytes_error = StringIO()
  158. sys.stderr = bytes_error
  159. else:
  160. bytes_output = io.BytesIO()
  161. if self.echo_stdin:
  162. input = EchoingStdin(input, bytes_output)
  163. input = io.TextIOWrapper(input, encoding=self.charset)
  164. sys.stdout = io.TextIOWrapper(
  165. bytes_output, encoding=self.charset)
  166. if not self.mix_stderr:
  167. bytes_error = io.BytesIO()
  168. sys.stderr = io.TextIOWrapper(
  169. bytes_error, encoding=self.charset)
  170. if self.mix_stderr:
  171. sys.stderr = sys.stdout
  172. sys.stdin = input
  173. def visible_input(prompt=None):
  174. sys.stdout.write(prompt or '')
  175. val = input.readline().rstrip('\r\n')
  176. sys.stdout.write(val + '\n')
  177. sys.stdout.flush()
  178. return val
  179. def hidden_input(prompt=None):
  180. sys.stdout.write((prompt or '') + '\n')
  181. sys.stdout.flush()
  182. return input.readline().rstrip('\r\n')
  183. def _getchar(echo):
  184. char = sys.stdin.read(1)
  185. if echo:
  186. sys.stdout.write(char)
  187. sys.stdout.flush()
  188. return char
  189. default_color = color
  190. def should_strip_ansi(stream=None, color=None):
  191. if color is None:
  192. return not default_color
  193. return not color
  194. old_visible_prompt_func = clickpkg.termui.visible_prompt_func
  195. old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func
  196. old__getchar_func = clickpkg.termui._getchar
  197. old_should_strip_ansi = clickpkg.utils.should_strip_ansi
  198. clickpkg.termui.visible_prompt_func = visible_input
  199. clickpkg.termui.hidden_prompt_func = hidden_input
  200. clickpkg.termui._getchar = _getchar
  201. clickpkg.utils.should_strip_ansi = should_strip_ansi
  202. old_env = {}
  203. try:
  204. for key, value in iteritems(env):
  205. old_env[key] = os.environ.get(key)
  206. if value is None:
  207. try:
  208. del os.environ[key]
  209. except Exception:
  210. pass
  211. else:
  212. os.environ[key] = value
  213. yield (bytes_output, not self.mix_stderr and bytes_error)
  214. finally:
  215. for key, value in iteritems(old_env):
  216. if value is None:
  217. try:
  218. del os.environ[key]
  219. except Exception:
  220. pass
  221. else:
  222. os.environ[key] = value
  223. sys.stdout = old_stdout
  224. sys.stderr = old_stderr
  225. sys.stdin = old_stdin
  226. clickpkg.termui.visible_prompt_func = old_visible_prompt_func
  227. clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func
  228. clickpkg.termui._getchar = old__getchar_func
  229. clickpkg.utils.should_strip_ansi = old_should_strip_ansi
  230. clickpkg.formatting.FORCED_WIDTH = old_forced_width
  231. def invoke(self, cli, args=None, input=None, env=None,
  232. catch_exceptions=True, color=False, mix_stderr=False, **extra):
  233. """Invokes a command in an isolated environment. The arguments are
  234. forwarded directly to the command line script, the `extra` keyword
  235. arguments are passed to the :meth:`~clickpkg.Command.main` function of
  236. the command.
  237. This returns a :class:`Result` object.
  238. .. versionadded:: 3.0
  239. The ``catch_exceptions`` parameter was added.
  240. .. versionchanged:: 3.0
  241. The result object now has an `exc_info` attribute with the
  242. traceback if available.
  243. .. versionadded:: 4.0
  244. The ``color`` parameter was added.
  245. :param cli: the command to invoke
  246. :param args: the arguments to invoke. It may be given as an iterable
  247. or a string. When given as string it will be interpreted
  248. as a Unix shell command. More details at
  249. :func:`shlex.split`.
  250. :param input: the input data for `sys.stdin`.
  251. :param env: the environment overrides.
  252. :param catch_exceptions: Whether to catch any other exceptions than
  253. ``SystemExit``.
  254. :param extra: the keyword arguments to pass to :meth:`main`.
  255. :param color: whether the output should contain color codes. The
  256. application can still override this explicitly.
  257. """
  258. exc_info = None
  259. with self.isolation(input=input, env=env, color=color) as outstreams:
  260. exception = None
  261. exit_code = 0
  262. if isinstance(args, string_types):
  263. args = shlex.split(args)
  264. try:
  265. prog_name = extra.pop("prog_name")
  266. except KeyError:
  267. prog_name = self.get_default_prog_name(cli)
  268. try:
  269. cli.main(args=args or (), prog_name=prog_name, **extra)
  270. except SystemExit as e:
  271. exc_info = sys.exc_info()
  272. exit_code = e.code
  273. if exit_code is None:
  274. exit_code = 0
  275. if exit_code != 0:
  276. exception = e
  277. if not isinstance(exit_code, int):
  278. sys.stdout.write(str(exit_code))
  279. sys.stdout.write('\n')
  280. exit_code = 1
  281. except Exception as e:
  282. if not catch_exceptions:
  283. raise
  284. exception = e
  285. exit_code = 1
  286. exc_info = sys.exc_info()
  287. finally:
  288. sys.stdout.flush()
  289. stdout = outstreams[0].getvalue()
  290. stderr = outstreams[1] and outstreams[1].getvalue()
  291. return Result(runner=self,
  292. stdout_bytes=stdout,
  293. stderr_bytes=stderr,
  294. exit_code=exit_code,
  295. exception=exception,
  296. exc_info=exc_info)
  297. @contextlib.contextmanager
  298. def isolated_filesystem(self):
  299. """A context manager that creates a temporary folder and changes
  300. the current working directory to it for isolated filesystem tests.
  301. """
  302. cwd = os.getcwd()
  303. t = tempfile.mkdtemp()
  304. os.chdir(t)
  305. try:
  306. yield t
  307. finally:
  308. os.chdir(cwd)
  309. try:
  310. shutil.rmtree(t)
  311. except (OSError, IOError):
  312. pass