Coverage for manila/tests/share/drivers/macrosan/test_macrosan_nas.py: 99%

927 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2022 MacroSAN Technologies Co., Ltd. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16""" 

17Share driver test for Macrosan Storage Array. 

18""" 

19import ddt 

20import requests 

21 

22from oslo_config import cfg 

23from unittest import mock 

24 

25from manila import context 

26from manila import exception 

27from manila.share import configuration 

28from manila.share import driver 

29from manila.share.drivers.macrosan import macrosan_constants as constants 

30from manila.share.drivers.macrosan import macrosan_helper 

31from manila.share.drivers.macrosan import macrosan_nas 

32from manila.share.drivers.macrosan import rest_helper 

33from manila import test 

34from manila.tests import fake_share 

35 

36CONF = cfg.CONF 

37 

38 

class FakeResponse:
    """Minimal stand-in for an HTTP response object used by these tests.

    Stores a status code and a payload; ``json()`` hands the payload back
    unchanged and ``close()`` is a no-op.
    """

    def __init__(self, status, result):
        self.status_code, self.response = status, result
        self.text = 'return message'

    def json(self):
        """Return the payload supplied at construction time."""
        return self.response

    def close(self):
        """Do nothing; present so callers can close the response."""
        return None

50 

51 

52@ddt.ddt 

53class MacrosanShareDriverTestCase(test.TestCase): 

54 

def setUp(self):
    """Build a MacrosanNasDriver on top of a mocked Configuration."""
    self.mock_object(macrosan_nas.CONF, '_check_required_opts')
    super(MacrosanShareDriverTestCase, self).setUp()

    self._context = context.get_admin_context()

    fake_conf = mock.Mock(spec=configuration.Configuration)
    self.configuration = fake_conf
    # safe_get just reads the attribute of the same name off the mock.
    fake_conf.safe_get = mock.Mock(
        side_effect=lambda opt: getattr(self.configuration, opt))

    option_values = {
        'driver_handles_share_servers': False,
        'share_backend_name': 'fake_share_backend_name',
        'macrosan_nas_http_protocol': 'https',
        'macrosan_nas_ip': 'fake_ip',
        'macrosan_nas_port': 'fake_port',
        'macrosan_nas_username': 'fake_username',
        'macrosan_nas_password': 'fake_password',
        'macrosan_nas_prefix': 'nas',
        'macrosan_share_pools': ['fake_pool'],
        'macrosan_timeout': 60,
        'macrosan_ssl_cert_verify': False,
        'network_config_group': 'fake_network_config_group',
        'admin_network_config_group': 'fake_admin_network_config_group',
        'config_group': 'fake_config_group',
        'reserved_share_percentage': 0,
        'reserved_share_from_snapshot_percentage': 0,
        'reserved_share_extend_percentage': 0,
        'filter_function': None,
        'goodness_function': None,
    }
    for option_name, option_value in option_values.items():
        setattr(fake_conf, option_name, option_value)

    self.driver = macrosan_nas.MacrosanNasDriver(
        configuration=self.configuration)

    # Canned "list storage pools" payload describing a single pool.
    fake_pool_info = {
        'name': 'fake_pool',
        'size': '1000.0G',
        'allocated': '100G',
        'free': '900G',
        'health': 'ONLINE',
        'rwStatus': 'off'
    }
    self.result_success_storage_pools = {
        'code': 0,
        'message': 'success',
        'data': [fake_pool_info]
    }

100 

def test_do_setup(self):
    """do_setup logs in to the backend exactly once."""
    login_mock = self.mock_object(rest_helper.RestHelper, 'login')

    self.driver.do_setup(self._context)

    login_mock.assert_called_once()

105 

def test_do_setup_login_fail(self):
    """A ShareBackendException raised by login escapes from do_setup."""
    login_error = exception.ShareBackendException(msg='fake_exception')
    login_mock = self.mock_object(rest_helper.RestHelper, 'login',
                                  mock.Mock(side_effect=login_error))

    self.assertRaises(exception.ShareBackendException,
                      self.driver.do_setup, self._context)

    login_mock.assert_called_once()

116 

@ddt.data({'nfs_status': constants.NFS_NON_CONFIG,
           'cifs_status': constants.CIFS_NON_CONFIG},
          {'nfs_status': constants.NFS_DISABLED,
           'cifs_status': constants.CIFS_DISABLED},
          {'nfs_status': constants.NFS_ENABLED,
           'cifs_status': constants.CIFS_ENABLED},
          {'nfs_status': constants.NFS_ENABLED,
           'cifs_status': constants.CIFS_SHARE_MODE})
@ddt.unpack
def test_check_for_setup_error_non_config(self, nfs_status, cifs_status):
    """Check which config/start calls the setup check makes per status."""
    rest = rest_helper.RestHelper
    nfs_service_info = {
        "serviceStatus": nfs_status,
        "nfs3Status": constants.NFS_NON_SUPPORTED,
        "nfs4Status": constants.NFS_NON_SUPPORTED
    }
    nfs_status_mock = self.mock_object(
        rest, '_get_nfs_service_status',
        mock.Mock(return_value=nfs_service_info))
    config_nfs_mock = self.mock_object(rest, '_config_nfs_service')
    start_nfs_mock = self.mock_object(rest, '_start_nfs_service')

    # For the disabled case the status mock answers twice: first with
    # the disabled status, then with share mode.
    if cifs_status == constants.CIFS_DISABLED:
        cifs_status_getter = mock.Mock(
            side_effect=[cifs_status, constants.CIFS_SHARE_MODE])
    else:
        cifs_status_getter = mock.Mock(return_value=cifs_status)
    cifs_status_mock = self.mock_object(
        rest, '_get_cifs_service_status', cifs_status_getter)
    config_cifs_mock = self.mock_object(rest, '_config_cifs_service')
    start_cifs_mock = self.mock_object(rest, '_start_cifs_service')

    self.driver.check_for_setup_error()

    # NFS: configuration is asserted for every status, start only when
    # the service was not configured or was disabled.
    config_nfs_mock.assert_called_once()
    if nfs_status in (constants.NFS_NON_CONFIG, constants.NFS_DISABLED):
        start_nfs_mock.assert_called_once()
    nfs_status_mock.assert_called_once()

    # CIFS: the disabled case may query the status more than once.
    if cifs_status == constants.CIFS_DISABLED:
        cifs_status_mock.assert_called()
    else:
        cifs_status_mock.assert_called_once()
    needs_config = (constants.CIFS_NON_CONFIG,
                    constants.CIFS_DISABLED,
                    constants.CIFS_SHARE_MODE)
    needs_start = (constants.CIFS_NON_CONFIG, constants.CIFS_DISABLED)
    if cifs_status in needs_config:
        config_cifs_mock.assert_called_once()
    if cifs_status in needs_start:
        start_cifs_mock.assert_called_once()

173 

def test_check_for_setup_error_nfs_service_error(self):
    """An NFS service in the exception state makes the check raise."""
    nfs_service_info = {
        "serviceStatus": constants.NFS_EXCEPTION,
        "nfs3Status": constants.NFS_NON_SUPPORTED,
        "nfs4Status": constants.NFS_NON_SUPPORTED
    }
    nfs_status_mock = self.mock_object(
        rest_helper.RestHelper, '_get_nfs_service_status',
        mock.Mock(return_value=nfs_service_info))

    self.assertRaises(exception.MacrosanBackendExeption,
                      self.driver.check_for_setup_error)

    nfs_status_mock.assert_called_once()

185 

def test_check_for_setup_error_cifs_service_error(self):
    """A CIFS service in the exception state makes the check raise."""
    rest = rest_helper.RestHelper
    nfs_service_info = {
        "serviceStatus": constants.NFS_ENABLED,
        "nfs3Status": constants.NFS_SUPPORTED,
        "nfs4Status": constants.NFS_SUPPORTED
    }
    nfs_status_mock = self.mock_object(
        rest, '_get_nfs_service_status',
        mock.Mock(return_value=nfs_service_info))
    cifs_status_mock = self.mock_object(
        rest, '_get_cifs_service_status',
        mock.Mock(return_value=constants.CIFS_EXCEPTION))

    self.assertRaises(exception.MacrosanBackendExeption,
                      self.driver.check_for_setup_error)

    # Both service status queries are expected exactly once.
    nfs_status_mock.assert_called_once()
    cifs_status_mock.assert_called_once()

201 

@ddt.data('nfs', 'cifs')
def test_create_share(self, share_proto):
    """create_share creates filesystem, directory and protocol export.

    Runs once per protocol and checks the returned export location as
    well as the REST helper calls made along the way.
    """
    share = fake_share.fake_share(
        share_proto=share_proto, host="fake_host@fake_backend#fake_pool")
    mock_cf = self.mock_object(rest_helper.RestHelper,
                               '_create_filesystem')
    mock_cfd = self.mock_object(rest_helper.RestHelper,
                                '_create_filesystem_dir')
    mock_cns = self.mock_object(rest_helper.RestHelper,
                                '_create_nfs_share')
    self.mock_object(macrosan_helper.MacrosanHelper,
                     '_ensure_user',
                     mock.Mock(return_value=True))
    mock_ccs = self.mock_object(rest_helper.RestHelper,
                                '_create_cifs_share')
    self.driver.helper.configuration.macrosan_nas_ip = "172.0.0.1"

    location = self.driver.create_share(self._context, share)

    # NFS exports look like host:/fs/dir, CIFS exports like \\host\name.
    if share_proto == 'nfs':
        expect_location = r'172.0.0.1:/manila_fakeid/manila_fakeid'
    else:
        expect_location = r'\\172.0.0.1\manila_fakeid'
    self.assertEqual(expect_location, location)

    mock_cf.assert_called_once_with(fs_name='manila_fakeid',
                                    pool_name='fake_pool',
                                    filesystem_quota='1GB')
    share_path = self.driver.helper._generate_share_path('manila_fakeid')
    mock_cfd.assert_called_once_with(share_path)
    if share_proto == 'nfs':
        mock_cns.assert_called_once_with(share_path=share_path)
    else:
        mock_ccs.assert_called_once()

