Coverage for manila/tests/test_utils.py: 99%

414 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2011 Justin Santa Barbara 

2# Copyright 2014 NetApp, Inc. 

3# Copyright 2014 Mirantis, Inc. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17import datetime 

18import json 

19import time 

20from unittest import mock 

21 

22import ddt 

23from oslo_config import cfg 

24from oslo_utils import encodeutils 

25from oslo_utils import timeutils 

26import tenacity 

27from webob import exc 

28 

29import manila 

30from manila.common import constants 

31from manila import context 

32from manila.db import api as db 

33from manila import exception 

34from manila import test 

35from manila import utils 

36 

37CONF = cfg.CONF 

38 

39 

@ddt.ddt
class GenericUtilsTestCase(test.TestCase):
    """Tests for assorted helper functions in manila.utils."""

    def test_service_is_up(self):
        """Check service_is_up at, inside and beyond the down-time limit."""
        to_datetime = datetime.datetime.fromtimestamp
        now_ts = 1000
        limit = 5
        self.flags(service_down_time=limit)

        # (seconds added to "now - limit", assertion expected to hold)
        scenarios = (
            (0, self.assertTrue),     # Up (equal)
            (1, self.assertTrue),     # Up
            (-1, self.assertFalse),   # Down
        )
        for shift, check in scenarios:
            heartbeat = to_datetime(now_ts - limit + shift)
            clock = mock.Mock(return_value=to_datetime(now_ts))
            # A fresh mock per scenario keeps the single-call check valid.
            with mock.patch.object(timeutils, 'utcnow', clock):
                service = {'updated_at': heartbeat, 'created_at': heartbeat}
                check(utils.service_is_up(service))
                timeutils.utcnow.assert_called_once_with()

    @ddt.data(
        ['ssh', '-D', 'my_name@name_of_remote_computer'],
        ['echo', '"quoted arg with space"'],
        ['echo', "'quoted arg with space'"],
    )
    def test_check_ssh_injection(self, cmd):
        """Every command in the data set is accepted (None is returned)."""
        verdict = utils.check_ssh_injection(cmd)
        self.assertIsNone(verdict)

    @ddt.data(
        ['ssh', 'my_name@ name_of_remote_computer'],
        ['||', 'my_name@name_of_remote_computer'],
        ['cmd', 'virus;ls'],
        ['cmd', '"arg\"withunescaped"'],
        ['cmd', 'virus;"quoted argument"'],
        ['echo', '"quoted argument";rm -rf'],
        ['echo', "'quoted argument `rm -rf`'"],
        ['echo', '"quoted";virus;"quoted"'],
        ['echo', '"quoted";virus;\'quoted\''],
    )
    def test_check_ssh_injection_on_error0(self, cmd):
        """Every command in the data set raises SSHInjectionThreat."""
        self.assertRaises(
            exception.SSHInjectionThreat, utils.check_ssh_injection, cmd)

    @ddt.data(
        (("3G", "G"), 3.0),
        (("4.1G", "G"), 4.1),
        (("4,1G", "G"), 4.1),
        (("5.23G", "G"), 5.23),
        (("5,23G", "G"), 5.23),
        (("9728M", "G"), 9.5),
        (("8192K", "G"), 0.0078125),
        (("2T", "G"), 2048.0),
        (("2.1T", "G"), 2150.4),
        (("2,1T", "G"), 2150.4),
        (("3P", "G"), 3145728.0),
        (("3.4P", "G"), 3565158.4),
        (("3,4P", "G"), 3565158.4),
        (("9728M", "M"), 9728.0),
        (("9728.2381T", "T"), 9728.2381),
        (("9728,2381T", "T"), 9728.2381),
        (("0", "G"), 0.0),
        (("512", "M"), 0.00048828125),
        (("2097152.", "M"), 2.0),
        ((".1024", "K"), 0.0001),
        ((",1024", "K"), 0.0001),
        (("2048G", "T"), 2.0),
        (("65536G", "P"), 0.0625),
    )
    @ddt.unpack
    def test_translate_string_size_to_float_positive(self, request, expected):
        """Each (size string, multiplier) pair yields the expected float."""
        size_string, multiplier = request
        converted = utils.translate_string_size_to_float(
            size_string, multiplier)
        self.assertEqual(expected, converted)

    @ddt.data(
        (None, "G"),
        ("fake", "G"),
        ("1fake", "G"),
        ("2GG", "G"),
        ("1KM", "G"),
        ("K1M", "G"),
        ("M1K", "G"),
        ("1.2fake", "G"),
        ("1,2fake", "G"),
        ("2.2GG", "G"),
        ("1.1KM", "G"),
        ("K2.2M", "G"),
        ("K2,2M", "G"),
        ("M2.2K", "G"),
        ("M2,2K", "G"),
        ("", "G"),
        (23, "G"),
        (23.0, "G"),
    )
    @ddt.unpack
    def test_translate_string_size_to_float_negative(self, string, multiplier):
        """Each input in the data set makes the translation return None."""
        converted = utils.translate_string_size_to_float(string, multiplier)
        self.assertIsNone(converted)

