Coverage for manila/tests/share/drivers/test_helpers.py: 100%

321 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2015 Mirantis Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import os 

17from unittest import mock 

18 

19import ddt 

20from oslo_config import cfg 

21 

22from manila.common import constants as const 

23from manila import exception 

24import manila.share.configuration 

25from manila.share.drivers import helpers 

26from manila import test 

27from manila.tests import fake_compute 

28from manila.tests import fake_utils 

29from manila.tests.share.drivers import test_generic 

30 

31 

32CONF = cfg.CONF 

33 

34 

@ddt.ddt
class NFSHelperTestCase(test.TestCase):
    """Test case for NFS helper.

    Every test drives a ``helpers.NFSHelper`` that was built with mocked
    ``execute``/``ssh_exec`` callables and then inspects the commands the
    helper tried to run against the fake share server.
    """

    def setUp(self):
        super(NFSHelperTestCase, self).setUp()
        fake_utils.stub_out_utils_execute(self)
        self.fake_conf = manila.share.configuration.Configuration(None)
        # Both executors answer with an empty (stdout, stderr) pair unless
        # a test replaces them.
        self._ssh_exec = mock.Mock(return_value=('', ''))
        self._execute = mock.Mock(return_value=('', ''))
        self._helper = helpers.NFSHelper(self._execute, self._ssh_exec,
                                         self.fake_conf)
        ip = '10.254.0.3'
        self.server = fake_compute.FakeServer(
            ip=ip, public_address=ip, instance_id='fake_instance_id')
        self.share_name = 'fake_share_name'

    def test_init_helper(self):
        # A 'command not found' failure of exportfs is expected to surface
        # as a ManilaException.

        # mocks
        self.mock_object(
            self._helper, '_ssh_exec',
            mock.Mock(side_effect=exception.ProcessExecutionError(
                stderr='command not found')))

        # run
        self.assertRaises(exception.ManilaException,
                          self._helper.init_helper, self.server)

        # asserts
        self._helper._ssh_exec.assert_called_once_with(
            self.server, ['sudo', 'exportfs'])

    def test_init_helper_log(self):
        # Any other exportfs failure must not propagate out of init_helper.

        # mocks
        self.mock_object(
            self._helper, '_ssh_exec',
            mock.Mock(side_effect=exception.ProcessExecutionError(
                stderr='fake')))

        # run
        self._helper.init_helper(self.server)

        # asserts
        self._helper._ssh_exec.assert_called_once_with(
            self.server, ['sudo', 'exportfs'])

    @ddt.data(
        {"server": {"public_address": "1.2.3.4"}, "version": 4},
        {"server": {"public_address": "1001::1002"}, "version": 6},
        {"server": {"public_address": "1.2.3.4", "admin_ip": "5.6.7.8"},
         "version": 4},
        {"server": {"public_address": "1.2.3.4", "ip": "9.10.11.12"},
         "version": 4},
        {"server": {"public_address": "1001::1001", "ip": "1001::1002"},
         "version": 6},
        {"server": {"public_address": "1001::1002", "admin_ip": "1001::1002"},
         "version": 6},
        {"server": {"public_addresses": ["1001::1002"]}, "version": 6},
        {"server": {"public_addresses": ["1.2.3.4", "1001::1002"]},
         "version": {"1.2.3.4": 4, "1001::1002": 6}},
    )
    @ddt.unpack
    def test_create_exports(self, server, version):
        # ``version`` is either a single IP version applied to every
        # address or a dict mapping each address to its IP version.
        result = self._helper.create_exports(server, self.share_name)

        path = os.path.join(CONF.share_mount_path, self.share_name)

        def convert_address(address, ip_version):
            # Anything that is not IPv4 is wrapped in square brackets.
            if ip_version == 4:
                return address
            return "[%s]" % address

        # (address, is_admin_only) pairs, public addresses first.
        if 'public_addresses' in server:
            pairs = [(addr, False) for addr in server['public_addresses']]
        else:
            pairs = [(server['public_address'], False)]

        service_address = server.get("admin_ip", server.get("ip"))
        if service_address:
            pairs.append((service_address, True))

        expected_export_locations = []
        for ip, is_admin in pairs:
            if isinstance(version, dict):
                ip_version = version.get(ip)
            else:
                ip_version = version

            expected_export_locations.append({
                "path": "%s:%s" % (convert_address(ip, ip_version), path),
                "is_admin_only": is_admin,
                "metadata": {
                    "export_location_metadata_example": "example",
                },
            })
        self.assertEqual(expected_export_locations, result)

    @ddt.data(const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO)
    def test_update_access(self, access_level):
        expected_mount_options = '%s,no_subtree_check,no_root_squash'
        self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
        local_path = os.path.join(CONF.share_mount_path, self.share_name)
        # The mocked exportfs output lists 2.2.2.3 for this share; the
        # assertions below expect no 'exportfs -o' call for that address.
        exec_result = ' '.join([local_path, '2.2.2.3'])
        self.mock_object(self._helper, '_ssh_exec',
                         mock.Mock(return_value=(exec_result, '')))
        access_rules = [
            test_generic.get_fake_access_rule('1.1.1.1', access_level),
            test_generic.get_fake_access_rule('2.2.2.2', access_level),
            test_generic.get_fake_access_rule('2.2.2.3', access_level)]
        add_rules = [
            test_generic.get_fake_access_rule('2.2.2.2', access_level),
            test_generic.get_fake_access_rule('2.2.2.3', access_level),
            test_generic.get_fake_access_rule('5.5.5.0/24', access_level)]
        delete_rules = [
            test_generic.get_fake_access_rule('3.3.3.3', access_level),
            test_generic.get_fake_access_rule('4.4.4.4', access_level, 'user'),
            test_generic.get_fake_access_rule('0.0.0.0/0', access_level)]
        self._helper.update_access(self.server, self.share_name, access_rules,
                                   add_rules=add_rules,
                                   delete_rules=delete_rules)
        # Expected order: list exports, unexport the deleted IP rules
        # ('0.0.0.0/0' is unexported as '*'), then export the added rules.
        self._helper._ssh_exec.assert_has_calls([
            mock.call(self.server, ['sudo', 'exportfs']),
            mock.call(self.server, ['sudo', 'exportfs', '-u',
                                    ':'.join(['3.3.3.3', local_path])]),
            mock.call(self.server, ['sudo', 'exportfs', '-u',
                                    ':'.join(['*',
                                              local_path])]),
            mock.call(self.server, ['sudo', 'exportfs', '-o',
                                    expected_mount_options % access_level,
                                    ':'.join(['2.2.2.2', local_path])]),
            mock.call(self.server, ['sudo', 'exportfs', '-o',
                                    expected_mount_options % access_level,
                                    ':'.join(['5.5.5.0/24',
                                              local_path])]),
        ])
        self._helper._sync_nfs_temp_and_perm_files.assert_has_calls([
            mock.call(self.server), mock.call(self.server)])

    @ddt.data({'access': '10.0.0.1', 'result': '10.0.0.1'},
              {'access': '10.0.0.1/32', 'result': '10.0.0.1'},
              {'access': '10.0.0.0/24', 'result': '10.0.0.0/24'},
              {'access': '1001::1001', 'result': '[1001::1001]'},
              {'access': '1001::1000/128', 'result': '[1001::1000]'},
              {'access': '1001::1000/124', 'result': '[1001::1000]/124'})
    @ddt.unpack
    def test__get_parsed_address_or_cidr(self, access, result):
        self.assertEqual(result,
                         self._helper._get_parsed_address_or_cidr(access))

    @ddt.data('10.0.0.265', '10.0.0.1/33', '1001::10069', '1001::1000/129')
    def test__get_parsed_address_or_cidr_with_invalid_access(self, access):
        self.assertRaises(ValueError,
                          self._helper._get_parsed_address_or_cidr,
                          access)

    def test_update_access_invalid_type(self):
        access_rules = [test_generic.get_fake_access_rule(
            '2.2.2.2', const.ACCESS_LEVEL_RW, access_type='fake'), ]
        self.assertRaises(
            exception.InvalidShareAccess,
            self._helper.update_access,
            self.server,
            self.share_name,
            access_rules,
            [],
            [])

    def test_update_access_invalid_level(self):
        access_rules = [test_generic.get_fake_access_rule(
            '2.2.2.2', 'fake_level', access_type='ip'), ]
        self.assertRaises(
            exception.InvalidShareAccessLevel,
            self._helper.update_access,
            self.server,
            self.share_name,
            access_rules,
            [],
            [])

    @ddt.data({'access_to': 'lala', 'access_type': 'user'},
              {'access_to': '203.0.113.29'},
              {'access_to': '2001:0DB8:7d18:c63e:5f0a:871f:83b8:d244',
               'access_level': 'ro'})
    @ddt.unpack
    def test_update_access_delete_invalid_rule(
            self, access_to, access_level='rw', access_type='ip'):
        # Deleting a rule that exportfs does not know about must not fail.
        mount_path = '%s:/shares/%s' % (access_to, self.share_name)
        if access_type == 'ip':
            # Bypass address parsing so the raw access_to value is what
            # ends up in the unexport command.
            self.mock_object(
                self._helper, '_get_parsed_address_or_cidr',
                mock.Mock(return_value=access_to))
        not_found_msg = (
            "exportfs: Could not find '%s' to unexport.\n" % mount_path
        )
        exc = exception.ProcessExecutionError
        # First call succeeds, the second one fails with "not found".
        self.mock_object(
            self._helper,
            '_ssh_exec',
            mock.Mock(side_effect=[(0, 0), exc(stderr=not_found_msg)]))

        delete_rules = [
            test_generic.get_fake_access_rule(access_to,
                                              access_level,
                                              access_type),
        ]
        self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')

        self._helper.update_access(self.server, self.share_name, [],
                                   [], delete_rules)

        if access_type == 'ip':
            self._helper._ssh_exec.assert_has_calls([
                mock.call(self.server, ['sudo', 'exportfs']),
                mock.call(self.server,
                          ['sudo', 'exportfs', '-u', mount_path])])
        self._helper._sync_nfs_temp_and_perm_files.assert_called_with(
            self.server)

    def test_get_host_list(self):
        fake_exportfs = ('/shares/share-1\n\t\t20.0.0.3\n'
                         '/shares/share-1\n\t\t20.0.0.6\n'
                         '/shares/share-1\n\t\t<world>\n'
                         '/shares/share-2\n\t\t10.0.0.2\n'
                         '/shares/share-2\n\t\t10.0.0.5\n'
                         '/shares/share-3\n\t\t30.0.0.4\n'
                         '/shares/share-3\n\t\t30.0.0.7\n')
        # Only share-1 hosts are expected, with '<world>' reported as '*'.
        expected = ['20.0.0.3', '20.0.0.6', '*']
        result = self._helper.get_host_list(fake_exportfs, '/shares/share-1')
        self.assertEqual(expected, result)

    @ddt.data({"level": const.ACCESS_LEVEL_RW, "ip": "1.1.1.1",
               "expected": "1.1.1.1"},
              {"level": const.ACCESS_LEVEL_RO, "ip": "1.1.1.1",
               "expected": "1.1.1.1"},
              {"level": const.ACCESS_LEVEL_RW, "ip": "fd12:abcd::10",
               "expected": "[fd12:abcd::10]"},
              {"level": const.ACCESS_LEVEL_RO, "ip": "fd12:abcd::10",
               "expected": "[fd12:abcd::10]"})
    @ddt.unpack
    def test_update_access_recovery_mode(self, level, ip, expected):
        # With empty add/delete lists the currently exported host is
        # expected to be unexported and then exported again.
        expected_mount_options = '%s,no_subtree_check,no_root_squash'
        access_rules = [test_generic.get_fake_access_rule(
            ip, level), ]
        self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
        self.mock_object(self._helper, 'get_host_list',
                         mock.Mock(return_value=[ip]))
        self._helper.update_access(self.server, self.share_name, access_rules,
                                   [], [])
        local_path = os.path.join(CONF.share_mount_path, self.share_name)
        self._ssh_exec.assert_has_calls([
            mock.call(self.server, ['sudo', 'exportfs']),
            mock.call(
                self.server, ['sudo', 'exportfs', '-u',
                              ':'.join([expected,
                                        local_path])]),
            mock.call(self.server, ['sudo', 'exportfs', '-o',
                                    expected_mount_options % level,
                                    ':'.join([expected, local_path])]),
        ])
        self._helper._sync_nfs_temp_and_perm_files.assert_called_with(
            self.server)

    def test_sync_nfs_temp_and_perm_files(self):
        self._helper._sync_nfs_temp_and_perm_files(self.server)
        # Only the target server is pinned; the command itself is not.
        self._helper._ssh_exec.assert_has_calls(
            [mock.call(self.server, mock.ANY)])

    # NOTE(review): '[1001::1000]/:124:/foo/bar' looks like a typo for
    # '[1001::1000]/124:/foo/bar' (see the data used by
    # test_get_share_path_by_export_location) - confirm before changing.
    @ddt.data('/foo/bar', '5.6.7.8:/bar/quuz', '5.6.7.9:/foo/quuz',
              '[1001::1001]:/foo/bar', '[1001::1000]/:124:/foo/bar')
    def test_get_exports_for_share_single_ip(self, export_location):
        server = dict(public_address='1.2.3.4')

        result = self._helper.get_exports_for_share(server, export_location)

        # The expected path keeps only what follows the last ':'.
        path = export_location.split(':')[-1]
        expected_export_locations = [
            {"is_admin_only": False,
             "path": "%s:%s" % (server["public_address"], path),
             "metadata": {"export_location_metadata_example": "example"}}
        ]
        self.assertEqual(expected_export_locations, result)

    @ddt.data('/foo/bar', '5.6.7.8:/bar/quuz', '5.6.7.9:/foo/quuz')
    def test_get_exports_for_share_multi_ip(self, export_location):
        server = dict(public_addresses=['1.2.3.4', '1.2.3.5'])

        result = self._helper.get_exports_for_share(server, export_location)

        path = export_location.split(':')[-1]
        # One export location per public address, in the same order.
        expected_export_locations = [
            {
                "is_admin_only": False,
                "path": "%s:%s" % (addr, path),
                "metadata": {"export_location_metadata_example": "example"}
            }
            for addr in server['public_addresses']
        ]
        self.assertEqual(expected_export_locations, result)

    @ddt.data(
        {'public_address_with_suffix': 'foo'},
        {'with_prefix_public_address': 'bar'},
        {'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
    def test_get_exports_for_share_with_error(self, server):
        # None of these server dicts has an exact 'public_address' key.
        export_location = '1.2.3.4:/foo/bar'

        self.assertRaises(
            exception.ManilaException,
            self._helper.get_exports_for_share, server, export_location)

    @ddt.data('/foo/bar', '5.6.7.8:/foo/bar', '5.6.7.88:fake:/foo/bar',
              '[1001::1002]:/foo/bar', '[1001::1000]/124:/foo/bar')
    def test_get_share_path_by_export_location(self, export_location):
        result = self._helper.get_share_path_by_export_location(
            dict(), export_location)

        self.assertEqual('/foo/bar', result)

    @ddt.data(
        ('/shares/fake_share1\n\t\t1.1.1.10\n'
         '/shares/fake_share2\n\t\t1.1.1.16\n'
         '/shares/fake_share3\n\t\t<world>\n'
         '/mnt/fake_share1 1.1.1.11', False),
        ('/shares/fake_share_name\n\t\t1.1.1.10\n'
         '/shares/fake_share_name\n\t\t1.1.1.16\n'
         '/shares/fake_share_name\n\t\t<world>\n'
         '/mnt/fake_share1\n\t\t1.1.1.11', True),
        ('/mnt/fake_share_name\n\t\t1.1.1.11\n'
         '/shares/fake_share_name\n\t\t1.1.1.10\n'
         '/shares/fake_share_name\n\t\t1.1.1.16\n'
         '/shares/fake_share_name\n\t\t<world>\n', True))
    @ddt.unpack
    def test_disable_access_for_maintenance(self, output, hosts_match):
        # ``hosts_match`` says whether ``output`` contains exports of
        # /shares/<share_name> that are expected to be unexported.
        fake_maintenance_path = "fake.path"
        self._helper.configuration.share_mount_path = '/shares'
        local_path = os.path.join(self._helper.configuration.share_mount_path,
                                  self.share_name)

        def fake_ssh_exec(*args, **kwargs):
            # Only commands containing 'exportfs' without '-u' receive the
            # canned output; every other command gets empty output.
            if 'exportfs' in args[1] and '-u' not in args[1]:
                return output, ''
            else:
                return '', ''

        self.mock_object(self._helper, '_ssh_exec',
                         mock.Mock(side_effect=fake_ssh_exec))

        self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
        self.mock_object(self._helper, '_get_maintenance_file_path',
                         mock.Mock(return_value=fake_maintenance_path))

        self._helper.disable_access_for_maintenance(
            self.server, self.share_name)

        self._helper._ssh_exec.assert_any_call(
            self.server,
            ['cat', const.NFS_EXPORTS_FILE,
             '|', 'grep', self.share_name,
             '|', 'sudo', 'tee', fake_maintenance_path]
        )
        self._helper._ssh_exec.assert_has_calls([
            mock.call(self.server, ['sudo', 'exportfs']),
        ])

        if hosts_match:
            self._helper._ssh_exec.assert_has_calls([
                mock.call(self.server,
                          ['sudo', 'exportfs', '-u',
                           '"{}"'.format(':'.join(['1.1.1.10', local_path]))]),
                mock.call(self.server,
                          ['sudo', 'exportfs', '-u',
                           '"{}"'.format(':'.join(['1.1.1.16', local_path]))]),
                mock.call(self.server,
                          ['sudo', 'exportfs', '-u',
                           '"{}"'.format(':'.join(['*', local_path]))]),
            ])

        self._helper._sync_nfs_temp_and_perm_files.assert_called_once_with(
            self.server
        )

    def test_restore_access_after_maintenance(self):
        fake_maintenance_path = "fake.path"
        self.mock_object(self._helper, '_get_maintenance_file_path',
                         mock.Mock(return_value=fake_maintenance_path))
        self.mock_object(self._helper, '_ssh_exec')

        self._helper.restore_access_after_maintenance(
            self.server, self.share_name)

        # The whole restore is expected to be a single chained command.
        self._helper._ssh_exec.assert_called_once_with(
            self.server,
            ['cat', fake_maintenance_path,
             '|', 'sudo', 'tee', '-a', const.NFS_EXPORTS_FILE,
             '&&', 'sudo', 'exportfs', '-r', '&&', 'sudo', 'rm', '-f',
             fake_maintenance_path]
        )

435 

436 

@ddt.ddt
class CIFSHelperIPAccessTestCase(test.TestCase):
    """Test case for CIFS helper with IP access."""

    def setUp(self):
        super().setUp()
        self.server_details = {
            'instance_id': 'fake',
            'public_address': '1.2.3.4',
        }
        self.share_name = 'fake_share_name'
        self.fake_conf = manila.share.configuration.Configuration(None)
        # Executors reply with an empty (stdout, stderr) pair by default.
        self._ssh_exec = mock.Mock(return_value=('', ''))
        self._execute = mock.Mock(return_value=('', ''))
        self._helper = helpers.CIFSHelperIPAccess(
            self._execute, self._ssh_exec, self.fake_conf)
        self.access = {
            'access_level': const.ACCESS_LEVEL_RW,
            'access_type': 'ip',
            'access_to': '1.1.1.1',
        }

    def test_init_helper(self):
        self._helper.init_helper(self.server_details)

        expected_cmd = ['sudo', 'net', 'conf', 'list']
        self._helper._ssh_exec.assert_called_once_with(
            self.server_details, expected_cmd)

    def test_create_export_share_does_not_exist(self):
        def fail_on_showshare(*args, **kwargs):
            # Only the 'showshare' probe fails; other commands succeed.
            if 'showshare' not in args[1]:
                return '', ''
            raise exception.ProcessExecutionError()

        self.mock_object(self._helper, '_ssh_exec',
                         mock.Mock(side_effect=fail_on_showshare))

        exports = self._helper.create_exports(
            self.server_details, self.share_name)

        unc_path = "\\\\%s\\%s" % (
            self.server_details['public_address'], self.share_name)
        self.assertEqual(
            [{
                "is_admin_only": False,
                "path": unc_path,
                "metadata": {"export_location_metadata_example": "example"}
            }],
            exports)

        share_path = os.path.join(
            self._helper.configuration.share_mount_path, self.share_name)
        showshare_cmd = [
            'sudo', 'net', 'conf', 'showshare', self.share_name]
        addshare_cmd = [
            'sudo', 'net', 'conf', 'addshare', self.share_name,
            share_path, 'writeable=y', 'guest_ok=y']
        self._helper._ssh_exec.assert_has_calls([
            mock.call(self.server_details, showshare_cmd),
            mock.call(self.server_details, addshare_cmd),
            mock.call(self.server_details, mock.ANY),
        ])

    def test_create_export_share_does_not_exist_exception(self):
        # First call fails with ProcessExecutionError, the following one
        # fails with a generic exception.
        failures = [exception.ProcessExecutionError(), Exception('')]
        self.mock_object(self._helper, '_ssh_exec',
                         mock.Mock(side_effect=failures))

        self.assertRaises(
            exception.ManilaException, self._helper.create_exports,
            self.server_details, self.share_name)

    def test_create_exports_share_exist_recreate_true(self):
        exports = self._helper.create_exports(
            self.server_details, self.share_name, recreate=True)

        unc_path = "\\\\%s\\%s" % (
            self.server_details['public_address'], self.share_name)
        self.assertEqual(
            [{
                "is_admin_only": False,
                "path": unc_path,
                "metadata": {"export_location_metadata_example": "example"}
            }],
            exports)

        share_path = os.path.join(
            self._helper.configuration.share_mount_path, self.share_name)
        showshare_cmd = [
            'sudo', 'net', 'conf', 'showshare', self.share_name]
        delshare_cmd = [
            'sudo', 'net', 'conf', 'delshare', self.share_name]
        addshare_cmd = [
            'sudo', 'net', 'conf', 'addshare', self.share_name,
            share_path, 'writeable=y', 'guest_ok=y']
        self._helper._ssh_exec.assert_has_calls([
            mock.call(self.server_details, showshare_cmd),
            mock.call(self.server_details, delshare_cmd),
            mock.call(self.server_details, addshare_cmd),
            mock.call(self.server_details, mock.ANY),
        ])

    def test_create_export_share_exist_recreate_false(self):
        self.assertRaises(
            exception.ShareBackendException,
            self._helper.create_exports,
            self.server_details,
            self.share_name,
            recreate=False,
        )

        showshare_cmd = [
            'sudo', 'net', 'conf', 'showshare', self.share_name]
        self._helper._ssh_exec.assert_has_calls([
            mock.call(self.server_details, showshare_cmd),
        ])

    def test_remove_exports(self):
        self._helper.remove_exports(self.server_details, self.share_name)

        delshare_cmd = ['sudo', 'net', 'conf', 'delshare', self.share_name]
        self._helper._ssh_exec.assert_called_once_with(
            self.server_details, delshare_cmd)

    def test_remove_export_forcibly(self):
        delshare_command = ['sudo', 'net', 'conf', 'delshare', self.share_name]

        def fail_on_delshare(*args, **kwargs):
            # Make exactly the 'delshare' command fail.
            if args[1] != delshare_command:
                return ('', '')
            raise exception.ProcessExecutionError()

        self.mock_object(self._helper, '_ssh_exec',
                         mock.Mock(side_effect=fail_on_delshare))

        self._helper.remove_exports(self.server_details, self.share_name)

        close_share_cmd = [
            'sudo', 'smbcontrol', 'all', 'close-share', self.share_name]
        self._helper._ssh_exec.assert_has_calls([
            mock.call(
                self.server_details,
                ['sudo', 'net', 'conf', 'delshare', self.share_name]),
            mock.call(self.server_details, close_share_cmd),
        ])

    def test_update_access_wrong_access_level(self):
        ro_rule = test_generic.get_fake_access_rule(
            '2.2.2.2', const.ACCESS_LEVEL_RO)

        self.assertRaises(
            exception.InvalidShareAccessLevel,
            self._helper.update_access,
            self.server_details, self.share_name, [ro_rule], [], [])

    def test_update_access_wrong_access_type(self):
        fake_type_rule = test_generic.get_fake_access_rule(
            '2.2.2.2', const.ACCESS_LEVEL_RW, access_type='fake')

        self.assertRaises(
            exception.InvalidShareAccess,
            self._helper.update_access,
            self.server_details, self.share_name, [fake_type_rule], [], [])

    def test_update_access(self):
        rw_rule = test_generic.get_fake_access_rule(
            '1.1.1.1', const.ACCESS_LEVEL_RW)

        self._helper.update_access(self.server_details, self.share_name,
                                   [rw_rule], [], [])

        setparm_cmd = ['sudo', 'net', 'conf', 'setparm',
                       self.share_name, 'hosts allow', '1.1.1.1']
        self._helper._ssh_exec.assert_called_once_with(
            self.server_details, setparm_cmd)

    def test_get_allow_hosts(self):
        ssh_mock = mock.Mock(return_value=('1.1.1.1 2.2.2.2 3.3.3.3', ''))
        self.mock_object(self._helper, '_ssh_exec', ssh_mock)

        hosts = self._helper._get_allow_hosts(
            self.server_details, self.share_name)

        self.assertEqual(['1.1.1.1', '2.2.2.2', '3.3.3.3'], hosts)
        getparm_cmd = ['sudo', 'net', 'conf', 'getparm', self.share_name,
                       'hosts allow']
        self._helper._ssh_exec.assert_called_once_with(
            self.server_details, getparm_cmd)

    @ddt.data(
        '', '1.2.3.4:/nfs/like/export', '/1.2.3.4/foo', '\\1.2.3.4\\foo',
        '//1.2.3.4\\mixed_slashes_and_backslashes_one',
        '\\\\1.2.3.4/mixed_slashes_and_backslashes_two')
    def test__get_share_group_name_from_export_location(self, export_location):
        parse = self._helper._get_share_group_name_from_export_location
        self.assertRaises(exception.InvalidShare, parse, export_location)

    @ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
    def test_get_exports_for_share(self, export_location):
        server = {'public_address': '1.2.3.4'}
        # Wrap the real parser so its call can be asserted afterwards.
        real_parser = self._helper._get_share_group_name_from_export_location
        self.mock_object(
            self._helper, '_get_share_group_name_from_export_location',
            mock.Mock(side_effect=real_parser))

        result = self._helper.get_exports_for_share(server, export_location)

        self.assertEqual(
            [{
                "is_admin_only": False,
                "path": "\\\\%s\\foo" % server['public_address'],
                "metadata": {"export_location_metadata_example": "example"}
            }],
            result)
        wrapped = self._helper._get_share_group_name_from_export_location
        wrapped.assert_called_once_with(export_location)

    @ddt.data(
        {'public_address_with_suffix': 'foo'},
        {'with_prefix_public_address': 'bar'},
        {'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
    def test_get_exports_for_share_with_exception(self, server):
        # None of these server dicts has an exact 'public_address' key.
        self.assertRaises(
            exception.ManilaException,
            self._helper.get_exports_for_share, server, '1.2.3.4:/foo/bar')

    @ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
    def test_get_share_path_by_export_location(self, export_location):
        fake_server = {}
        # Surrounding whitespace is expected to be trimmed from the result.
        ssh_mock = mock.Mock(return_value=(' /bar/quuz\n ', 'fake'))
        self.mock_object(self._helper, '_ssh_exec', ssh_mock)
        real_parser = self._helper._get_share_group_name_from_export_location
        self.mock_object(
            self._helper, '_get_share_group_name_from_export_location',
            mock.Mock(side_effect=real_parser))

        result = self._helper.get_share_path_by_export_location(
            fake_server, export_location)

        self.assertEqual('/bar/quuz', result)
        self._helper._ssh_exec.assert_called_once_with(
            fake_server, ['sudo', 'net', 'conf', 'getparm', 'foo', 'path'])
        wrapped = self._helper._get_share_group_name_from_export_location
        wrapped.assert_called_once_with(export_location)

    def test_disable_access_for_maintenance(self):
        maintenance_path = os.path.join(
            self._helper.configuration.share_mount_path,
            "%s.maintenance" % self.share_name)
        self.mock_object(self._helper, '_set_allow_hosts')
        self.mock_object(self._helper, '_get_allow_hosts',
                         mock.Mock(return_value=['test', 'test2']))

        self._helper.disable_access_for_maintenance(
            self.server_details, self.share_name)

        self._helper._get_allow_hosts.assert_called_once_with(
            self.server_details, self.share_name)
        # The allow list is expected to be emptied for the share.
        self._helper._set_allow_hosts.assert_called_once_with(
            self.server_details, [], self.share_name)
        self._helper._ssh_exec.assert_any_call(
            self.server_details, ['sudo', 'smbstatus', '-S'])
        # The previous allow list is expected to be written out via tee.
        self._helper._ssh_exec.assert_any_call(
            self.server_details,
            ['echo', "'test test2'", '|', 'sudo', 'tee', maintenance_path])

    def test__kick_out_users_success(self):
        smbstatus_return = """Service pid machine Connected at
-------------------------------------------------------
fake_share_name 1001 fake_machine1 Thu Sep 14 14:59:07 2017
fake_share_name 1002 fake_machine2 Thu Sep 14 14:59:07 2017
"""
        replies = [(smbstatus_return, "fake_stderr"), ("fake", "fake")]
        self.mock_object(self._helper, '_ssh_exec',
                         mock.Mock(side_effect=replies))

        self._helper._kick_out_users(self.server_details, self.share_name)

        self._helper._ssh_exec.assert_any_call(
            self.server_details, ['sudo', 'smbstatus', '-S'])
        # Both pids from the smbstatus output go into one kill command.
        self._helper._ssh_exec.assert_any_call(
            self.server_details, ["sudo", "kill", "-15", "1001", "1002"])

    def test__kick_out_users_failed(self):
        smbstatus_return = """Service pid machine Connected at
-------------------------------------------------------
fake line
"""
        ssh_mock = mock.Mock(return_value=(smbstatus_return, "fake_stderr"))
        self.mock_object(self._helper, '_ssh_exec', ssh_mock)

        self.assertRaises(exception.ShareBackendException,
                          self._helper._kick_out_users, self.server_details,
                          self.share_name)

    def test_restore_access_after_maintenance(self):
        fake_maintenance_path = "test.path"
        self.mock_object(self._helper, '_set_allow_hosts')
        self.mock_object(self._helper, '_get_maintenance_file_path',
                         mock.Mock(return_value=fake_maintenance_path))
        # First reply carries the saved hosts, the second is a dummy reply.
        replies = [("fake fake2", 0), "fake"]
        self.mock_object(self._helper, '_ssh_exec',
                         mock.Mock(side_effect=replies))

        self._helper.restore_access_after_maintenance(
            self.server_details, self.share_name)

        self._helper._set_allow_hosts.assert_called_once_with(
            self.server_details, ['fake', 'fake2'], self.share_name)
        self._helper._ssh_exec.assert_any_call(
            self.server_details, ['cat', fake_maintenance_path])
        self._helper._ssh_exec.assert_any_call(
            self.server_details, ['sudo', 'rm', '-f', fake_maintenance_path])

769 

770 

@ddt.ddt
class CIFSHelperUserAccessTestCase(test.TestCase):
    """Test case for CIFS helper with user access."""

    # Sample user-type access rules, read-write and read-only.
    access_rw = {
        'access_level': const.ACCESS_LEVEL_RW,
        'access_type': 'user',
        'access_to': 'manila-user',
    }
    access_ro = {
        'access_level': const.ACCESS_LEVEL_RO,
        'access_type': 'user',
        'access_to': 'manila-user',
    }

    def setUp(self):
        super().setUp()
        self.server_details = {
            'instance_id': 'fake',
            'public_address': '1.2.3.4',
        }
        self.share_name = 'fake_share_name'
        self.fake_conf = manila.share.configuration.Configuration(None)
        # Executors reply with an empty (stdout, stderr) pair by default.
        self._ssh_exec = mock.Mock(return_value=('', ''))
        self._execute = mock.Mock(return_value=('', ''))
        self._helper = helpers.CIFSHelperUserAccess(
            self._execute, self._ssh_exec, self.fake_conf)

    def test_update_access_exception_type(self):
        # An 'ip' rule is expected to be rejected by the user-access helper.
        ip_rule = test_generic.get_fake_access_rule(
            'user1', const.ACCESS_LEVEL_RW, access_type='ip')

        self.assertRaises(exception.InvalidShareAccess,
                          self._helper.update_access, self.server_details,
                          self.share_name, [ip_rule], [], [])

    def test_update_access(self):
        rw_rule = test_generic.get_fake_access_rule(
            'user1', const.ACCESS_LEVEL_RW, access_type='user')
        ro_rule = test_generic.get_fake_access_rule(
            'user2', const.ACCESS_LEVEL_RO, access_type='user')

        self._helper.update_access(self.server_details, self.share_name,
                                   [rw_rule, ro_rule], [], [])

        # The RW user is expected under 'valid users' and the RO user
        # under 'read list'.
        setparm = ['sudo', 'net', 'conf', 'setparm', self.share_name]
        self._helper._ssh_exec.assert_has_calls([
            mock.call(self.server_details,
                      setparm + ['valid users', 'user1']),
            mock.call(self.server_details,
                      setparm + ['read list', 'user2']),
        ])

    def test_update_access_exception_level(self):
        bad_level_rule = test_generic.get_fake_access_rule(
            'user1', 'fake_level', access_type='user')

        self.assertRaises(
            exception.InvalidShareAccessLevel,
            self._helper.update_access,
            self.server_details, self.share_name, [bad_level_rule], [], [])

829 

830 

@ddt.ddt
class NFSSynchronizedTestCase(test.TestCase):
    """Test case for the ``helpers.nfs_synchronized`` decorator."""

    @helpers.nfs_synchronized
    def wrapped_method(self, server, share_name):
        # Trivial decorated method: its return value shows that both
        # arguments reached the wrapped function.
        instance_id = server['instance_id']
        return instance_id + share_name

    @ddt.data(
        ({'lock_name': 'FOO', 'instance_id': 'QUUZ'}, 'nfs-FOO'),
        ({'instance_id': 'QUUZ'}, 'nfs-QUUZ'),
    )
    @ddt.unpack
    def test_with_lock_name(self, server, expected_lock_name):
        # Expected lock name: 'nfs-' + lock_name when the server has one,
        # otherwise 'nfs-' + instance_id.
        share_name = 'fake_share_name'
        # Wrap the real 'synchronized' so locking still happens and the
        # call can be inspected.
        real_synchronized = helpers.utils.synchronized
        self.mock_object(
            helpers.utils, 'synchronized',
            mock.Mock(side_effect=real_synchronized))

        result = self.wrapped_method(server, share_name)

        expected_result = server['instance_id'] + share_name
        self.assertEqual(expected_result, result)
        helpers.utils.synchronized.assert_called_once_with(
            expected_lock_name, external=True)