237 

def test_create_share_user_error(self):
    """A failing _ensure_user on a CIFS share raises and deletes the fs."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(
        share_proto='cifs', host="fake_host@fake_backend#fake_pool")
    create_fs_mock = self.mock_object(rest, '_create_filesystem')
    create_dir_mock = self.mock_object(rest, '_create_filesystem_dir')
    self.mock_object(macrosan_helper.MacrosanHelper, '_ensure_user',
                     mock.Mock(return_value=False))
    delete_fs_mock = self.mock_object(rest, '_delete_filesystem')

    self.assertRaises(exception.MacrosanBackendExeption,
                      self.driver.create_share, self._context, share)

    create_fs_mock.assert_called_once()
    create_dir_mock.assert_called_once_with(
        self.driver.helper._generate_share_path('manila_fakeid'))
    delete_fs_mock.assert_called_once_with('manila_fakeid')

258 

@ddt.data('nfs', 'cifs')
def test_delete_share(self, share_proto):
    """delete_share removes the protocol export and the filesystem."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(
        share_proto=share_proto, host="fake_host@fake_backend#fake_pool")
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')

    nfs_share_info = {
        "path": "fake_path",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    cifs_share_info = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "fake_protocol",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"]
    }
    get_nfs_mock = self.mock_object(
        rest, '_get_nfs_share', mock.Mock(return_value=nfs_share_info))
    delete_nfs_mock = self.mock_object(rest, '_delete_nfs_share')
    get_cifs_mock = self.mock_object(
        rest, '_get_cifs_share', mock.Mock(return_value=cifs_share_info))
    delete_cifs_mock = self.mock_object(rest, '_delete_cifs_share')
    delete_fs_mock = self.mock_object(rest, '_delete_filesystem')

    self.driver.delete_share(self._context, share)

    if share_proto != "nfs":
        get_cifs_mock.assert_called_once_with(expect_share_path)
        delete_cifs_mock.assert_called_once_with('manila_fakeid',
                                                 expect_share_path)
    else:
        get_nfs_mock.assert_called_once_with(expect_share_path)
        delete_nfs_mock.assert_called_once_with(expect_share_path)
    delete_fs_mock.assert_called_once_with('manila_fakeid')

298 

@ddt.data('nfs', 'cifs')
def test_delete_share_not_exist(self, share_proto):
    """With no export found, delete_share still deletes the filesystem."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    filesystem_info = {
        "name": "fake_name",
        "poolName": "fake_pool",
        "quotaStatus": "1GB"
    }
    get_nfs_mock = self.mock_object(rest, '_get_nfs_share',
                                    mock.Mock(return_value=None))
    get_fs_mock = self.mock_object(
        rest, '_get_filesystem', mock.Mock(return_value=filesystem_info))
    get_cifs_mock = self.mock_object(rest, '_get_cifs_share',
                                     mock.Mock(return_value=None))
    delete_fs_mock = self.mock_object(rest, '_delete_filesystem')

    self.driver.delete_share(self._context, share)

    # Only the lookup matching the share protocol is checked.
    export_lookup = get_nfs_mock if share_proto == 'nfs' else get_cifs_mock
    export_lookup.assert_called_once_with(expect_share_path)

    get_fs_mock.assert_called_once_with('manila_fakeid')
    delete_fs_mock.assert_called_once_with('manila_fakeid')

326 

@ddt.data('nfs', 'cifs')
def test_extend_share(self, share_proto):
    """extend_share looks up the export and updates the size to 2GB."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    nfs_share_info = {
        "path": "fake_path",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    cifs_share_info = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "fake_protocol",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"]
    }
    get_nfs_mock = self.mock_object(
        rest, '_get_nfs_share', mock.Mock(return_value=nfs_share_info))
    get_cifs_mock = self.mock_object(
        rest, '_get_cifs_share', mock.Mock(return_value=cifs_share_info))
    update_size_mock = self.mock_object(rest, '_update_share_size')

    self.driver.extend_share(share, 2)

    export_lookup = get_nfs_mock if share_proto == 'nfs' else get_cifs_mock
    export_lookup.assert_called_once_with(expect_share_path)

    update_size_mock.assert_called_once_with('manila_fakeid', '2GB')

359 

def test_extend_share_not_exist(self):
    """Extending an NFS share whose export is missing raises not-found."""
    share = fake_share.fake_share(share_proto='nfs',
                                  size=1,
                                  host="fake_host@fake_backend#fake_pool")
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    get_nfs_mock = self.mock_object(rest_helper.RestHelper,
                                    '_get_nfs_share',
                                    mock.Mock(return_value=None))

    self.assertRaises(exception.ShareResourceNotFound,
                      self.driver.extend_share, share, 2)

    get_nfs_mock.assert_called_once_with(expect_share_path)

375 

@ddt.data('nfs', 'cifs')
def test_shrink_share(self, share_proto):
    """shrink_share checks export and filesystem and resizes to 3GB."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto=share_proto,
                                  size=5,
                                  host="fake_host@fake_backend#fake_pool")
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    nfs_share_info = {
        "path": "fake_path",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    cifs_share_info = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "fake_protocol",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"]
    }
    # The mocked filesystem reports 1GB used out of a 5GB quota.
    filesystem_info = {
        "name": "fake_name",
        "poolName": "fake_pool",
        "quotaStatus": "5GB",
        "usedCapacity": '1GB'
    }
    get_nfs_mock = self.mock_object(
        rest, '_get_nfs_share', mock.Mock(return_value=nfs_share_info))
    get_cifs_mock = self.mock_object(
        rest, '_get_cifs_share', mock.Mock(return_value=cifs_share_info))
    get_fs_mock = self.mock_object(
        rest, '_get_filesystem', mock.Mock(return_value=filesystem_info))
    update_size_mock = self.mock_object(rest, '_update_share_size')

    self.driver.shrink_share(share, 3)

    export_lookup = get_nfs_mock if share_proto == 'nfs' else get_cifs_mock
    export_lookup.assert_called_once_with(expect_share_path)

    get_fs_mock.assert_called_once_with('manila_fakeid')
    update_size_mock.assert_called_once_with('manila_fakeid', '3GB')

417 

@ddt.data('nfs', 'cifs')
def test_shrink_share_not_exist(self, share_proto):
    """Shrinking a share whose export is missing raises not-found.

    Runs once per protocol; both export lookups are mocked to return
    None and the lookup matching the protocol must be called once.
    """
    share = fake_share.fake_share(share_proto=share_proto,
                                  size=3,
                                  host="fake_host@fake_backend#fake_pool")
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
                                mock.Mock(return_value=None))
    mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
                                mock.Mock(return_value=None))

    self.assertRaises(exception.ShareResourceNotFound,
                      self.driver.shrink_share,
                      share,
                      1)
    if share_proto == 'nfs':
        mock_gns.assert_called_once_with(expect_share_path)
    else:
        # ddt only supplies 'nfs' and 'cifs', so a plain else avoids a
        # branch that can never be false.
        mock_gcs.assert_called_once_with(expect_share_path)

438 

def test_shrink_share_size_fail(self):
    """Shrinking to 1GB while 2GB is used raises possible-data-loss."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto='nfs',
                                  size=3,
                                  host="fake_host@fake_backend#fake_pool")
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    nfs_share_info = {
        "path": "fake_path",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    filesystem_info = {
        "name": "fake_name",
        "poolName": "fake_pool",
        "quotaStatus": "3GB",
        "usedCapacity": '2GB'
    }
    get_nfs_mock = self.mock_object(
        rest, '_get_nfs_share', mock.Mock(return_value=nfs_share_info))
    get_fs_mock = self.mock_object(
        rest, '_get_filesystem', mock.Mock(return_value=filesystem_info))

    self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
                      self.driver.shrink_share, share, 1)

    get_fs_mock.assert_called_once_with('manila_fakeid')
    get_nfs_mock.assert_called_once_with(expect_share_path)

465 

@ddt.data('nfs', 'cifs')
def test_ensure_share(self, share_proto):
    """ensure_share returns the export location list for the protocol."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    nfs_share_info = {
        "path": "fake_path",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    cifs_share_info = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "fake_protocol",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"],
    }
    get_nfs_mock = self.mock_object(
        rest, '_get_nfs_share', mock.Mock(return_value=nfs_share_info))
    get_cifs_mock = self.mock_object(
        rest, '_get_cifs_share', mock.Mock(return_value=cifs_share_info))
    self.driver.helper.configuration.macrosan_nas_ip = "172.0.0.1"

    locations = self.driver.ensure_share(self._context, share)

    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    if share_proto == 'nfs':
        expect_locations = [r'172.0.0.1:/manila_fakeid/manila_fakeid']
        export_lookup = get_nfs_mock
    else:
        expect_locations = [r'\\172.0.0.1\manila_fakeid']
        export_lookup = get_cifs_mock
    self.assertEqual(locations, expect_locations)
    export_lookup.assert_called_once_with(expect_share_path)

498 

def test_ensure_share_proto_fail(self):
    """ensure_share raises for a share built without a share_proto."""
    # No share_proto is passed, so fake_share's default is used.
    share = fake_share.fake_share(host="fake_host@fake_backend#fake_pool")
    nfs_share_info = {
        "path": "fake_path",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
                     mock.Mock(return_value=nfs_share_info))

    self.assertRaises(exception.MacrosanBackendExeption,
                      self.driver.ensure_share, self._context, share)

511 

def test_ensure_share_not_exist(self):
    """ensure_share raises not-found when the NFS export is missing."""
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    get_nfs_mock = self.mock_object(rest_helper.RestHelper,
                                    '_get_nfs_share',
                                    mock.Mock(return_value=None))

    self.assertRaises(exception.ShareResourceNotFound,
                      self.driver.ensure_share, self._context, share)

    get_nfs_mock.assert_called_once_with(
        self.driver.helper._generate_share_path('manila_fakeid'))

524 

@ddt.data('nfs', 'cifs')
def test_allow_access_success(self, share_proto):
    """_allow_access adds a new rule via the protocol's allow REST call."""
    rest = rest_helper.RestHelper
    is_nfs = share_proto == 'nfs'
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    # NFS is exercised with an IP rule, CIFS with a user rule.
    access = {
        'access_type': 'ip' if is_nfs else 'user',
        'access_to': '0.0.0.0/0' if is_nfs else 'fake_user',
        'access_level': 'rw',
    }
    nfs_share_info = {
        "path": "fake_path",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    cifs_share_info = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "fake_protocol",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"],
    }
    get_nfs_mock = self.mock_object(
        rest, '_get_nfs_share', mock.Mock(return_value=nfs_share_info))
    nfs_access_mock = self.mock_object(
        rest, '_get_access_from_nfs_share', mock.Mock(return_value=None))
    allow_nfs_mock = self.mock_object(rest, '_allow_nfs_access_rest')
    get_cifs_mock = self.mock_object(
        rest, '_get_cifs_share', mock.Mock(return_value=cifs_share_info))
    cifs_access_mock = self.mock_object(
        rest, '_get_access_from_cifs_share', mock.Mock(return_value=None))
    allow_cifs_mock = self.mock_object(rest, '_allow_cifs_access_rest')

    self.driver.helper._allow_access(share, access)

    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    # The REST mocks are expected to see '*' in place of 0.0.0.0/0.
    if access['access_to'] == '0.0.0.0/0':
        access['access_to'] = '*'
    if is_nfs:
        share_lookup, access_lookup, allow_call = (
            get_nfs_mock, nfs_access_mock, allow_nfs_mock)
    else:
        share_lookup, access_lookup, allow_call = (
            get_cifs_mock, cifs_access_mock, allow_cifs_mock)
    share_lookup.assert_called_once_with(expect_share_path)
    access_lookup.assert_called_once_with(expect_share_path,
                                          access['access_to'])
    allow_call.assert_called_once_with(expect_share_path,
                                       access['access_to'],
                                       access['access_level'])