149 

150 

class MonkeyPatchTestCase(test.TestCase):
    """Unit test for utils.monkey_patch()."""

    def setUp(self):
        """Set the monkey_patch flags to pair example_a with its decorator."""
        super().setUp()
        self.example_package = 'manila.tests.monkey_patch_example.'
        patched_module = self.example_package + 'example_a'
        decorator_path = self.example_package + 'example_decorator'
        self.flags(
            monkey_patch=True,
            monkey_patch_modules=[patched_module + ':' + decorator_path])

    def test_monkey_patch(self):
        """Calls into example_a are recorded; calls into example_b are not."""
        utils.monkey_patch()
        example_pkg = manila.tests.monkey_patch_example
        example_pkg.CALLED_FUNCTION = []
        from manila.tests.monkey_patch_example import example_a
        from manila.tests.monkey_patch_example import example_b

        # Both modules are exercised identically and must still return
        # their normal results.
        self.assertEqual('Example function', example_a.example_function_a())
        instance_a = example_a.ExampleClassA()
        instance_a.example_method()
        self.assertEqual(8, instance_a.example_method_add(3, 5))

        self.assertEqual('Example function', example_b.example_function_b())
        instance_b = example_b.ExampleClassB()
        instance_b.example_method()
        self.assertEqual(8, instance_b.example_method_add(3, 5))

        # Read the record only after all calls have been made.
        recorded = example_pkg.CALLED_FUNCTION

        prefix_a = self.example_package + 'example_a.'
        for name in ('example_function_a',
                     'ExampleClassA.example_method',
                     'ExampleClassA.example_method_add'):
            self.assertIn(prefix_a + name, recorded)

        prefix_b = self.example_package + 'example_b.'
        for name in ('example_function_b',
                     'ExampleClassB.example_method',
                     'ExampleClassB.example_method_add'):
            self.assertNotIn(prefix_b + name, recorded)

195 

196 

@ddt.ddt
class CidrToNetmaskTestCase(test.TestCase):
    """Unit test for cidr to netmask."""

    @ddt.data(
        ('10.0.0.0/0', '0.0.0.0'),
        ('10.0.0.0/24', '255.255.255.0'),
        ('10.0.0.0/5', '248.0.0.0'),
        ('10.0.0.0/32', '255.255.255.255'),
        ('10.0.0.1', '255.255.255.255'),
    )
    @ddt.unpack
    def test_cidr_to_netmask(self, cidr, expected_netmask):
        """Each CIDR in the data set maps to the listed dotted netmask."""
        netmask = utils.cidr_to_netmask(cidr)
        self.assertEqual(expected_netmask, netmask)

    @ddt.data(
        '10.0.0.0/33',
        '',
        '10.0.0.555/33',
    )
    def test_cidr_to_netmask_invalid(self, cidr):
        """Each malformed CIDR in the data set raises InvalidInput."""
        self.assertRaises(
            exception.InvalidInput, utils.cidr_to_netmask, cidr)

220 

221 

@ddt.ddt
class CidrToPrefixLenTestCase(test.TestCase):
    """Unit test for cidr to prefix length."""

    @ddt.data(
        ('10.0.0.0/0', 0),
        ('10.0.0.0/24', 24),
        ('10.0.0.1', 32),
        ('fdf8:f53b:82e1::1/0', 0),
        ('fdf8:f53b:82e1::1/64', 64),
        ('fdf8:f53b:82e1::1', 128),
    )
    @ddt.unpack
    def test_cidr_to_prefixlen(self, cidr, expected_prefixlen):
        """Each IPv4/IPv6 CIDR maps to the listed prefix length."""
        prefixlen = utils.cidr_to_prefixlen(cidr)
        self.assertEqual(expected_prefixlen, prefixlen)

    @ddt.data(
        '10.0.0.0/33',
        '',
        '10.0.0.555/33',
        'fdf8:f53b:82e1::1/129',
        'fdf8:f53b:82e1::fffff',
    )
    def test_cidr_to_prefixlen_invalid(self, cidr):
        """Each malformed CIDR in the data set raises InvalidInput."""
        self.assertRaises(
            exception.InvalidInput, utils.cidr_to_prefixlen, cidr)

