test_base.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # -*- coding: utf-8 -*-
  2. # vim: tabstop=4 shiftwidth=4 softtabstop=4
  3. # Copyright 2013, Red Hat, Inc.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  6. # not use this file except in compliance with the License. You may obtain
  7. # a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14. # License for the specific language governing permissions and limitations
  15. # under the License.
  16. import shutil
  17. import tempfile
  18. import subprocess
  19. import logging
  20. LOG = logging.getLogger(__name__)
  21. class FakePopen(object):
  22. '''The FakePopen class replaces subprocess.Popen. Instead of actually
  23. executing commands, it permits the caller to register a list of
  24. commands the output to produce using the FakePopen.register and
  25. FakePopen.register_as_script method. By default, FakePopen will return
  26. empty stdout and stderr and a successful (0) returncode.
  27. '''
  28. cmd_registry = {}
  29. script_registry = {}
  30. @classmethod
  31. def register(cls, args, stdout='', stderr='', returncode=0):
  32. '''Register a fake command.'''
  33. if isinstance(args, list):
  34. args = tuple(args)
  35. cls.cmd_registry[args] = {'stdout': stdout,
  36. 'stderr': stderr,
  37. 'returncode': returncode}
  38. @classmethod
  39. def register_as_script(cls, args, stdout='', stderr='', returncode=0):
  40. '''Register a fake script.'''
  41. if isinstance(args, list):
  42. args = '\n'.join(args)
  43. prefix = "function t(){ exit $? ; } \n trap t ERR \n"
  44. args = prefix + args
  45. cls.script_registry[args] = {'stdout': stdout,
  46. 'stderr': stderr,
  47. 'returncode': returncode}
  48. def __init__(self, args, **kwargs):
  49. script = ["ssh", "-o", "StrictHostKeyChecking=no",
  50. "-o", "UserKnownHostsFile=/dev/null"]
  51. if args[-1] == "bash -x" and args[:5] == script:
  52. self._init_as_script(args, **kwargs)
  53. else:
  54. self._init_as_cmd(args, **kwargs)
  55. def _init_as_cmd(self, args, **kwargs):
  56. self._is_script = False
  57. if isinstance(args, list):
  58. args = tuple(args)
  59. cmd = ' '.join(args)
  60. else:
  61. cmd = args
  62. if args in self.cmd_registry:
  63. this = self.cmd_registry[args]
  64. else:
  65. LOG.warning('call to unregistered command: %s', cmd)
  66. this = {'stdout': '', 'stderr': '', 'returncode': 0}
  67. self.stdout = this['stdout']
  68. self.stderr = this['stderr']
  69. self.returncode = this['returncode']
  70. def _init_as_script(self, args, **kwargs):
  71. self._is_script = True
  72. def communicate(self, input=None):
  73. if self._is_script:
  74. if input in self.script_registry:
  75. this = self.script_registry[input]
  76. else:
  77. LOG.warning('call to unregistered script: %s', input)
  78. this = {'stdout': '', 'stderr': '', 'returncode': 0}
  79. self.stdout = this['stdout']
  80. self.stderr = this['stderr']
  81. self.returncode = this['returncode']
  82. return self.stdout, self.stderr
  83. class PackstackTestCaseMixin(object):
  84. """
  85. Implementation of some assertion methods available by default
  86. in Python2.7+ only
  87. """
  88. def setUp(self):
  89. # Creating a temp directory that can be used by tests
  90. self.tempdir = tempfile.mkdtemp()
  91. # some plugins call popen, we're replacing it for tests
  92. self._Popen = subprocess.Popen
  93. self.fake_popen = subprocess.Popen = FakePopen
  94. def tearDown(self):
  95. # remove the temp directory
  96. shutil.rmtree(self.tempdir)
  97. subprocess.Popen = self._Popen
  98. def assertItemsEqual(self, list1, list2, msg=None):
  99. f, s = len(list1), len(list2)
  100. _msg = msg or ('Element counts were not equal. First has %s, '
  101. 'Second has %s' % (f, s))
  102. self.assertEqual(f, s, msg=_msg)
  103. _msg = msg or ('Given lists differ:\n%(list1)s'
  104. '\n%(list2)s' % locals())
  105. for i in list1:
  106. if i not in list2:
  107. raise AssertionError(_msg)
  108. def assertListEqual(self, list1, list2, msg=None):
  109. f, s = len(list1), len(list2)
  110. _msg = msg or ('Element counts were not equal. First has %s, '
  111. 'Second has %s' % (f, s))
  112. self.assertEqual(f, s, msg=_msg)
  113. _msg = msg or ('Given lists differ:\n%(list1)s'
  114. '\n%(list2)s' % locals())
  115. for index, item in enumerate(list1):
  116. if item != list2[index]:
  117. raise AssertionError(_msg)
  118. def assertIsInstance(self, obj, cls, msg=None):
  119. _msg = msg or ('%s is not an instance of %s' % (obj, cls))
  120. if not isinstance(obj, cls):
  121. raise AssertionError(_msg)
  122. def assertIn(self, first, second, msg=None):
  123. _msg = msg or ('%s is not a member of %s' % (first, second))
  124. if first not in second:
  125. raise AssertionError(_msg)
  126. def assertIsNone(self, expr, msg=None):
  127. _msg = msg or ('%s is not None' % expr)
  128. if expr is not None:
  129. raise AssertionError(_msg)