588 

def test_allow_access_nfs_change(self):
    """An existing 'ro' NFS rule goes through _change_nfs_access_rest."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    access = {
        'access_type': 'ip',
        'access_to': '172.0.0.1',
        'access_level': 'rw',
    }
    nfs_share_info = {
        "path": "/manila_fakeid",
        "clients": ["client"],
        "protocol": "fake_protocol"
    }
    # The backend already holds a read-only rule for this share.
    existing_rule = {
        "path": "/manila_fakeid",
        "clientName": "fake_client_name",
        "accessRight": "ro",
    }
    get_nfs_mock = self.mock_object(
        rest, '_get_nfs_share', mock.Mock(return_value=nfs_share_info))
    nfs_access_mock = self.mock_object(
        rest, '_get_access_from_nfs_share',
        mock.Mock(return_value=existing_rule))
    change_nfs_mock = self.mock_object(rest, '_change_nfs_access_rest')

    self.driver.helper._allow_access(share, access)

    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    get_nfs_mock.assert_called_once_with(expect_share_path)
    nfs_access_mock.assert_called_once_with(expect_share_path,
                                            access['access_to'])
    change_nfs_mock.assert_called_once_with(expect_share_path,
                                            access['access_to'],
                                            access['access_level'])

621 

def test_allow_access_cifs_change(self):
    """An existing 'ro' CIFS rule goes through _change_cifs_access_rest."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto='cifs',
                                  host="fake_host@fake_backend#fake_pool")
    access = {
        'access_type': 'user',
        'access_to': 'fake_user',
        'access_level': 'rw',
    }
    cifs_share_info = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "fake_protocol",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"],
    }
    # The backend already holds a read-only rule for this user.
    existing_rule = {
        "path": "fake_path",
        "ugName": "fake_user",
        "ugType": "0",
        "accessRight": "ro",
    }
    get_cifs_mock = self.mock_object(
        rest, '_get_cifs_share', mock.Mock(return_value=cifs_share_info))
    cifs_access_mock = self.mock_object(
        rest, '_get_access_from_cifs_share',
        mock.Mock(return_value=existing_rule))
    change_cifs_mock = self.mock_object(rest, '_change_cifs_access_rest')

    self.driver.helper._allow_access(share, access)

    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    get_cifs_mock.assert_called_once_with(expect_share_path)
    cifs_access_mock.assert_called_once_with(expect_share_path,
                                             access['access_to'])
    change_cifs_mock.assert_called_once_with(expect_share_path,
                                             access['access_to'],
                                             access['access_level'],
                                             '0')

661 

@ddt.data(
    {
        'access_type': 'user',
        'access_to': 'user_name',
        'access_level': 'rw',
    },
    {
        'access_type': 'user',
        'access_to': 'group_name',
        'access_level': 'rw',
    },
    {
        'access_type': 'user',
        'access_to': '/domain_user',
        'access_level': 'rw',
    },
    {
        'access_type': 'user',
        'access_to': '/domain_group',
        'access_level': 'rw',
    },
)
def test_allow_access_cifs(self, access):
    """_allow_access adds a new CIFS user rule for each access_to form."""
    rest = rest_helper.RestHelper
    share = fake_share.fake_share(share_proto='cifs',
                                  host="fake_host@fake_backend#fake_pool")
    cifs_share_info = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "fake_protocol",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"],
    }
    get_cifs_mock = self.mock_object(
        rest, '_get_cifs_share', mock.Mock(return_value=cifs_share_info))
    # No existing rule is reported for the requested user or group.
    cifs_access_mock = self.mock_object(
        rest, '_get_access_from_cifs_share', mock.Mock(return_value=None))
    allow_cifs_mock = self.mock_object(rest, '_allow_cifs_access_rest')

    self.driver.helper._allow_access(share, access)

    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    get_cifs_mock.assert_called_once_with(expect_share_path)
    cifs_access_mock.assert_called_once_with(expect_share_path,
                                             access['access_to'])
    allow_cifs_mock.assert_called_once_with(expect_share_path,
                                            access['access_to'],
                                            access['access_level'])

713 

@ddt.data('nfs', 'cifs')
def test_allow_access_share_not_exist(self, share_proto):
    """_allow_access raises not-found when the export lookup is empty."""
    rest = rest_helper.RestHelper
    is_nfs = share_proto == 'nfs'
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    # NFS is exercised with an IP rule, CIFS with a user rule.
    access = {
        'access_type': 'ip' if is_nfs else 'user',
        'access_to': '172.0.0.1' if is_nfs else 'fake_user',
        'access_level': 'rw',
    }
    get_nfs_mock = self.mock_object(rest, '_get_nfs_share',
                                    mock.Mock(return_value=None))
    get_cifs_mock = self.mock_object(rest, '_get_cifs_share',
                                     mock.Mock(return_value=None))

    self.assertRaises(exception.ShareResourceNotFound,
                      self.driver.helper._allow_access, share, access)

    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')
    export_lookup = get_nfs_mock if is_nfs else get_cifs_mock
    export_lookup.assert_called_once_with(expect_share_path)

745 

def test_allow_access_proto_fail(self):
    """_allow_access raises for a share built without a share_proto."""
    # No share_proto is passed, so fake_share's default is used.
    share = fake_share.fake_share(host="fake_host@fake_backend#fake_pool")
    user_rule = {
        'access_type': 'user',
        'access_to': 'fake_user',
        'access_level': 'rw',
    }

    self.assertRaises(exception.MacrosanBackendExeption,
                      self.driver.helper._allow_access, share, user_rule)

757 

def test_allow_access_nfs_user_fail(self):
    """A 'user' type rule on an NFS share raises InvalidShareAccess."""
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    user_rule = {
        'access_type': 'user',
        'access_to': 'fake_user',
        'access_level': 'rw',
    }

    self.assertRaises(exception.InvalidShareAccess,
                      self.driver.helper._allow_access, share, user_rule)

770 

def test_allow_access_cifs_ip_fail(self):
    """An 'ip' type rule on a CIFS share raises InvalidShareAccess."""
    share = fake_share.fake_share(share_proto='cifs',
                                  host="fake_host@fake_backend#fake_pool")
    ip_rule = {
        'access_type': 'ip',
        'access_to': '172.0.0.1',
        'access_level': 'rw',
    }

    self.assertRaises(exception.InvalidShareAccess,
                      self.driver.helper._allow_access, share, ip_rule)

783 

def test_allow_access_nfs_level_fail(self):
    """An NFS rule with access level 'r' raises InvalidShareAccess."""
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    bad_level_rule = {
        'access_type': 'ip',
        'access_to': '172.0.0.1',
        'access_level': 'r',
    }

    self.assertRaises(exception.InvalidShareAccess,
                      self.driver.helper._allow_access,
                      share, bad_level_rule)

796 

@ddt.data('nfs', 'cifs')
def test_deny_access(self, share_proto):
    """_deny_access removes an existing rule via the delete REST call."""
    rest = rest_helper.RestHelper
    is_nfs = share_proto == 'nfs'
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    # NFS is exercised with an IP rule, CIFS with a user rule.
    access = {
        'access_type': 'ip' if is_nfs else 'user',
        'access_to': '0.0.0.0/0' if is_nfs else 'fake_user',
        'access_level': 'rw',
    }
    existing_nfs_rule = {
        "path": "fake_path",
        "clientName": "fake_client_name",
        "accessRight": "rw",
    }
    existing_cifs_rule = {
        "path": "fake_path",
        "ugName": "fake_user",
        "ugType": "0",
        "accessRight": "rw",
    }
    nfs_access_mock = self.mock_object(
        rest, '_get_access_from_nfs_share',
        mock.Mock(return_value=existing_nfs_rule))
    delete_nfs_mock = self.mock_object(rest, '_delete_nfs_access_rest')
    cifs_access_mock = self.mock_object(
        rest, '_get_access_from_cifs_share',
        mock.Mock(return_value=existing_cifs_rule))
    delete_cifs_mock = self.mock_object(rest, '_delete_cifs_access_rest')
    expect_share_path = self.driver.helper._generate_share_path(
        'manila_fakeid')

    self.driver.helper._deny_access(share, access)

    # The REST mocks are expected to see '*' in place of 0.0.0.0/0.
    if access['access_to'] == '0.0.0.0/0':
        access['access_to'] = '*'
    if is_nfs:
        nfs_access_mock.assert_called_once_with(expect_share_path,
                                                access['access_to'])
        delete_nfs_mock.assert_called_once_with(expect_share_path,
                                                access['access_to'])
    else:
        cifs_access_mock.assert_called_once_with(expect_share_path,
                                                 access['access_to'])
        delete_cifs_mock.assert_called_once_with(expect_share_path,
                                                 "fake_user", "0")