249 

250 

@ddt.ddt
class ParseBoolValueTestCase(test.TestCase):
    """Tests for utils.get_bool_from_api_params."""

    @ddt.data(
        ('t', True),
        ('on', True),
        ('1', True),
        ('false', False),
        ('n', False),
        ('no', False),
        ('0', False),
    )
    @ddt.unpack
    def test_bool_with_valid_string(self, string, value):
        """Each recognised string is parsed into the listed boolean."""
        params = {'fake_key': string}
        parsed = utils.get_bool_from_api_params('fake_key', params)
        self.assertEqual(value, parsed)

    @ddt.data('None', 'invalid', 'falses')
    def test_bool_with_invalid_string(self, string):
        """An unrecognised string value raises HTTPBadRequest."""
        params = {'fake_key': string}
        self.assertRaises(
            exc.HTTPBadRequest,
            utils.get_bool_from_api_params, 'fake_key', params)

    @ddt.data('undefined', None)
    def test_bool_with_key_not_found_raise_error(self, def_val):
        """A missing key with these defaults raises HTTPBadRequest."""
        params = {'fake_key1': 'value1'}
        self.assertRaises(
            exc.HTTPBadRequest,
            utils.get_bool_from_api_params, 'fake_key2', params, def_val)

    @ddt.data(
        (False, False, False),
        (True, True, False),
        ('true', True, False),
        ('false', False, False),
        ('undefined', 'undefined', False),
        (False, False, True),
        ('true', True, True),
    )
    @ddt.unpack
    def test_bool_with_key_not_found(self, def_val, expected, strict):
        """A missing key yields the listed result for default and strict."""
        params = {'fake_key1': 'value1'}
        fallback = utils.get_bool_from_api_params(
            'fake_key2', params, def_val, strict)
        self.assertEqual(expected, fallback)

299 

300 

@ddt.ddt
class IsValidIPVersion(test.TestCase):
    """Test suite for function 'is_valid_ip_address'."""

    @ddt.data('0.0.0.0', '255.255.255.255', '192.168.0.1')
    def test_valid_v4(self, addr):
        """IPv4 addresses validate with the version given as int or str."""
        for ip_version in (4, '4'):
            verdict = utils.is_valid_ip_address(addr, ip_version)
            self.assertTrue(verdict)

    @ddt.data(
        '2001:cdba:0000:0000:0000:0000:3257:9652',
        '2001:cdba:0:0:0:0:3257:9652',
        '2001:cdba::3257:9652',
    )
    def test_valid_v6(self, addr):
        """IPv6 addresses validate with the version given as int or str."""
        for ip_version in (6, '6'):
            verdict = utils.is_valid_ip_address(addr, ip_version)
            self.assertTrue(verdict)

    @ddt.data(
        {'addr': '1.1.1.1', 'vers': 3},
        {'addr': '1.1.1.1', 'vers': 5},
        {'addr': '1.1.1.1', 'vers': 7},
        {'addr': '2001:cdba::3257:9652', 'vers': '3'},
        {'addr': '2001:cdba::3257:9652', 'vers': '5'},
        {'addr': '2001:cdba::3257:9652', 'vers': '7'},
    )
    @ddt.unpack
    def test_provided_invalid_version(self, addr, vers):
        """Versions other than 4 and 6 raise ManilaException."""
        self.assertRaises(
            exception.ManilaException,
            utils.is_valid_ip_address, addr, vers)

    def test_provided_none_version(self):
        """Passing None as the version raises TypeError."""
        self.assertRaises(TypeError, utils.is_valid_ip_address, '', None)

    @ddt.data(None, 'fake', '1.1.1.1')
    def test_provided_invalid_v6_address(self, addr):
        """Values that are not IPv6 addresses are rejected for version 6."""
        for ip_version in (6, '6'):
            verdict = utils.is_valid_ip_address(addr, ip_version)
            self.assertFalse(verdict)

    @ddt.data(None, 'fake', '255.255.255.256', '2001:cdba::3257:9652', '')
    def test_provided_invalid_v4_address(self, addr):
        """Values that are not IPv4 addresses are rejected for version 4."""
        for ip_version in (4, '4'):
            verdict = utils.is_valid_ip_address(addr, ip_version)
            self.assertFalse(verdict)