847 

def test_deny_access_nfs_type_fail(self):
    """_deny_access returns None for an unknown type on an NFS share."""
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    unknown_type_rule = {
        'access_type': 'fake_type',
        'access_to': '172.0.0.1',
        'access_level': 'rw',
    }

    self.assertIsNone(
        self.driver.helper._deny_access(share, unknown_type_rule))

858 

def test_deny_access_nfs_share_not_exist(self):
    """_deny_access returns None when no NFS rule exists on the backend."""
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    access = {
        'access_type': 'ip',
        'access_to': '172.0.0.1',
        'access_level': 'rw',
    }
    nfs_access_mock = self.mock_object(rest_helper.RestHelper,
                                       '_get_access_from_nfs_share',
                                       mock.Mock(return_value=None))

    self.assertIsNone(self.driver.helper._deny_access(share, access))

    nfs_access_mock.assert_called_once_with(
        self.driver.helper._generate_share_path('manila_fakeid'),
        access['access_to'])

876 

def test_deny_access_cifs_type_fail(self):
    """_deny_access returns None for a CIFS rule of type 'fake_type'."""
    cifs_share = fake_share.fake_share(
        host="fake_host@fake_backend#fake_pool", share_proto='cifs')
    bad_rule = dict(access_type='fake_type',
                    access_to='fake_user',
                    access_level='rw')
    self.assertIsNone(self.driver.helper._deny_access(cifs_share, bad_rule))

887 

def test_deny_access_cifs_share_not_exist(self):
    """_deny_access returns None when the CIFS access lookup yields None."""
    cifs_share = fake_share.fake_share(
        host="fake_host@fake_backend#fake_pool", share_proto='cifs')
    rule = dict(access_type='user',
                access_to='fake_user',
                access_level='rw')
    # The backend lookup reports that no matching access entry exists.
    lookup = mock.Mock(return_value=None)
    mock_gafcs = self.mock_object(rest_helper.RestHelper,
                                  '_get_access_from_cifs_share',
                                  lookup)

    outcome = self.driver.helper._deny_access(cifs_share, rule)

    self.assertIsNone(outcome)
    share_path = self.driver.helper._generate_share_path('manila_fakeid')
    mock_gafcs.assert_called_once_with(share_path, rule['access_to'])

905 

def test_update_access_add_delete(self):
    """update_access with add and delete rules uses the helper hooks.

    ``_allow_access`` and ``_deny_access`` are replaced by mocks. One rule
    is supplied to add and one to delete, so each hook is expected to be
    used exactly once.
    """
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    add_rules = [{'access_type': 'ip',
                  'access_to': '172.0.2.1',
                  'access_level': 'rw', }]
    delete_rules = [{'access_type': 'ip',
                     'access_to': '172.0.2.2',
                     'access_level': 'rw', }]
    mock_allow = self.mock_object(macrosan_helper.MacrosanHelper,
                                  '_allow_access')
    mock_deny = self.mock_object(macrosan_helper.MacrosanHelper,
                                 '_deny_access')

    self.driver.update_access(self._context, share,
                              None, add_rules, delete_rules, None)

    # Without these checks the test could only fail on an unexpected
    # exception; it never verified that the rules were acted upon.
    # NOTE(review): exact call arguments are not asserted because the
    # helper's call signature is not visible here - tighten if known.
    mock_allow.assert_called_once()
    mock_deny.assert_called_once()

921 

@ddt.data('nfs', 'cifs')
def test_update_access_nfs(self, proto):
    """With only access_rules given, existing access is cleared once.

    Runs for both protocols: IP rules for NFS, user rules for CIFS.
    """
    share = fake_share.fake_share(share_proto=proto,
                                  host="fake_host@fake_backend#fake_pool")
    if proto == 'nfs':
        rule_type, targets = 'ip', ('172.0.3.1', '172.0.3.2')
    else:
        rule_type, targets = 'user', ('user_l', 'user_a')
    access_rules = [
        {'access_type': rule_type,
         'access_to': target,
         'access_level': 'rw'}
        for target in targets
    ]
    mock_ca = self.mock_object(macrosan_helper.MacrosanHelper,
                               '_clear_access')
    self.mock_object(macrosan_helper.MacrosanHelper, '_allow_access')

    self.driver.update_access(self._context, share,
                              access_rules, {}, {}, {})

    mock_ca.assert_called_once_with(share, None)

947 

def test_update_access_fail(self):
    """A rule that fails to apply during a resync is reported as 'error'.

    ``_allow_access`` is mocked to raise InvalidShareAccess. update_access
    is expected to return an 'error' state keyed by the rule's access_id
    and to have called ``_clear_access`` exactly once.
    """
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    access_rules = [{'access_id': 'fakeid',
                     'access_type': 'ip',
                     'access_to': '172.0.3.1',
                     'access_level': 'rw', }]
    mock_ca = self.mock_object(macrosan_helper.MacrosanHelper,
                               '_clear_access')
    self.mock_object(macrosan_helper.MacrosanHelper,
                     '_allow_access',
                     mock.Mock(side_effect=exception.InvalidShareAccess(
                         reason='fake_exception')))

    result = self.driver.update_access(self._context, share,
                                       access_rules, None, None, None)

    expect = {'fakeid': {'state': 'error'}}
    # Expected value goes first so a failure message labels the two
    # sides correctly.
    self.assertEqual(expect, result)
    mock_ca.assert_called_once_with(share, None)

970 

def test_update_access_add_fail(self):
    """A failing add rule is reported with state 'error'.

    ``_allow_access`` is mocked to raise InvalidShareAccess while
    ``_deny_access`` is a plain mock; no delete rules are supplied.
    """
    share = fake_share.fake_share(share_proto='nfs',
                                  host="fake_host@fake_backend#fake_pool")
    add_rules = [{'access_id': 'fakeid',
                  'access_type': 'ip',
                  'access_to': '172.0.2.1',
                  'access_level': 'rw', }]
    delete_rules = []
    self.mock_object(macrosan_helper.MacrosanHelper,
                     '_allow_access',
                     mock.Mock(side_effect=exception.InvalidShareAccess(
                         reason='fake_exception')))
    self.mock_object(macrosan_helper.MacrosanHelper, '_deny_access')

    result = self.driver.update_access(self._context, share,
                                       None, add_rules, delete_rules, None)

    expect = {'fakeid': {'state': 'error'}}
    # Expected value goes first so a failure message labels the two
    # sides correctly.
    self.assertEqual(expect, result)

993 

@ddt.data('nfs', 'cifs')
def test__clear_access(self, share_proto):
    """_clear_access lists backend rules via the protocol's REST call."""
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    nfs_rules = [
        {'share_path': path, 'access_to': client, 'access_level': 'rw'}
        for path, client in (('fake_path', '172.0.0.1'),
                             ('default_path', '172.0.0.2'))
    ]
    cifs_rules = [
        {'share_path': path,
         'access_to': user,
         'ugType': '0',
         'access_level': 'rw'}
        for path, user in (('fake_path', 'user_name'),
                           ('default_path', 'manilanobody'))
    ]
    mock_ganar = self.mock_object(rest_helper.RestHelper,
                                  '_get_all_nfs_access_rest',
                                  mock.Mock(return_value=nfs_rules))
    mock_gacar = self.mock_object(rest_helper.RestHelper,
                                  '_get_all_cifs_access_rest',
                                  mock.Mock(return_value=cifs_rules))
    # The delete calls are stubbed out so nothing reaches a backend.
    for rest_method in ('_delete_nfs_access_rest',
                        '_delete_cifs_access_rest'):
        self.mock_object(rest_helper.RestHelper, rest_method)

    self.driver.helper._clear_access(share)

    share_path = self.driver.helper._generate_share_path('manila_fakeid')
    listing = mock_ganar if share_proto == 'nfs' else mock_gacar
    listing.assert_called_once_with(share_path)

1040 

@ddt.data('nfs', 'cifs')
def test__clear_access_no_access_list(self, share_proto):
    """_clear_access still queries the backend when it has no rules."""
    share = fake_share.fake_share(share_proto=share_proto,
                                  host="fake_host@fake_backend#fake_pool")
    mock_ganar = self.mock_object(rest_helper.RestHelper,
                                  '_get_all_nfs_access_rest',
                                  mock.Mock(return_value=[]))
    mock_gacar = self.mock_object(rest_helper.RestHelper,
                                  '_get_all_cifs_access_rest',
                                  mock.Mock(return_value=[]))

    self.driver.helper._clear_access(share)

    share_path = self.driver.helper._generate_share_path('manila_fakeid')
    listing = mock_ganar if share_proto == 'nfs' else mock_gacar
    listing.assert_called_once_with(share_path)

1059 

@ddt.data(constants.USER_NOT_EXIST,
          constants.USER_EXIST,
          constants.USER_FORMAT_ERROR)
def test__ensure_user(self, query_result):
    """Check _ensure_user for each possible user-query result.

    USER_NOT_EXIST: group and user are created and the result is truthy.
    USER_EXIST: the result is truthy.
    Any other value (USER_FORMAT_ERROR here): the result is falsy.
    """
    mock_qu = self.mock_object(rest_helper.RestHelper,
                               '_query_user',
                               mock.Mock(return_value=query_result))
    group_query = mock.Mock(return_value=constants.GROUP_NOT_EXIST)
    mock_qg = self.mock_object(rest_helper.RestHelper,
                               '_query_group',
                               group_query)
    mock_alg = self.mock_object(rest_helper.RestHelper, '_add_localgroup')
    mock_alu = self.mock_object(rest_helper.RestHelper, '_add_localuser')

    result = self.driver.helper._ensure_user('fake_user',
                                             'fake_passwd',
                                             'fake_group')

    if query_result == constants.USER_NOT_EXIST:
        # A missing user triggers the group lookup and both creations.
        mock_qg.assert_called_once_with('fake_group')
        mock_alg.assert_called_once_with('fake_group')
        mock_alu.assert_called_once_with('fake_user',
                                         'fake_passwd',
                                         'fake_group')
    if query_result in (constants.USER_NOT_EXIST, constants.USER_EXIST):
        self.assertTrue(result)
    else:
        self.assertFalse(result)
    mock_qu.assert_called_once_with('fake_user')