342 

343 

class Comparable(utils.ComparableMixin):
    """Minimal ComparableMixin subclass whose comparison key is 'value'."""

    def __init__(self, value):
        """Store the value later returned by _cmpkey()."""
        self.value = value

    def _cmpkey(self):
        """Return the stored value as the comparison key."""
        return self.value

350 

351 

class TestComparableMixin(test.TestCase):
    """Exercises each rich-comparison operator on Comparable objects."""

    def setUp(self):
        """Create two Comparable fixtures wrapping 1 and 2."""
        super().setUp()
        self.one = Comparable(1)
        self.two = Comparable(2)

    def test_lt(self):
        """Check '<' in both directions and against the same object."""
        small, large = self.one, self.two
        self.assertTrue(small < large)
        self.assertFalse(large < small)
        self.assertFalse(small < small)

    def test_le(self):
        """Check '<=' in both directions and against the same object."""
        small, large = self.one, self.two
        self.assertTrue(small <= large)
        self.assertFalse(large <= small)
        self.assertTrue(small <= small)

    def test_eq(self):
        """Check '==' in both directions and against the same object."""
        small, large = self.one, self.two
        self.assertFalse(small == large)
        self.assertFalse(large == small)
        self.assertTrue(small == small)

    def test_ge(self):
        """Check '>=' in both directions and against the same object."""
        small, large = self.one, self.two
        self.assertFalse(small >= large)
        self.assertTrue(large >= small)
        self.assertTrue(small >= small)

    def test_gt(self):
        """Check '>' in both directions and against the same object."""
        small, large = self.one, self.two
        self.assertFalse(small > large)
        self.assertTrue(large > small)
        self.assertFalse(small > small)

    def test_ne(self):
        """Check '!=' in both directions and against the same object."""
        small, large = self.one, self.two
        self.assertTrue(small != large)
        self.assertTrue(large != small)
        self.assertFalse(small != small)

    def test_compare(self):
        """_compare() with a plain int operand returns NotImplemented."""
        outcome = self.one._compare(1, self.one._cmpkey)
        self.assertEqual(NotImplemented, outcome)

392 

393 

class WrongException(Exception):
    """Plain Exception subclass with no behaviour of its own."""

396 

397 