1090 

def test__ensure_user_fail(self):
    """_ensure_user raises InvalidInput on GROUP_FORMAT_ERROR.

    The user query reports USER_NOT_EXIST and the group query reports
    GROUP_FORMAT_ERROR.
    """
    user_query = mock.Mock(return_value=constants.USER_NOT_EXIST)
    group_query = mock.Mock(return_value=constants.GROUP_FORMAT_ERROR)
    mock_qu = self.mock_object(rest_helper.RestHelper,
                               '_query_user',
                               user_query)
    mock_qg = self.mock_object(rest_helper.RestHelper,
                               '_query_group',
                               group_query)

    self.assertRaises(exception.InvalidInput,
                      self.driver.helper._ensure_user,
                      'fake_user', 'fake_passwd', 'fake_group')

    mock_qu.assert_called_once_with('fake_user')
    mock_qg.assert_called_once_with('fake_group')

1107 

def test__update_share_stats(self):
    """_update_share_stats passes the assembled stats to the base driver.

    Pool listing and capacity lookup are mocked; the dictionary handed to
    ShareDriver._update_share_stats is compared against the expected one.
    """
    self.driver.helper.pools = ['fake_pool']
    mock_gap = self.mock_object(rest_helper.RestHelper,
                                '_get_all_pool',
                                mock.Mock(return_value='fake_result'))
    capacity = {
        "totalcapacity": 10,
        "freecapacity": 9,
        "allocatedcapacity": 1,
    }
    mock_gpc = self.mock_object(macrosan_helper.MacrosanHelper,
                                '_get_pool_capacity',
                                mock.Mock(return_value=capacity))
    mock_uss = self.mock_object(driver.ShareDriver, '_update_share_stats')

    self.driver._update_share_stats()

    expected_pool = {
        'pool_name': 'fake_pool',
        'total_capacity_gb': 10,
        'free_capacity_gb': 9,
        'allocated_capacity_gb': 1,
        'reserved_percentage': 0,
        'reserved_snapshot_percentage': 0,
        'reserved_share_extend_percentage': 0,
        'dedupe': False,
        'compression': False,
        'qos': False,
        'thin_provisioning': False,
        'snapshot_support': False,
        'create_share_from_snapshot_support': False,
    }
    expected_stats = {
        'vendor_name': self.driver.VENDOR,
        'driver_version': self.driver.VERSION,
        'storage_protocol': self.driver.PROTOCOL,
        'share_backend_name': 'fake_share_backend_name',
        'pools': [expected_pool],
    }
    mock_gap.assert_called_once()
    mock_gpc.assert_called_once_with('fake_pool', 'fake_result')
    mock_uss.assert_called_once_with(expected_stats)

1148 

def test__update_share_stats_pool_not_exist(self):
    """An empty pool-capacity result makes the stats update raise."""
    self.driver.helper.pools = ['fake_pool']
    all_pools = mock.Mock(return_value='fake_result')
    no_capacity = mock.Mock(return_value={})
    self.mock_object(rest_helper.RestHelper, '_get_all_pool', all_pools)
    self.mock_object(macrosan_helper.MacrosanHelper,
                     '_get_pool_capacity',
                     no_capacity)

    self.assertRaises(exception.InvalidInput,
                      self.driver._update_share_stats)

1159 

def test__get_pool_capacity(self):
    """Capacity strings from the pool info come back as numbers."""
    pool_info = {
        "name": "fake_pool",
        "totalcapacity": "100.0G",
        "allocatedcapacity": "22G",
        "freecapacity": "78G",
        "health": "ONLINE",
        "rw": "off",
    }
    self.mock_object(macrosan_helper.MacrosanHelper,
                     '_find_pool_info',
                     mock.Mock(return_value=pool_info))

    capacity = self.driver.helper._get_pool_capacity("fake_pool",
                                                     "fake_result")

    self.assertEqual(100, capacity['totalcapacity'])
    self.assertEqual(78, capacity['freecapacity'])
    self.assertEqual(22, capacity['allocatedcapacity'])

1176 

def test__generate_share_name(self):
    """The fake share maps to the backend name 'manila_fakeid'."""
    fake = fake_share.fake_share(host="fake_host@fake_backend#fake_pool")
    self.assertEqual("manila_fakeid",
                     self.driver.helper._generate_share_name(fake))

1181 

def test__format_name(self):
    """_format_name swaps '-' for '_' and shortens the sample name."""
    raw_name = 'fake-1234567890-1234567890-1234567890'
    formatted = self.driver.helper._format_name(raw_name)
    self.assertEqual('fake_1234567890_1234567890_1234', formatted)

1187 

def test__generate_share_path(self):
    """The share path repeats the share name as two path components."""
    path = self.driver.helper._generate_share_path('manila_fakeid')
    self.assertEqual(r'/manila_fakeid/manila_fakeid', path)

1193 

@ddt.data('nfs', 'cifs')
def test__get_location_path(self, share_proto):
    """Check the export location string built for each protocol."""
    self.driver.helper.configuration.macrosan_nas_ip = "172.0.0.1"
    # A lookup table replaces the former if/elif chain. That chain left
    # the expected value unbound for any other protocol and showed up as
    # a partially covered branch; an unknown protocol now fails with a
    # KeyError that names it.
    expected_by_proto = {
        'nfs': r'172.0.0.1:fake_path',
        'cifs': r'\\172.0.0.1\fake_name',
    }

    result = self.driver.helper._get_location_path('fake_path',
                                                   'fake_name',
                                                   share_proto)

    self.assertEqual(expected_by_proto[share_proto], result)

1205 

def test__get_share_instance_pnp_pool_error(self):
    """A host string without a '#pool' part raises InvalidHost."""
    poolless_share = fake_share.fake_share(host="fake_host@fake_backend",
                                           share_proto="nfs")
    self.assertRaises(exception.InvalidHost,
                      self.driver.helper._get_share_instance_pnp,
                      poolless_share)

1212 

def test__get_share_instance_pnp_proto_error(self):
    """A 'CephFS' share protocol raises MacrosanBackendExeption."""
    cephfs_share = fake_share.fake_share(
        host="fake_host@fake_backend#fake_pool", share_proto="CephFS")
    self.assertRaises(exception.MacrosanBackendExeption,
                      self.driver.helper._get_share_instance_pnp,
                      cephfs_share)

1219 

@ddt.data('2000000000', '2000000KB', '2000MB', '20GB', '2TB')
def test__unit_convert_toGB(self, capacity):
    """Capacity strings with and without a unit suffix convert to GB."""
    gigabytes = {
        '2000000000': float(2000000000) / 1024 ** 3,
        '2000000KB': float(2000000) / 1024 ** 2,
        '2000MB': float(2000) / 1024,
        '20GB': float(20),
        '2TB': float(2) * 1024,
    }
    # The expectation is rounded to zero decimals via string formatting.
    expect = float('%.0f' % gigabytes[capacity])

    result = self.driver.helper._unit_convert_toGB(capacity)

    self.assertEqual(expect, result)

1230 

@ddt.data('nfs', 'cifs')
def test__get_share(self, proto):
    """_get_share returns the result of the protocol's REST getter.

    Both REST getters are mocked; only the one matching the upper-cased
    protocol is expected to be called, with the share path.
    """
    proto = proto.upper()
    mock_gns = self.mock_object(rest_helper.RestHelper,
                                '_get_nfs_share',
                                mock.Mock(return_value={
                                    "path": "/manila_fakeid",
                                    "clients": ["client"],
                                    "protocol": "NFS"
                                }))
    mock_gcs = self.mock_object(rest_helper.RestHelper,
                                '_get_cifs_share',
                                mock.Mock(return_value={
                                    "path": "fake_path",
                                    "cifsname": "fake_cifsname",
                                    "protocol": "CIFS",
                                    "roList": ["fake_ro"],
                                    "rwList": ["fake_rw"],
                                    "allowList": ["fake_allow"],
                                    "denyList": ["fake_deny"],
                                }))
    expect_nfs = {
        "path": "/manila_fakeid",
        "clients": ["client"],
        "protocol": "NFS"}
    expect_cifs = {
        "path": "fake_path",
        "cifsname": "fake_cifsname",
        "protocol": "CIFS",
        "roList": ["fake_ro"],
        "rwList": ["fake_rw"],
        "allowList": ["fake_allow"],
        "denyList": ["fake_deny"]}

    result = self.driver.helper._get_share('fake_path', proto)

    if proto == 'NFS':
        mock_gns.assert_called_once_with('fake_path')
        self.assertEqual(expect_nfs, result)
    else:
        # Plain else: the former ``elif proto == 'CIFS'`` would have let
        # any other protocol pass without a single assertion.
        mock_gcs.assert_called_once_with('fake_path')
        self.assertEqual(expect_cifs, result)

1271 

def test__find_pool_info(self):
    """Looking up 'fake_pool' in the canned pool listing finds an entry."""
    found = self.driver.helper._find_pool_info(
        'fake_pool', self.result_success_storage_pools)
    self.assertIsNotNone(found)

1277 

def test__find_pool_info_fail(self):
    """Looking up 'error_pool' in the canned listing gives an empty dict."""
    found = self.driver.helper._find_pool_info(
        'error_pool', self.result_success_storage_pools)
    self.assertEqual({}, found)

1284 

1285 

1286@ddt.ddt 

1287class RestHelperTestCase(test.TestCase): 

1288 