class TestRetryDecorator(test.TestCase):
    """Tests for the utils.retry decorator."""

    def test_no_retry_required(self):
        """A function that succeeds at once runs one time and never sleeps."""
        self.counter = 0

        with mock.patch('tenacity.nap.sleep') as mock_sleep:
            @utils.retry(retry_param=exception.ManilaException,
                         interval=2,
                         retries=3,
                         backoff_rate=2)
            def succeeds():
                self.counter += 1
                return 'success'

            ret = succeeds()
            self.assertFalse(mock_sleep.called)
            self.assertEqual('success', ret)
            self.assertEqual(1, self.counter)

    def test_no_retry_required_random(self):
        """Same as test_no_retry_required, but with wait_random=True."""
        self.counter = 0

        with mock.patch('tenacity.nap.sleep') as mock_sleep:
            @utils.retry(retry_param=exception.ManilaException,
                         interval=2,
                         retries=3,
                         backoff_rate=2,
                         wait_random=True)
            def succeeds():
                self.counter += 1
                return 'success'

            ret = succeeds()
            self.assertFalse(mock_sleep.called)
            self.assertEqual('success', ret)
            self.assertEqual(1, self.counter)

    def test_retries_once(self):
        """One failure leads to one sleep of 'interval' and then success."""
        self.counter = 0
        interval = 2
        backoff_rate = 2
        retries = 3

        with mock.patch('tenacity.nap.sleep') as mock_sleep:
            @utils.retry(retry_param=exception.ManilaException,
                         interval=interval,
                         retries=retries,
                         backoff_rate=backoff_rate)
            def fails_once():
                self.counter += 1
                if self.counter < 2:
                    raise exception.ManilaException(data='fake')
                else:
                    return 'success'

            ret = fails_once()
            self.assertEqual('success', ret)
            self.assertEqual(2, self.counter)
            self.assertEqual(1, mock_sleep.call_count)
            mock_sleep.assert_called_with(interval)

    def test_retries_once_random(self):
        """One failure leads to exactly one sleep when wait_random=True."""
        self.counter = 0
        interval = 2
        backoff_rate = 2
        retries = 3

        with mock.patch('tenacity.nap.sleep') as mock_sleep:
            @utils.retry(retry_param=exception.ManilaException,
                         interval=interval,
                         retries=retries,
                         backoff_rate=backoff_rate,
                         wait_random=True)
            def fails_once():
                self.counter += 1
                if self.counter < 2:
                    raise exception.ManilaException(data='fake')
                else:
                    return 'success'

            ret = fails_once()
            self.assertEqual('success', ret)
            self.assertEqual(2, self.counter)
            # The sleep length is not checked here; only the sleep count is.
            self.assertEqual(1, mock_sleep.call_count)

    def test_limit_is_reached(self):
        """A function that always fails is attempted 'retries' times.

        The pause expected before retry number k is
        interval * backoff_rate ** (k - 1), which is consistent with the
        literal sleep sequences asserted by the other tests in this class.
        """
        self.counter = 0
        retries = 3
        interval = 2
        backoff_rate = 4

        with mock.patch('tenacity.nap.sleep') as mock_sleep:
            @utils.retry(retry_param=exception.ManilaException,
                         interval=interval,
                         retries=retries,
                         backoff_rate=backoff_rate)
            def always_fails():
                self.counter += 1
                raise exception.ManilaException(data='fake')

            self.assertRaises(exception.ManilaException,
                              always_fails)
            self.assertEqual(retries, self.counter)

            # Closed form of the schedule. Compounding the interval in a
            # loop only matches it while retries <= 3.
            expected_sleep_arg = [
                float(interval * backoff_rate ** exponent)
                for exponent in range(retries - 1)
            ]

            # No sleep follows the final, failed attempt.
            self.assertEqual(retries - 1, mock_sleep.call_count)
            mock_sleep.assert_has_calls(
                [mock.call(pause) for pause in expected_sleep_arg])

    def test_wrong_exception_no_retry(self):
        """An exception other than retry_param propagates without a sleep."""

        with mock.patch('tenacity.nap.sleep') as mock_sleep:
            @utils.retry(retry_param=exception.ManilaException)
            def raise_unexpected_error():
                raise WrongException("wrong exception")

            self.assertRaises(WrongException, raise_unexpected_error)
            self.assertFalse(mock_sleep.called)

    @mock.patch('tenacity.nap.sleep')
    def test_retry_exit_code(self, sleep_mock):
        """A matching exit code is retried until the default limit."""

        exit_code = 5
        # Named so that it does not shadow the imported 'exception' module.
        process_error = utils.processutils.ProcessExecutionError

        @utils.retry(retry=utils.retry_if_exit_code, retry_param=exit_code)
        def raise_retriable_exit_code():
            raise process_error(exit_code=exit_code)

        self.assertRaises(process_error, raise_retriable_exit_code)
        # we should be sleeping 1 less time than the number of retries,
        # default (10)
        self.assertEqual(9, sleep_mock.call_count)
        sleep_mock.assert_has_calls([mock.call(1.0),
                                     mock.call(2.0),
                                     mock.call(4.0),
                                     mock.call(8.0),
                                     mock.call(16.0),
                                     mock.call(32.0),
                                     mock.call(64.0),
                                     mock.call(128.0),
                                     mock.call(256.0)])

    @mock.patch('tenacity.nap.sleep')
    def test_retry_exit_code_non_retriable(self, sleep_mock):
        """A non-matching exit code propagates without any sleep."""

        exit_code = 5
        # Named so that it does not shadow the imported 'exception' module.
        process_error = utils.processutils.ProcessExecutionError

        @utils.retry(retry=utils.retry_if_exit_code, retry_param=exit_code)
        def raise_non_retriable_exit_code():
            raise process_error(exit_code=exit_code + 1)

        self.assertRaises(process_error, raise_non_retriable_exit_code)
        sleep_mock.assert_not_called()

    def test_infinite_retry(self):
        """infinite=True builds the Retrying object with stop_never."""
        retry_param = exception.ManilaException

        class FakeTenacityRetry(tenacity.Retrying):
            def __init__(self, *args, **kwargs):
                pass

        with mock.patch('tenacity.Retrying',
                        autospec=FakeTenacityRetry) as tenacity_retry:

            @utils.retry(retry_param=retry_param,
                         wait_random=True,
                         infinite=True)
            def some_retriable_function():
                pass

            some_retriable_function()

            tenacity_retry.assert_called_once_with(
                sleep=tenacity.nap.sleep,
                before_sleep=mock.ANY,
                after=mock.ANY,
                stop=tenacity.stop.stop_never,
                reraise=True,
                retry=utils.IsAMatcher(tenacity.retry_if_exception_type),
                wait=utils.IsAMatcher(tenacity.wait_random_exponential))

    def test_max_backoff_sleep(self):
        """Sleep lengths grow with backoff_rate up to backoff_sleep_max."""
        self.counter = 0

        with mock.patch('tenacity.nap.sleep') as mock_sleep:
            @utils.retry(retry_param=exception.ManilaException,
                         infinite=True,
                         backoff_rate=2,
                         backoff_sleep_max=4)
            def fails_then_passes():
                self.counter += 1
                if self.counter < 5:
                    raise exception.ManilaException(data='fake')
                else:
                    return 'success'

            self.assertEqual('success', fails_then_passes())
            mock_sleep.assert_has_calls(
                [mock.call(1), mock.call(2), mock.call(4), mock.call(4)])

603 

604 

@ddt.ddt
class RequireDriverInitializedTestCase(test.TestCase):
    """Tests for the utils.require_driver_initialized decorator."""

    @ddt.data(True, False)
    def test_require_driver_initialized(self, initialized):
        """The wrapped method only runs when driver.initialized is True."""

        class FakeException(Exception):
            """Raised by the wrapped method to show that it was reached."""

        class FakeDriver(object):
            @property
            def initialized(self):
                # Resolves to the test parameter, not to this property.
                return initialized

        class FakeManager(object):
            driver = FakeDriver()

            @utils.require_driver_initialized
            def call_me(self):
                raise FakeException(
                    "Should be raised only if manager.driver.initialized "
                    "('%s') is equal to 'True'." % initialized)

        expected_exception = (
            FakeException if initialized
            else exception.DriverNotInitialized)

        manager = FakeManager()
        self.assertRaises(expected_exception, manager.call_me)

634 

635 

@ddt.ddt
class ShareMigrationHelperTestCase(test.TestCase):
    """Tests for utils.wait_for_access_update."""

    def setUp(self):
        """Create the admin context passed to the function under test."""
        super().setUp()
        self.context = context.get_admin_context()

    def test_wait_for_access_update(self):
        """The instance is fetched twice, with a single sleep in between."""
        sid = 1
        syncing_instance = {
            'id': sid,
            'access_rules_status': constants.SHARE_INSTANCE_RULES_SYNCING,
        }
        active_instance = {
            'id': sid,
            'access_rules_status': constants.STATUS_ACTIVE,
        }

        self.mock_object(time, 'sleep')
        # Successive lookups report "syncing" first and "active" second.
        self.mock_object(
            db, 'share_instance_get',
            mock.Mock(side_effect=[syncing_instance, active_instance]))

        utils.wait_for_access_update(self.context, db, syncing_instance, 1)

        lookup = mock.call(mock.ANY, sid)
        db.share_instance_get.assert_has_calls([lookup, lookup])
        time.sleep.assert_called_once_with(1.414)

    @ddt.data(
        (
            {
                'id': '1',
                'access_rules_status': constants.SHARE_INSTANCE_RULES_ERROR,
            },
            exception.ShareMigrationFailed
        ),
        (
            {
                'id': '1',
                'access_rules_status': constants.SHARE_INSTANCE_RULES_SYNCING,
            },
            exception.ShareMigrationFailed
        ),
    )
    @ddt.unpack
    def test_wait_for_access_update_invalid(self, fake_instance, expected_exc):
        """An instance stuck in these states raises the listed exception."""
        self.mock_object(time, 'sleep')
        self.mock_object(
            db, 'share_instance_get', mock.Mock(return_value=fake_instance))

        # Read the real clock before time.time is replaced; the second
        # mocked reading is 100 seconds after the first.
        started_at = time.time()
        much_later = started_at + 100
        self.mock_object(
            time, 'time', mock.Mock(side_effect=[started_at, much_later]))

        self.assertRaises(
            expected_exc,
            utils.wait_for_access_update,
            self.context, db, fake_instance, 1)