def setUp(self):
    """Create a RestHelper on a mocked configuration plus canned results."""
    self.mock_object(CONF, '_check_required_opts')
    super(RestHelperTestCase, self).setUp()

    self.configuration = mock.Mock(spec=configuration.Configuration)
    # safe_get reads the attribute of the same name off the mock config.
    self.configuration.safe_get = mock.Mock(
        side_effect=lambda opt: getattr(self.configuration, opt))
    backend_options = {
        'macrosan_nas_http_protocol': 'https',
        'macrosan_nas_ip': 'fake_ip',
        'macrosan_nas_port': 'fake_port',
        'macrosan_nas_prefix': 'nas',
        'macrosan_nas_username': 'fake_username',
        'macrosan_nas_password': 'fake_password',
        'macrosan_timeout': 60,
        'macrosan_ssl_cert_verify': False,
    }
    for option, value in backend_options.items():
        setattr(self.configuration, option, value)
    self.resthelper = rest_helper.RestHelper(
        configuration=self.configuration)

    # HTTP verbs as the tests pass them to RestHelper.call.
    self.post, self.get = 'POST', 'GET'
    self.delete, self.put = 'DELETE', 'PUT'
    self.fake_message = 'fake_message'

    # Canned REST results shared by the test methods.
    self.result_success = dict(code=0, message='success',
                               data='fake_data')
    self.result_success_return_0 = dict(code=0, message='success',
                                        data='0')
    self.result_success_return_1 = dict(code=0, message='success',
                                        data='1')
    self.result_failed = dict(code=1, message='failed', data='fake_data')
    self.result_failed_not_exist = dict(
        code=constants.CODE_SOURCE_NOT_EXIST, message='failed', data='')
    self.result_success_storage_pools = {
        'code': 0,
        'message': 'success',
        'data': [{
            'name': 'fake_pool',
            'size': '1000.0G',
            'allocated': '100G',
            'free': '900G',
            'health': 'ONLINE',
            'rwStatus': 'off'
        }]
    }

1350 

@ddt.data(
    {'url': 'fake_url', 'method': 'POST',
     'data': {'fake_data': 'fake_value'}},
    {'url': 'fake_url', 'method': 'GET',
     'data': None},
    {'url': 'fake_url', 'method': 'DELETE',
     'data': {'fake_data': 'fake_value'}},
    {'url': 'fake_url', 'method': 'PUT',
     'data': {'fake_data': 'fake_value'}},
)
@ddt.unpack
def test_call(self, url, data, method):
    """call() issues one matching requests call with token and timeout."""
    self.resthelper._token = 'fake_token'
    fake_response = FakeResponse(200, self.result_success)
    # requests.<verb> is patched; the verb is the lower-cased method name.
    mock_request = self.mock_object(requests,
                                    method.lower(),
                                    mock.Mock(return_value=fake_response))

    self.resthelper.call(url, data, method)

    url_parts = {'ip': 'fake_ip',
                 'port': 'fake_port',
                 'rest': 'nas',
                 'url': 'fake_url'}
    expected_url = 'https://%(ip)s:%(port)s/%(rest)s/%(url)s' % url_parts
    mock_request.assert_called_once_with(
        expected_url,
        data=data,
        headers={'Authorization': 'fake_token'},
        timeout=self.configuration.macrosan_timeout,
        verify=False)

1379 

def test_call_method_fail(self):
    """call() raises ShareBackendException for method 'error_method'."""
    self.resthelper._token = 'fake_token'
    call_args = ('fake_url', 'fake_data', 'error_method')
    self.assertRaises(exception.ShareBackendException,
                      self.resthelper.call,
                      *call_args)

1387 

def test_call_token_fail(self):
    """A result with code 302 makes call() raise MacrosanBackendExeption."""
    self.resthelper._token = 'fake_token'
    rejected = dict(code=302, message='fake_message', data='fake_data')
    self.mock_object(self.resthelper,
                     'do_request',
                     mock.Mock(return_value=rejected))

    self.assertRaises(exception.MacrosanBackendExeption,
                      self.resthelper.call,
                      'fake_url', 'fake_data', self.post)

1402 

def test_call_token_none(self):
    """With no cached token, call() logs in exactly once."""
    self.resthelper._token = None
    request_stub = mock.Mock(return_value=self.result_success)
    login_stub = mock.Mock(return_value='fake_token')
    self.mock_object(self.resthelper, 'do_request', request_stub)
    mock_l = self.mock_object(self.resthelper, 'login', login_stub)

    self.resthelper.call('fake_url', 'fake_data', self.post)

    mock_l.assert_called_once()

1411 

def test_call_token_expired(self):
    """A code-301 result followed by success triggers exactly one login."""
    self.resthelper._token = 'fake_token'
    expired = dict(code=301, message='token expired', data='fake_data')
    # First request returns the 301 result, the second one succeeds.
    request_stub = mock.Mock(side_effect=[expired, self.result_success])
    self.mock_object(self.resthelper, 'do_request', request_stub)
    mock_l = self.mock_object(self.resthelper,
                              'login',
                              mock.Mock(return_value='fake_token'))

    self.resthelper.call('fake_url', 'fake_data', self.post)

    mock_l.assert_called_once()

1426 

def test_call_fail(self):
    """An HTTP 302 response from requests.post raises NetworkException."""
    self.resthelper._token = 'fake_token'
    redirect_response = FakeResponse(302, self.result_success)
    self.mock_object(requests,
                     'post',
                     mock.Mock(return_value=redirect_response))

    self.assertRaises(exception.NetworkException,
                      self.resthelper.call,
                      'fake_url', 'fake_data', self.post)

1437 

def test_login(self):
    """login() posts the credentials and stores the returned token."""
    login_reply = dict(code=0, message='Login success', data='fake_token')
    mock_rd = self.mock_object(self.resthelper,
                               'do_request',
                               mock.Mock(return_value=login_reply))

    self.resthelper.login()

    credentials = {
        'userName': self.configuration.macrosan_nas_username,
        'userPasswd': self.configuration.macrosan_nas_password,
    }
    mock_rd.assert_called_once_with('rest/token', credentials, self.post)
    self.assertEqual('fake_token', self.resthelper._token)

1452 

def test_login_fail(self):
    """login() raises ShareBackendException on a failed result."""
    failing_request = mock.Mock(return_value=self.result_failed)
    mock_rd = self.mock_object(self.resthelper,
                               'do_request',
                               failing_request)

    self.assertRaises(exception.ShareBackendException,
                      self.resthelper.login)

    credentials = {
        'userName': self.configuration.macrosan_nas_username,
        'userPasswd': self.configuration.macrosan_nas_password,
    }
    mock_rd.assert_called_once_with('rest/token', credentials, self.post)

1463 

def test__assert_result_code(self):
    """A code-0 result passes _assert_result_code without raising."""
    check = self.resthelper._assert_result_code
    check(self.result_success, self.fake_message)

1467 

def test__assert_result_code_fail(self):
    """A code-1 result makes _assert_result_code raise."""
    check_args = (self.result_failed, self.fake_message)
    self.assertRaises(exception.ShareBackendException,
                      self.resthelper._assert_result_code,
                      *check_args)

1473 

def test__assert_result_data(self):
    """A result with a 'data' key passes _assert_result_data."""
    check = self.resthelper._assert_result_data
    check(self.result_success, self.fake_message)

1477 

def test__assert_result_data_fail(self):
    """A result lacking the 'data' key makes _assert_result_data raise."""
    result_without_data = dict(code=0, message='fake_message')
    self.assertRaises(exception.ShareBackendException,
                      self.resthelper._assert_result_data,
                      result_without_data,
                      self.fake_message)

1487 

def test__create_nfs_share(self):
    """_create_nfs_share POSTs the expected payload to rest/nfsShare."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._create_nfs_share('fake_path')

    payload = dict(path='fake_path',
                   authority='ro',
                   accessClient='192.0.2.0')
    mock_call.assert_called_once_with('rest/nfsShare', payload, self.post)

1501 

def test__get_nfs_share(self):
    """_get_nfs_share GETs by path and returns the result's 'data'."""

    def share_details():
        # Fresh dict per call so reply and expectation share no state.
        return {
            "path": "fake_path",
            "clients": ["client"],
            "protocol": "fake_protocol"
        }

    rest_reply = {
        'code': 0,
        'message': 'success',
        'data': share_details(),
    }
    mock_call = self.mock_object(self.resthelper,
                                 'call',
                                 mock.Mock(return_value=rest_reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_nfs_share('fake_path')

    self.assertEqual(share_details(), result)
    mock_call.assert_called_once_with('rest/nfsShare?path=fake_path',
                                      None, self.get)

1526 

def test__delete_nfs_share(self):
    """_delete_nfs_share issues a DELETE keyed by the share path."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._delete_nfs_share('fake_path')

    mock_call.assert_called_once_with('rest/nfsShare?path=fake_path',
                                      None, self.delete)

1535 

def test__create_cifs_share(self):
    """_create_cifs_share POSTs the share with the given RW list/types."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')
    rw_users, rw_types = ['fake_user'], ['0']

    self.resthelper._create_cifs_share('fake_name', 'fake_path',
                                       rw_users, rw_types)

    payload = {
        'path': 'fake_path',
        'cifsName': 'fake_name',
        'cifsDescription': '',
        'RoList': [],
        'RoListType': [],
        'RwList': ['fake_user'],
        'RwListType': ['0'],
        'allowList': [],
        'denyList': [],
    }
    mock_call.assert_called_once_with('rest/cifsShare', payload, self.post)

1558 

def test__get_cifs_share(self):
    """_get_cifs_share GETs by path and returns the result's 'data'."""

    def share_details():
        # Fresh dict per call so reply and expectation share no state.
        return {
            "path": "fake_path",
            "cifsname": "fake_cifsname",
            "protocol": "fake_protocol",
            "roList": ["fake_ro"],
            "rwList": ["fake_rw"],
            "allowList": ["fake_allow"],
            "denyList": ["fake_deny"]
        }

    rest_reply = {
        'code': 0,
        'message': 'success',
        'data': share_details(),
    }
    mock_call = self.mock_object(self.resthelper,
                                 'call',
                                 mock.Mock(return_value=rest_reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_cifs_share('fake_path')

    self.assertEqual(share_details(), result)
    mock_call.assert_called_once_with('rest/cifsShare?path=fake_path',
                                      None, self.get)

1591 

def test__delete_cifs_share(self):
    """_delete_cifs_share issues a DELETE keyed by path and CIFS name."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._delete_cifs_share('fake_name', 'fake_path')

    expected_url = 'rest/cifsShare?path=fake_path&cifsName=fake_name'
    mock_call.assert_called_once_with(expected_url, None, self.delete)

1600 

def test__update_share_size(self):
    """_update_share_size PUTs the new capacity to the filesystem URL."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._update_share_size('fake_filesystem', '2GB')

    mock_call.assert_called_once_with('rest/filesystem/fake_filesystem',
                                      {'capacity': '2GB'},
                                      self.put)

1612 

def test___create_filesystem(self):
    """_create_filesystem POSTs the full creation payload."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')
    fs_name, pool_name, size = 'fake_filesystem', 'fake_pool', '1GB'

    self.resthelper._create_filesystem(fs_name, pool_name, size)

    payload = {
        'fsName': 'fake_filesystem',
        'poolName': 'fake_pool',
        'createType': '0',
        # Quota and reserve both carry the requested size.
        'fileSystemQuota': '1GB',
        'fileSystemReserve': '1GB',
        'wormStatus': 0,
        'defaultTimeStatus': 0,
        'defaultTimeNum': 0,
        'defaultTimeUnit': 'year',
        'isAutoLock': 0,
        'isAutoDelete': 0,
        'lockTime': 0
    }
    mock_call.assert_called_once_with('rest/filesystem', payload,
                                      self.post)

1637 

def test__delete_filesystem(self):
    """_delete_filesystem issues a DELETE on the filesystem URL."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._delete_filesystem('fake_filesystem')

    mock_call.assert_called_once_with('rest/filesystem/fake_filesystem',
                                      None, self.delete)

1646 

def test__get_filesystem(self):
    """_get_filesystem GETs the filesystem and returns the 'data' part."""
    rest_reply = {
        'code': 0,
        'message': 'success',
        'data': dict(name='fake_filesystem', poolName='fake_pool'),
    }
    mock_call = self.mock_object(self.resthelper,
                                 'call',
                                 mock.Mock(return_value=rest_reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_filesystem('fake_filesystem')

    expected = dict(name='fake_filesystem', poolName='fake_pool')
    self.assertEqual(expected, result)
    mock_call.assert_called_once_with('rest/filesystem/fake_filesystem',
                                      None, self.get)

1669 

def test__create_filesystem_dir(self):
    """The directory path is split into parent path and dirName."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._create_filesystem_dir('/fake_path/fake_dir')

    payload = dict(path='/fake_path', dirName='fake_dir')
    mock_call.assert_called_once_with('rest/fileDir', payload, self.post)

1682 

def test__delete_filesystem_dir(self):
    """The DELETE URL carries parent path and dirName as query args."""
    mock_call = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._delete_filesystem_dir('/fake_path/fake_dir')

    expected_url = 'rest/fileDir?path=/fake_path&dirName=fake_dir'
    mock_call.assert_called_once_with(expected_url, None, self.delete)

1691 

@ddt.data('nfs', 'cifs')
def test__allow_access_rest(self, share_proto):
    """_allow_access_rest dispatches to the protocol-specific method.

    Both protocol methods are mocked; the one matching the upper-cased
    protocol must be called once with path, access target and level.
    """
    share_proto = share_proto.upper()
    mock_anar = self.mock_object(self.resthelper,
                                 '_allow_nfs_access_rest')
    mock_acar = self.mock_object(self.resthelper,
                                 '_allow_cifs_access_rest')

    self.resthelper._allow_access_rest('fake_path', 'fake_access',
                                       'rw', share_proto)

    # Plain if/else: the former ``elif share_proto == 'CIFS'`` would have
    # let any other protocol pass without a single assertion.
    expected_mock = mock_anar if share_proto == 'NFS' else mock_acar
    expected_mock.assert_called_once_with('fake_path',
                                          'fake_access',
                                          'rw')

1709 

def test__allow_access_rest_proto_error(self):
    """Protocol 'error_proto' makes _allow_access_rest raise."""
    call_args = ('fake_path', 'fake_access', 'rw', 'error_proto')
    self.assertRaises(exception.InvalidInput,
                      self.resthelper._allow_access_rest,
                      *call_args)

1717 

def test__allow_nfs_access_rest(self):
    """_allow_nfs_access_rest POSTs path, client and authority."""
    mock_call = self.mock_object(
        self.resthelper, 'call',
        mock.Mock(return_value=self.result_success))
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._allow_nfs_access_rest('fake_path', '172.0.0.1', 'rw')

    payload = dict(path='fake_path', client='172.0.0.1', authority='rw')
    mock_call.assert_called_once_with('rest/nfsShareClient', payload,
                                      self.post)

1733 

@ddt.data(
    {'access_to': 'fake_user',
     'group': False},
    {'access_to': 'fake_group',
     'group': True},
    {'access_to': '/fake_user',
     'group': False},
    {'access_to': '/fake_group',
     'group': True}
)
@ddt.unpack
def test__allow_cifs_access_rest(self, access_to, group):
    """Check the REST call made when allowing CIFS access.

    User cases (``group`` False): ``call`` succeeds at once and must be
    invoked exactly once with the expected payload. A name without '/'
    is sent as-is with type '0'; a name containing '/' is sent as the
    part after the first '/' with type '2'.

    Group cases (``group`` True): the first ``call`` reports "source not
    exist" and the second succeeds; only that ``call`` was used is
    checked.
    """
    if group:
        responses = mock.Mock(side_effect=[self.result_failed_not_exist,
                                           self.result_success])
    else:
        responses = mock.Mock(return_value=self.result_success)
    mock_call = self.mock_object(self.resthelper, 'call', responses)
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._allow_cifs_access_rest('fake_path',
                                            access_to,
                                            'rw')

    if group:
        # No payload is checked here, so none is built.
        mock_call.assert_called()
        return

    # Only the two user types can occur on this path; the former no-op
    # ``access_to = access_to`` and the unused group entries are gone.
    ug_type_list = {
        'localUser': '0',
        'adUser': '2',
    }
    if '/' in access_to:
        actual_type = ug_type_list['adUser']
        ug_name = access_to[access_to.index('/') + 1:]
    else:
        actual_type = ug_type_list['localUser']
        ug_name = access_to
    data = {
        'path': 'fake_path',
        'right': 'rw',
        'ugName': ug_name,
        'ugType': actual_type,
    }
    mock_call.assert_called_once_with('rest/cifsShareClient',
                                      data, self.post)

1788 

def test__allow_cifs_access_rest_fail(self):
    """Two "source not exist" results in a row raise InvalidShare."""
    not_found_twice = mock.Mock(side_effect=[self.result_failed_not_exist,
                                             self.result_failed_not_exist])
    mock_call = self.mock_object(self.resthelper, 'call', not_found_twice)

    self.assertRaises(exception.InvalidShare,
                      self.resthelper._allow_cifs_access_rest,
                      'fake_path', 'fake_user', 'rw')

    mock_call.assert_called()

1801 

def test__get_access_from_nfs_share(self):
    """Expect the 'data' section of the NFS share client GET reply."""
    access_info = {
        "path": "fake_path",
        "clientName": "fake_client",
        "accessRight": "rw",
    }
    # The reply carries its own copy so the expectation stays independent.
    reply = {'code': 0, 'message': 'success', 'data': dict(access_info)}
    call_mock = self.mock_object(
        self.resthelper, 'call', mock.Mock(return_value=reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_access_from_nfs_share(
        'fake_path', 'fake_client')

    self.assertEqual(access_info, result)
    call_mock.assert_called_once_with(
        'rest/nfsShareClient?path=fake_path&client=fake_client',
        None,
        self.get)

1827 

@ddt.data(
    {'access_to': 'fake_user', 'ug_type': '0', 'code': 0, 'group': False},
    {'access_to': 'fake_user', 'ug_type': None, 'code': 0, 'group': False},
    {'access_to': 'fake_group', 'ug_type': None, 'code': 0, 'group': True},
    {'access_to': 'fake_user', 'ug_type': None, 'code': 4, 'group': False},
    {'access_to': '/fake_user', 'ug_type': None, 'code': 0, 'group': False},
    {'access_to': '/fake_group', 'ug_type': None, 'code': 0, 'group': True},
    {'access_to': '/fake_user', 'ug_type': None, 'code': 4, 'group': False},
)
@ddt.unpack
def test__get_access_from_cifs_share(self,
                                     access_to, ug_type, code, group):
    """Check CIFS access lookup for users and groups, local and AD.

    * explicit ``ug_type``: one GET with the given name and type;
    * ``code`` 4: the lookup is expected to return None;
    * group cases: a reply with empty data is followed by the group
      reply, and only the result plus "a call happened" are verified;
    * user cases: one GET with the expected user type and name.
    """

    def access_entry(name, kind):
        # One access record as it appears in the reply's 'data'.
        return {
            'path': 'fake_path',
            'ugName': name,
            'ugType': kind,
            'accessRight': 'rw',
        }

    # ugType codes used by this test, keyed by the kind of principal.
    ug_type_list = {
        'localUser': '0',
        'localGroup': '1',
        'adUser': '2',
        'adGroup': '3',
    }
    failed_reply = {'code': code, 'message': 'failed', 'data': {}}
    group_reply = {
        'code': code,
        'message': 'success',
        'data': access_entry('fake_group', '1'),
    }
    if code == 4:
        # The user reply is the very same object as the failed reply.
        user_reply = failed_reply
    else:
        user_reply = {
            'code': code,
            'message': 'success',
            'data': access_entry('fake_user', '0'),
        }
    expected_user = access_entry('fake_user', '0')
    expected_group = access_entry('fake_group', '1')
    if '/' in access_to:
        # Names with '/' use the AD type codes in replies and expectations.
        expected_user['ugType'] = '2'
        expected_group['ugType'] = '3'
        user_reply['data']['ugType'] = '2'
        group_reply['data']['ugType'] = '3'

    if ug_type is None and group:
        rest_call = mock.Mock(side_effect=[failed_reply, group_reply])
    else:
        rest_call = mock.Mock(return_value=user_reply)
    call_mock = self.mock_object(self.resthelper, 'call', rest_call)
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_access_from_cifs_share(
        'fake_path', access_to, ug_type)

    if ug_type:
        self.assertEqual(expected_user, result)
        call_mock.assert_called_once_with(
            (f'rest/cifsShareClient?path=fake_path&'
             f'ugName={access_to}&ugType={ug_type}'),
            None,
            self.get)
        return
    if code == 4:
        self.assertIsNone(result)
        return
    if group:
        self.assertEqual(expected_group, result)
        call_mock.assert_called()
        return

    if '/' in access_to:
        actual_type = ug_type_list['adUser']
        actual_access = access_to.partition('/')[2]
    else:
        actual_type = ug_type_list['localUser']
        actual_access = access_to
    self.assertEqual(expected_user, result)
    call_mock.assert_called_once_with(
        (f'rest/cifsShareClient?path=fake_path&'
         f'ugName={actual_access}&'
         f'ugType={actual_type}'),
        None,
        self.get)

1957 

def test__get_all_nfs_access_rest(self):
    """Expect each NFS client entry renamed to share_path/access_to/level."""
    clients = [('fake_path', '172.0.0.1'), ('default_path', '172.0.0.2')]
    reply = {
        'code': 0,
        'message': 'success',
        'data': [
            {'path': path, 'clientName': client, 'accessRight': 'rw'}
            for path, client in clients
        ],
    }
    expected = [
        {'share_path': path, 'access_to': client, 'access_level': 'rw'}
        for path, client in clients
    ]
    call_mock = self.mock_object(
        self.resthelper, 'call', mock.Mock(return_value=reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_all_nfs_access_rest(
        '/manila_fakeid/manila_fakeid')

    self.assertEqual(expected, result)
    call_mock.assert_called_once_with(
        'rest/allNfsShareClient?path=/manila_fakeid/manila_fakeid',
        None,
        self.get)

1995 

def test__get_all_cifs_access_rest(self):
    """Expect each CIFS entry renamed, with 'ugType' carried over as is."""
    principals = [('fake_path', 'user_name'),
                  ('default_path', 'manilanobody')]
    reply = {
        'code': 0,
        'message': 'success',
        'data': [
            {
                'path': path,
                'ugName': name,
                'ugType': '0',
                'accessRight': 'rw',
            }
            for path, name in principals
        ],
    }
    expected = [
        {
            'share_path': path,
            'access_to': name,
            'ugType': '0',
            'access_level': 'rw',
        }
        for path, name in principals
    ]
    call_mock = self.mock_object(
        self.resthelper, 'call', mock.Mock(return_value=reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_all_cifs_access_rest(
        '/manila_fakeid/manila_fakeid')

    self.assertEqual(expected, result)
    call_mock.assert_called_once_with(
        'rest/allCifsShareClient?path=/manila_fakeid/manila_fakeid',
        None,
        self.get)

2037 

def test__change_nfs_access_rest(self):
    """Expect one PUT on 'rest/nfsShareClient' with the new access right."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._change_nfs_access_rest(
        '/manila_fakeid/manila_fakeid', '172.0.0.1', 'rw')

    # Every field other than path, old client and right is expected empty.
    blank_fields = ('clientName', 'allSquash', 'rootSquash', 'secure',
                    'anonuid', 'anongid')
    expected_body = dict.fromkeys(blank_fields, '')
    expected_body.update({
        'path': '/manila_fakeid/manila_fakeid',
        'oldNfsClientName': '172.0.0.1',
        'accessRight': 'rw',
    })
    call_mock.assert_called_once_with(
        'rest/nfsShareClient', expected_body, self.put)

2058 

def test__change_cifs_access_rest(self):
    """Expect one PUT whose 'ugName' is 'fake_user' for '/fake_user'."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._change_cifs_access_rest(
        '/manila_fakeid/manila_fakeid', '/fake_user', 'rw', '0')

    expected_body = {
        'path': '/manila_fakeid/manila_fakeid',
        'right': 'rw',
        'ugName': 'fake_user',
        'ugType': '0',
    }
    call_mock.assert_called_once_with(
        'rest/cifsShareClient', expected_body, self.put)

2074 

def test__delete_nfs_access_rest(self):
    """Expect one DELETE with path and client in the query string."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._delete_nfs_access_rest(
        '/manila_fakeid/manila_fakeid', '*')

    call_mock.assert_called_once_with(
        'rest/nfsShareClient?path=/manila_fakeid/manila_fakeid&client=*',
        None,
        self.delete)

2084 

def test__delete_cifs_access_rest(self):
    """Expect one DELETE with path, ugName and ugType in the query."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._delete_cifs_access_rest(
        '/manila_fakeid/manila_fakeid', 'fake_user', '0')

    expected_url = ('rest/cifsShareClient?path=/manila_fakeid/manila_fakeid'
                    '&ugName=fake_user&ugType=0')
    call_mock.assert_called_once_with(expected_url, None, self.delete)

2095 

def test__get_nfs_service_status(self):
    """Expect the 'data' section of the 'rest/nfsService' GET reply."""
    status = {
        'serviceStatus': constants.NFS_ENABLED,
        'nfs3Status': constants.NFS_SUPPORTED,
        'nfs4Status': constants.NFS_SUPPORTED,
    }
    # The reply carries its own copy so the expectation stays independent.
    reply = {'code': 0, 'message': 'success', 'data': dict(status)}
    call_mock = self.mock_object(
        self.resthelper, 'call', mock.Mock(return_value=reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_nfs_service_status()

    self.assertEqual(status, result)
    call_mock.assert_called_once_with('rest/nfsService', None, self.get)

2120 

def test__start_nfs_service(self):
    """Expect one PUT on 'rest/nfsService' with openStatus "1"."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._start_nfs_service()

    call_mock.assert_called_once_with(
        'rest/nfsService', {"openStatus": "1"}, self.put)

2132 

def test__config_nfs_service(self):
    """Expect one PUT on 'rest/nfsConfig' with NFSv3 and NFSv4 as "yes"."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._config_nfs_service()

    expected_body = {'configNfs3': "yes", 'configNfs4': "yes"}
    call_mock.assert_called_once_with(
        'rest/nfsConfig', expected_body, self.put)

2145 

def test__get_cifs_service_status(self):
    """Expect '1' when the reply is the result_success_return_1 fixture."""
    rest_call = mock.Mock(return_value=self.result_success_return_1)
    call_mock = self.mock_object(self.resthelper, 'call', rest_call)
    self.mock_object(self.resthelper, '_assert_result_code')

    status = self.resthelper._get_cifs_service_status()

    self.assertEqual('1', status)
    call_mock.assert_called_once_with('rest/cifsService', None, self.get)

2157 

def test__start_cifs_service(self):
    """Expect one PUT on 'rest/cifsService' with openStatus '1'."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._start_cifs_service()

    call_mock.assert_called_once_with(
        'rest/cifsService', {'openStatus': '1'}, self.put)

2169 

def test__config_cifs_service(self):
    """Expect one PUT on 'rest/cifsConfig' with the payload built below."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._config_cifs_service()

    # All description and ads* fields are expected to be empty strings.
    expected_body = dict.fromkeys(
        ('description', 'adsName', 'adsIP', 'adsUSER', 'adsPASSWD'), '')
    expected_body.update({
        'workName': 'manila',
        'access_way': 'user',
        'isCache': 'no',
        'allowList': [],
        'denyList': [],
    })
    call_mock.assert_called_once_with(
        'rest/cifsConfig', expected_body, self.put)

2190 

def test__get_all_pool(self):
    """Expect the mocked 'rest/storagepool' GET reply back unchanged."""
    pools_reply = self.result_success_storage_pools
    call_mock = self.mock_object(
        self.resthelper, 'call', mock.Mock(return_value=pools_reply))
    self.mock_object(self.resthelper, '_assert_result_code')

    result = self.resthelper._get_all_pool()

    self.assertEqual(self.result_success_storage_pools, result)
    call_mock.assert_called_once_with('rest/storagepool', None, self.get)

2202 

def test__query_user(self):
    """Expect '0' when the reply is the result_success_return_0 fixture."""
    rest_call = mock.Mock(return_value=self.result_success_return_0)
    call_mock = self.mock_object(self.resthelper, 'call', rest_call)
    self.mock_object(self.resthelper, '_assert_result_code')

    answer = self.resthelper._query_user('fake_user')

    self.assertEqual('0', answer)
    call_mock.assert_called_once_with('rest/user/fake_user', None, self.get)

2214 

def test__add_localuser(self):
    """Expect one POST on 'rest/localUser' with name, group and password."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._add_localuser(
        'fake_user', 'fake_passwd', 'fake_group')

    expected_body = {
        'userName': 'fake_user',
        'mgGroup': 'fake_group',
        'userPasswd': 'fake_passwd',
        'unusedGroup': [],
    }
    call_mock.assert_called_once_with(
        'rest/localUser', expected_body, self.post)

2229 

def test__query_group(self):
    """Expect '0' when the reply is the result_success_return_0 fixture."""
    rest_call = mock.Mock(return_value=self.result_success_return_0)
    call_mock = self.mock_object(self.resthelper, 'call', rest_call)
    self.mock_object(self.resthelper, '_assert_result_code')

    answer = self.resthelper._query_group('fake_group')

    self.assertEqual('0', answer)
    call_mock.assert_called_once_with(
        'rest/group/fake_group', None, self.get)

2241 

def test__add_localgroup(self):
    """Expect one POST on 'rest/localGroup' carrying the group name."""
    call_mock = self.mock_object(self.resthelper, 'call')
    self.mock_object(self.resthelper, '_assert_result_code')

    self.resthelper._add_localgroup('fake_group')

    call_mock.assert_called_once_with(
        'rest/localGroup', {'groupName': 'fake_group'}, self.post)