700 

701 

@ddt.ddt
class ConvertStrTestCase(test.TestCase):
    """Tests for utils.convert_str."""

    def test_convert_str_str_input(self):
        """A str comes back unchanged and safe_encode is never called."""
        self.mock_object(encodeutils, 'safe_encode')
        text = "string_input"

        converted = utils.convert_str(text)

        encodeutils.safe_encode.assert_not_called()
        self.assertEqual(text, converted)

    def test_convert_str_bytes_input(self):
        """Bytes come back as the equivalent str without safe_encode."""
        self.mock_object(encodeutils, 'safe_encode')
        raw = "binary_input".encode("utf-8")

        converted = utils.convert_str(raw)

        encodeutils.safe_encode.assert_not_called()
        self.assertIsInstance(converted, str)
        self.assertEqual("binary_input", converted)

723 

724 

@ddt.ddt
class TestDisableNotifications(test.TestCase):
    """Tests for utils.DoNothing and utils.if_notifications_enabled."""

    def test_do_nothing_getter(self):
        """Test any attribute will always return the same instance (self)."""
        placeholder = utils.DoNothing()
        self.assertIs(placeholder, placeholder.anyname)

    def test_do_nothing_caller(self):
        """Test calling the object will always return the same instance."""
        placeholder = utils.DoNothing()
        self.assertIs(placeholder, placeholder())

    def test_do_nothing_json_serializable(self):
        """Test the object is serialized by json as an empty string."""
        placeholder = utils.DoNothing()
        serialized = json.dumps(placeholder)
        self.assertEqual('""', serialized)

    @utils.if_notifications_enabled
    def _decorated_method(self):
        """Return a sentinel so that tests can tell the body was run."""
        return mock.sentinel.success

    def test_if_notification_enabled_when_enabled(self):
        """Test method is called when notifications are enabled."""
        outcome = self._decorated_method()
        self.assertEqual(mock.sentinel.success, outcome)

    @ddt.data([], ['noop'], ['noop', 'noop'])
    def test_if_notification_enabled_when_disabled(self, driver):
        """Test method is not called when notifications are disabled."""
        self.override_config(
            'driver', driver, group='oslo_messaging_notifications')
        outcome = self._decorated_method()
        self.assertEqual(utils.DO_NOTHING, outcome)

758 

759 

@ddt.ddt
class TestAllTenantsValueCase(test.TestCase):
    """Tests for is_all_tenants and convert_time_duration_to_iso_format."""

    @ddt.data(None, '', '1', 'true', 'True')
    def test_is_all_tenants_true(self, value):
        """These values are read as True and the key is left in place."""
        filters = {'all_tenants': value}
        self.assertTrue(utils.is_all_tenants(filters))
        self.assertIn('all_tenants', filters)

    @ddt.data('0', 'false', 'False')
    def test_is_all_tenants_false(self, value):
        """These values are read as False and the key is left in place."""
        filters = {'all_tenants': value}
        self.assertFalse(utils.is_all_tenants(filters))
        self.assertIn('all_tenants', filters)

    def test_is_all_tenants_missing(self):
        """Search options without the key are read as False."""
        self.assertFalse(utils.is_all_tenants({}))

    def test_is_all_tenants_invalid(self):
        """An unrecognised value raises InvalidInput."""
        filters = {'all_tenants': 'wonk'}
        self.assertRaises(
            exception.InvalidInput, utils.is_all_tenants, filters)

    @ddt.data(
        ("8minutes", "PT8M"),
        ("10hours", "PT10H"),
        ("6months", "P6M"),
        ("2years", "P2Y"),
    )
    @ddt.unpack
    def test_convert_time_duration_to_iso_format(self,
                                                 time_duration, expected):
        """Each duration string maps to the listed ISO-format string."""
        iso_text = utils.convert_time_duration_to_iso_format(time_duration)
        self.assertEqual(expected, iso_text)

    def test_convert_time_duration_to_iso_format_negative(self):
        """An unparsable duration string raises ManilaException."""
        self.assertRaises(
            exception.ManilaException,
            utils.convert_time_duration_to_iso_format,
            'invalid_duration')